You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Tech Bolek <te...@yahoo.com.INVALID> on 2016/02/24 03:14:16 UTC
Kafka consumer becomes deaf after an hour or so
I got a consumer using high-level API based on the example from here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+ExampleWorks fine but after an hour or so of inactivity it stops responding to new messages. All I see the log is:
INFO [2016-02-23 17:06:10,070] org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 4002ms for sessionid 0x152b35436a9003d, closing socket connection and attempting reconnectINFO [2016-02-23 17:06:10,173] org.I0Itec.zkclient.ZkClient: zookeeper state changed (Disconnected)INFO [2016-02-23 17:06:11,223] org.apache.zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)INFO [2016-02-23 17:06:11,225] org.apache.zookeeper.ClientCnxn: Socket connection established to localhost/127.0.0.1:2181, initiating sessionINFO [2016-02-23 17:06:11,231] org.apache.zookeeper.ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x152b35436a9003d, negotiated timeout = 6000 INFO [2016-02-23 17:06:11,231] org.I0Itec.zkclient.ZkClient: zookeeper state changed (SyncConnected)
Init code:
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200");
props.put("group.id", "ONEGROUP"); props.put("auto.commit.interval.ms", "1000"); props.put("fetch.message.max.bytes", "200000000"); props.put("max.partition.fetch.bytes", "200000000"); ConsumerConfig cc = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(cc);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put("LISTENER1", new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); ExecutorService threadExecutor = Executors.newFixedThreadPool(1);
int threadNumber = 0;
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("LISTENER1"); for (final KafkaStream stream : streams) {
threadExecutor.submit(new ProcessorThread(stream, threadNumber)); threadNumber++; }
ProcessorThread:
class ProcessorThread implements Runnable {
private final KafkaStream stream; private final int threadNumber;
public ProcessorThread(KafkaStream stream, int threadNumber) { this.stream = stream; this.threadNumber = threadNumber; } public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) { MessageAndMetadata<byte[], byte[]> msg = it.next(); byte[] bytes = msg.message(); String msgString = new String(bytes); /* process message *. }
}