You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Felipe Cavalcanti (JIRA)" <ji...@apache.org> on 2017/02/03 20:08:51 UTC
[jira] [Created] (STORM-2342) storm-kafka-client consumer group
getting stuck consuming from kafka 0.10.1.1
Felipe Cavalcanti created STORM-2342:
----------------------------------------
Summary: storm-kafka-client consumer group getting stuck consuming from kafka 0.10.1.1
Key: STORM-2342
URL: https://issues.apache.org/jira/browse/STORM-2342
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka-client
Affects Versions: 1.0.2
Reporter: Felipe Cavalcanti
I've created a topology that will read from kafka using storm-kafka-client but when it reaches the last message on kafka log it stops consuming and get stuck, new messages are never consumed, kafka logs that:
[2017-02-03 19:15:26,865] INFO [GroupCoordinator 1002]: Preparing to restabilize group kafka-spout with old generation 29 (kafka.coordinator.GroupCoordinator)
[2017-02-03 19:15:26,865] INFO [GroupCoordinator 1002]: Stabilized group kafka-spout generation 30 (kafka.coordinator.GroupCoordinator)
[2017-02-03 19:15:26,868] INFO [GroupCoordinator 1002]: Assignment received from leader for group kafka-spout for generation 30 (kafka.coordinator.GroupCoordinator)
========= here storm starts consuming messages, then, when it hits the last message, I can see this log in kafka == >
[2017-02-03 19:16:01,266] INFO [GroupCoordinator 1002]: Preparing to restabilize group kafka-spout with old generation 30 (kafka.coordinator.GroupCoordinator)
[2017-02-03 19:16:01,266] INFO [GroupCoordinator 1002]: Group kafka-spout with generation 31 is now empty (kafka.coordinator.GroupCoordinator)
=====
and then storm consumer group is stuck, no new messages are read from kafka. my topology/ spout are configured that way:
Topology:
c.put(SConfig.TOPOLOGY_MAX_SPOUT_PENDING, 1000)
c.put(SConfig.NIMBUS_SEEDS, "my nimbus seeds")
c.put(SConfig.NIMBUS_THRIFT_PORT, 6627)
c.put(SConfig.TOPOLOGY_WORKERS, 2) c.put(SConfig.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 100)
Spout:
props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, true)
props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, ...)
props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafka-spout")
props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, keyDeserializer)
props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, valueDeserializer)
any hints?
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)