You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Ratha v <vi...@gmail.com> on 2016/03/24 06:55:30 UTC

kafka.consumer.ConsumerTimeoutException

Hi all;

I'm new to Kafka and wrote a simple multithreaded Kafka consumer. When I try
to consume messages, it continuously throws ConsumerTimeoutException. How can
I get rid of this?

I have multiple topics.


*Executor*


/**
 * Connects to Kafka for a single topic and fans the topic's streams out to a
 * fixed pool of {@link ListenerThread} workers.
 *
 * <p>The number of streams requested and the size of the worker pool are both
 * {@code CoreConstants.THREAD_SIZE}, so each stream gets its own worker.
 */
public class MessageListener {

    /** Consumer settings as returned by {@code KafkaConfigurationLoader.loadConsumerConfig()}. */
    private final Properties properties;

    private final ConsumerConnector consumerConnector;

    private final String topic;

    /** Created in {@link #start(RawFile)}; {@code null} until then. */
    private ExecutorService executor;

    /**
     * Loads the consumer configuration and opens the consumer connector.
     *
     * @param topic the topic whose messages will be consumed
     * @throws IllegalStateException if the consumer configuration file cannot be found;
     *         previously this was only printed, which left the connector {@code null}
     *         and deferred the failure to a NullPointerException in {@link #start(RawFile)}
     */
    public MessageListener(String topic) {
        this.topic = topic;

        KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
        try {
            this.properties = confLoader.loadConsumerConfig();
        } catch (FileNotFoundException e) {
            throw new IllegalStateException(
                    "Kafka consumer configuration could not be loaded for topic '" + topic + "'", e);
        }

        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    /**
     * Creates the message streams for the topic and submits one
     * {@link ListenerThread} per stream to a new fixed-size thread pool.
     *
     * @param file not used by this method; retained so existing callers keep compiling
     */
    public void start(RawFile file) {
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, Integer.valueOf(CoreConstants.THREAD_SIZE));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
        for (KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ListenerThread(stream));
        }
    }

    /**
     * Releases the consumer connector and stops accepting new work on the
     * worker pool. Safe to call when {@link #start(RawFile)} was never invoked.
     */
    public void shutdown() {
        consumerConnector.shutdown();
        if (executor != null) {
            executor.shutdown();
        }
    }
}



*Thread*

public class ListenerThread implements Runnable {

private KafkaStream<byte[], byte[]> stream;;


public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {

this.stream = msgStream;


}


@Override

public void run() {

try {


ConsumerIterator<byte[], byte[]> it = stream.iterator();

while (it.hasNext()) {

MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.makeNext();

String topic = messageAndMetadata.topic();

byte[] message = messageAndMetadata.message();

System.out.println("111111111111111111111111111");

FileProcessor processor = new FileProcessor();

processor.processFile(topic, message);

}

} catch (ConsumerTimeoutException cte) {

System.out.println("Consumer timed out");

}



Thanks.

-- 
-Ratha
http://vvratha.blogspot.com/

Re: kafka.consumer.ConsumerTimeoutException

Posted by Ratha v <vi...@gmail.com>.
Forgot to mention: I'm using Kafka version 2.11.

On 24 March 2016 at 16:55, Ratha v <vi...@gmail.com> wrote:

> Hi all;
>
> I'm new to kafka and wrote a simple multithreaded kafka consumer. when try
> to consume the messages,It continuously throwing timeoutexception..How can
> i get rid of this?
>
> I have multiple topics.
>
>
> *Executor*
>
>
> public class MessageListener {
>
> private Properties properties;
>
>
> private ConsumerConnector consumerConnector;
>
> private String topic;
>
> private ExecutorService executor;
>
>
> public MessageListener(String topic) {
>
> this.topic = topic;
>
>
> KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
>
> try {
>
> properties = confLoader.loadConsumerConfig();
>
> ConsumerConfig consumerConfig = new ConsumerConfig(properties);
>
> consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
>
> } catch (FileNotFoundException e) {
>
> e.printStackTrace();
>
> }
>
> }
>
>
> public void start(RawFile file) {
>
>
> Map<String, Integer> topicCountMap = new HashMap<>();
>
> topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE));
>
>
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumerConnector
>
> .createMessageStreams(topicCountMap);
>
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
> executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
>
>
> for (KafkaStream<byte[], byte[]> stream : streams) {
>
> executor.submit(new ListenerThread(stream));
>
>
> }
>
> }
>
>
>
> }
>
>
>
> *Thread*
>
> public class ListenerThread implements Runnable {
>
> private KafkaStream<byte[], byte[]> stream;;
>
>
> public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
>
> this.stream = msgStream;
>
>
> }
>
>
> @Override
>
> public void run() {
>
> try {
>
>
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
> while (it.hasNext()) {
>
> MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.makeNext();
>
> String topic = messageAndMetadata.topic();
>
> byte[] message = messageAndMetadata.message();
>
> System.out.println("111111111111111111111111111");
>
> FileProcessor processor = new FileProcessor();
>
> processor.processFile(topic, message);
>
> }
>
> } catch (ConsumerTimeoutException cte) {
>
> System.out.println("Consumer timed out");
>
> }
>
>
>
> Thanks.
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/