You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Ratha v <vi...@gmail.com> on 2016/03/24 06:55:30 UTC
kafka.consumer.ConsumerTimeoutException
Hi all;
I'm new to Kafka and wrote a simple multithreaded Kafka consumer. When I try
to consume messages, it continuously throws a ConsumerTimeoutException. How can
I get rid of this?
I have multiple topics.
*Executor*
/**
 * Subscribes to a single Kafka topic through the high-level consumer API and
 * hands each resulting stream to its own {@link ListenerThread} running on a
 * fixed-size thread pool.
 */
public class MessageListener {

    private Properties properties;
    private ConsumerConnector consumerConnector;
    private String topic;
    private ExecutorService executor;

    /**
     * Loads the consumer configuration and creates the consumer connector for
     * the given topic.
     *
     * <p>If the configuration cannot be loaded the stack trace is printed and
     * the connector stays unset; {@link #start(RawFile)} reports that state
     * explicitly.
     *
     * @param topic name of the topic to consume from
     */
    public MessageListener(String topic) {
        this.topic = topic;
        KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
        try {
            properties = confLoader.loadConsumerConfig();
            ConsumerConfig consumerConfig = new ConsumerConfig(properties);
            consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        } catch (FileNotFoundException e) {
            // Construction is kept non-throwing for existing callers; the
            // missing connector is detected in start().
            e.printStackTrace();
        }
    }

    /**
     * Creates {@code CoreConstants.THREAD_SIZE} streams for the topic and
     * submits one {@link ListenerThread} per stream to a thread pool of the
     * same size.
     *
     * @param file not used by this method
     * @throws IllegalStateException if the consumer connector could not be
     *         created in the constructor
     */
    public void start(RawFile file) {
        // Fail with a clear message instead of an unexplained
        // NullPointerException when the constructor could not load the config.
        if (consumerConnector == null) {
            throw new IllegalStateException(
                    "Kafka consumer connector was not created for topic '" + topic
                            + "'; the consumer configuration could not be loaded");
        }

        Map<String, Integer> topicCountMap = new HashMap<>();
        // Integer.valueOf replaces the deprecated new Integer(...) constructor.
        topicCountMap.put(topic, Integer.valueOf(CoreConstants.THREAD_SIZE));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
        for (KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ListenerThread(stream));
        }
    }
}
*Thread*
public class ListenerThread implements Runnable {
private KafkaStream<byte[], byte[]> stream;;
public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
this.stream = msgStream;
}
@Override
public void run() {
try {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.makeNext();
String topic = messageAndMetadata.topic();
byte[] message = messageAndMetadata.message();
System.out.println("111111111111111111111111111");
FileProcessor processor = new FileProcessor();
processor.processFile(topic, message);
}
} catch (ConsumerTimeoutException cte) {
System.out.println("Consumer timed out");
}
Thanks.
--
-Ratha
http://vvratha.blogspot.com/
Re: kafka.consumer.ConsumerTimeoutException
Posted by Ratha v <vi...@gmail.com>.
I forgot to mention that I'm using Kafka version 2.11.
On 24 March 2016 at 16:55, Ratha v <vi...@gmail.com> wrote:
> Hi all;
>
> I'm new to kafka and wrote a simple multithreaded kafka consumer. when try
> to consume the messages,It continuously throwing timeoutexception..How can
> i get rid of this?
>
> I have multiple topics.
>
>
> *Executor*
>
>
> public class MessageListener {
>
> private Properties properties;
>
>
> private ConsumerConnector consumerConnector;
>
> private String topic;
>
> private ExecutorService executor;
>
>
> public MessageListener(String topic) {
>
> this.topic = topic;
>
>
> KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
>
> try {
>
> properties = confLoader.loadConsumerConfig();
>
> ConsumerConfig consumerConfig = new ConsumerConfig(properties);
>
> consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
>
> } catch (FileNotFoundException e) {
>
> e.printStackTrace();
>
> }
>
> }
>
>
> public void start(RawFile file) {
>
>
> Map<String, Integer> topicCountMap = new HashMap<>();
>
> topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE));
>
>
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumerConnector
>
> .createMessageStreams(topicCountMap);
>
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
> executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
>
>
> for (KafkaStream<byte[], byte[]> stream : streams) {
>
> executor.submit(new ListenerThread(stream));
>
>
> }
>
> }
>
>
>
> }
>
>
>
> *Thread*
>
> public class ListenerThread implements Runnable {
>
> private KafkaStream<byte[], byte[]> stream;;
>
>
> public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
>
> this.stream = msgStream;
>
>
> }
>
>
> @Override
>
> public void run() {
>
> try {
>
>
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
> while (it.hasNext()) {
>
> MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.makeNext();
>
> String topic = messageAndMetadata.topic();
>
> byte[] message = messageAndMetadata.message();
>
> System.out.println("111111111111111111111111111");
>
> FileProcessor processor = new FileProcessor();
>
> processor.processFile(topic, message);
>
> }
>
> } catch (ConsumerTimeoutException cte) {
>
> System.out.println("Consumer timed out");
>
> }
>
>
>
> Thanks.
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>
--
-Ratha
http://vvratha.blogspot.com/