You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Tech Bolek <te...@yahoo.com.INVALID> on 2016/02/24 03:14:16 UTC

Kafka consumer becomes deaf after an hour or so

I got a consumer using high-level API based on the example from here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+ExampleWorks fine but after an hour or so of inactivity it stops responding to new messages. All I see the log is:

INFO  [2016-02-23 17:06:10,070] org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 4002ms for sessionid 0x152b35436a9003d, closing socket connection and attempting reconnectINFO  [2016-02-23 17:06:10,173] org.I0Itec.zkclient.ZkClient: zookeeper state changed (Disconnected)INFO  [2016-02-23 17:06:11,223] org.apache.zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)INFO  [2016-02-23 17:06:11,225] org.apache.zookeeper.ClientCnxn: Socket connection established to localhost/127.0.0.1:2181, initiating sessionINFO  [2016-02-23 17:06:11,231] org.apache.zookeeper.ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x152b35436a9003d, negotiated timeout = 6000 INFO  [2016-02-23 17:06:11,231] org.I0Itec.zkclient.ZkClient: zookeeper state changed (SyncConnected)




Init code: 
 Properties props = new Properties(); 
 props.put("zookeeper.connect", "localhost:2181"); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200");
 props.put("group.id", "ONEGROUP"); props.put("auto.commit.interval.ms", "1000"); props.put("fetch.message.max.bytes",   "200000000"); props.put("max.partition.fetch.bytes", "200000000");  ConsumerConfig cc = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(cc);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put("LISTENER1", new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);        ExecutorService threadExecutor = Executors.newFixedThreadPool(1);
 
 int threadNumber = 0;
 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("LISTENER1"); for (final KafkaStream stream : streams) {
      threadExecutor.submit(new ProcessorThread(stream, threadNumber));       threadNumber++;  }

ProcessorThread:
class ProcessorThread implements Runnable {
     private final KafkaStream stream;    private final int threadNumber;
  public ProcessorThread(KafkaStream stream, int threadNumber) { this.stream = stream; this.threadNumber = threadNumber; }  public void run() {
 ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) {  MessageAndMetadata<byte[], byte[]> msg = it.next(); byte[] bytes = msg.message(); String msgString = new String(bytes); /* process message *. }
 }