Posted to user@storm.apache.org by Han Jing <ha...@gmail.com> on 2018/03/27 09:14:27 UTC

Reply: Storm kafka Spout Stuck When Kafka leader is Down

With Storm-Kafka-Client 1.2.1, kafka-client 0.10.2.1, and Kafka server 1.0.0, the Kafka spout keeps working when the leader goes down.

But with Storm-Kafka-Client 1.2.1, kafka-client 1.0.0 (the same version as the Kafka server), and Kafka server 1.0.0, the Kafka spout gets stuck when the Kafka leader goes down.

 

Is Storm-Kafka-Client 1.2.1 really compatible with kafka-client 1.0.0/1.0.1?

I suspect there is a version issue between Storm-Kafka-Client 1.2.1 and kafka-client 1.0.0/1.0.1.

From: Ajeesh [mailto:ajeeshreloaded@gmail.com]
Sent: March 27, 2018 16:52
To: user@storm.apache.org
Subject: Re: Storm kafka Spout Stuck When Kafka leader is Down

 

Hi, use the same storm-kafka-client version as the Kafka server version.

 

On Tue, Mar 27, 2018, 2:04 PM Han Jing <hanjingjing1213@gmail.com> wrote:

Hi All,

       I’m using Storm-Kafka-Client 1.2.1 to read from a Kafka server (1.0.0, 1.0.1). When the Kafka topic leader process goes down, the Storm Kafka spout gets stuck and the UI website stops responding. Even after the topic leader has moved to another broker, it stays stuck until the Kafka server process is restarted; only then does Storm recover.

       Is Storm-Kafka-Client 1.2.1 compatible with kafka-client 1.0.0/1.0.1?

       

       Here is some code and the Storm log. Please help me with this issue.

Thanks a lot.

       --------------------------------------------------------------

Code:

The kafka-client version is the same as the Kafka version (1.0.0, 1.0.1).

       Kafka is distributed across 3 brokers. Every Kafka topic has 2 replicas and 1 partition.

       The KafkaSpout configuration is as below. The topology reads from just one topic.

TopologyBuilder builder = new TopologyBuilder();
// Kafka broker addresses (host:port list)
String bootstrapServers = properties.getProperty("bootstrap.servers");
// Topic the Kafka spout consumes from
String kafkaReaderTopic = properties.getProperty("storm.kafka.reader.topic");
// Spout configuration: pass the loaded properties through and start at the earliest offset on the first poll
KafkaSpoutConfig<String, String> config = KafkaSpoutConfig.builder(bootstrapServers, kafkaReaderTopic)
        .setProp(properties)
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
        .build();
// Register the Kafka spout (KAFKA_READER) with a parallelism hint of 1
builder.setSpout(BOLT_ID_KAFKA_READER, new KafkaSpout<>(config), 1);
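
For readers following along, here is a minimal sketch of the kind of `properties` object the snippet above reads from. The two keys `bootstrap.servers` and `storm.kafka.reader.topic` come from the code above; the broker addresses, topic name, and group id are taken from the log further down. The class name, the in-memory property text, and the presence of `group.id` in the same file are assumptions made only for illustration, not the poster's actual configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of Storm or Kafka.
final class SpoutPropertiesSketch {

    // Builds a Properties object like the one the topology code reads.
    // Values are copied from the log in this thread; the layout is assumed.
    static Properties load() throws IOException {
        String text = String.join("\n",
                "bootstrap.servers=170.0.0.38:9092,170.0.0.39:9092,170.0.0.46:9092",
                "storm.kafka.reader.topic=tradeMatch",
                "group.id=matchgrp002");
        Properties properties = new Properties();
        properties.load(new StringReader(text));
        return properties;
    }
}
```

Listing all three brokers in `bootstrap.servers` lets the consumer fetch metadata from a surviving broker when one is down, which matches the metadata requests to 170.0.0.46 and 170.0.0.38 seen in the log.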

       --------------------------------------------------------------

       Log:

2018-03-26 22:42:25.023 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 6 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.46:9092 (id: 3 rack: null), 170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])])
2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377894), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.46:9092 (id: 3 rack: null)
2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377896), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)
2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377897), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Initiating connection to node 170.0.0.39:9092 (id: 2147483645 rack: null)
2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Fetching committed offsets for partitions: [tradeMatch-0]
2018-03-26 22:42:25.226 o.a.k.c.n.Selector Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Connection with /170.0.0.39 disconnected

java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_101]
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_101]
        at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106) ~[kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441) [kafka-clients-1.0.0.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464) [storm-kafka-client-1.2.1.jar:1.2.1]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) [storm-kafka-client-1.2.1.jar:1.2.1]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) [storm-kafka-client-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) [storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]

2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Node 2147483645 disconnected.
2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer clientId=consumer-1, groupId=matchgrp002] Connection to node 2147483645 could not be established. Broker may not be available.
2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-1, correlationId=377898) with correlation id 377898 due to node 2147483645 being disconnected
2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending metadata request (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id: 3 rack: null)
2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)
2018-03-26 22:42:25.327 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 7 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null), 170.0.0.46:9092 (id: 3 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])])

2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377900), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)
2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377901), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
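
A note on reading the log above: the FindCoordinator responses name broker 170.0.0.39:9092 with id 2, yet the "Discovered coordinator" and "Marking the coordinator ... dead" lines show the same address with id 2147483645. As far as I know, the Kafka consumer keeps a separate connection for coordinator requests and labels it with `Integer.MAX_VALUE` minus the broker id, so both ids refer to the same broker. The sketch below only reproduces that arithmetic; the class and method names are hypothetical and not Kafka API.

```java
// Hypothetical helper that mirrors the id arithmetic visible in the log.
// It does not call any Kafka code.
final class CoordinatorNodeIdSketch {

    // Maps a broker id to the id shown for the coordinator connection.
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Maps a coordinator connection id from the log back to the broker id.
    static int brokerIdOf(int coordinatorConnectionId) {
        return Integer.MAX_VALUE - coordinatorConnectionId;
    }
}
```

Applied to the log, `brokerIdOf(2147483645)` gives 2, the broker that every FindCoordinator response keeps returning even though connections to it are refused.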

 


Re: Reply: Storm kafka Spout Stuck When Kafka leader is Down

Posted by Stig Rohde Døssing <sr...@apache.org>.
Thanks Jungtaek, that's probably it. Still, if storm-kafka-monitor is
hanging, I'd suspect an issue with either the Kafka cluster or the
consumer, since what the monitor does is really simple.
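
The timeout suggested in the quoted message below (run storm-kafka-monitor with a time limit and force it to shut down when the limit expires) could look roughly like this with the standard Java process API. This is only a sketch under assumptions: the class name, method name, and the idea of passing the command as a list are hypothetical, the output is discarded for brevity even though a real caller would need to read it, and it is not how Storm actually launches the monitor.

```java
import java.io.IOException;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Storm code.
final class MonitorTimeoutSketch {

    // Runs the command and waits up to timeoutMillis for it to finish.
    // Returns the exit code, or an empty value if the process had to be killed.
    static OptionalInt runWithTimeout(List<String> command, long timeoutMillis)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .redirectErrorStream(true)
                // Discard output so a chatty child cannot block on a full pipe.
                .redirectOutput(ProcessBuilder.Redirect.DISCARD)
                .start();
        if (process.waitFor(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return OptionalInt.of(process.exitValue());
        }
        // Timed out: kill the process and wait until it is really gone.
        process.destroyForcibly();
        process.waitFor();
        return OptionalInt.empty();
    }
}
```

With a wrapper like this, a monitor process that hangs while the coordinator is unreachable would be terminated after the timeout instead of blocking the page that waits for it.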

2018-03-29 16:18 GMT+02:00 Jungtaek Lim <ka...@gmail.com>:

> Regarding the stuck UI, I guess storm-kafka-monitor is called when the
> topology page is opened and gets stuck waiting for a response. We need a
> timeout when executing storm-kafka-monitor, and to forcibly shut down the
> storm-kafka-monitor process when it expires.
>
> On Thu, Mar 29, 2018 at 12:43 AM, Stig Rohde Døssing <sr...@apache.org> wrote:
>
>> Hi,
>>
>> I'm not aware of any incompatibility between storm-kafka-client and the
>> Kafka 1.0.0 consumer. I took a quick look through the Kafka upgrade notes
>> at https://kafka.apache.org/documentation/#upgrade, and don't see any
>> notice that the consumer should be used differently.
>>
>> Could you elaborate on what you mean by " Storm Kafka Spout is stuck,
>> there is no responseon UI website"? Kafka being down should have no
>> effect on whether Storm UI can load.
>>
>> I would also try asking on the kafka-users mailing list (
>> https://kafka.apache.org/contact), because it sounds like the consumer
>> isn't picking back up once the leader failover has happened, which isn't
>> something the spout has anything to do with.
>>
>> 2018-03-27 11:14 GMT+02:00 Han Jing <ha...@gmail.com>:
>>
>>> When I use Storm-Kafka-Client 1.2.1, kafka-client 0.10.2.1, kafak-server
>>> 1.0.0. leader down, kafka spout went well.
>>>
>>> But When I use Storm-kafka-client 1.2.1,kafka-client1.0.0 (the same as
>>> kafka sever version),kafka –server 1.0.0. Kafka spout stuck when kafka
>>> leader down.
>>>
>>>
>>>
>>> *Is Storm-Kafka-Client 1.2.1 really compatible with kafka-client
>>> 1.0.0/1.0.1 ???*
>>>
>>> *I guess there’re some version issue with kafka-client 1.0.0/1.0.1 and
>>> kafka-client 1.0.0/1.0.1*
>>>
>>> *发件人:* Ajeesh [mailto:ajeeshreloaded@gmail.com]
>>> *发送时间:* 2018年3月27日 16:52
>>> *收件人:* user@storm.apache.org
>>> *主题:* Re: Storm kafka Spout Stuck When Kafka leader is Down
>>>
>>>
>>>
>>> Hi, Use the storm-kafka-client version same as Kafka Server version
>>>
>>>
>>>
>>> On Tue, Mar 27, 2018, 2:04 PM Han Jing <ha...@gmail.com>
>>> wrote:
>>>
>>> Hi All,
>>>
>>>        I’m using Storm-Kafka-Client 1.2.1 to read from Kafka
>>> sever(1.0.0, 1.0.1).When Kafka topic leader progress is down, Storm Kafka
>>> Spout is stuck, there is no responseon UI website ,even kakfa topic leader
>>> is alter to another broker, It’s still stuck, until restart the kafka
>>> server progress. Storm recovered from struk.
>>>
>>> *       Is Storm-Kafka-Client 1.2.1 compatible with kafka-client
>>> 1.0.0/1.0.1?*
>>>
>>>
>>>
>>> *       Here’s some code and Storm log.Please help me with this issue.*
>>>
>>> *Thanks a lot.*
>>>
>>>        --------------------------------------------------------------
>>>
>>> *Code:*
>>>
>>> Kafka-client version is the same the kafka version(1.0.0,1.0.1).
>>>
>>>        Kafka is distribute on 3 brokers. There are 2 replicators  and 1
>>> partition for every Kafka topic.
>>>
>>>        KafkaSpout configureateion is as below. The topology read from
>>> just one topic.
>>>
>>> TopologyBuilder builder = new TopologyBuilder();
>>> //kafka Servers IP
>>> String bootstrapServers = properties.getProperty("bootstrap.servers");
>>> //Kafka Spout consumer topic
>>> String kafkaReaderTopic = properties.getProperty("storm.kafka.reader.topic");
>>> //KafkaSpout
>>> KafkaSpoutConfig<String, String> config = KafkaSpoutConfig.builder(bootstrapServers, kafkaReaderTopic)
>>>         .setProp(properties)
>>>         .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.*EARLIEST*)
>>>         .build();
>>> //topology Spout,KAFKA_READER
>>> builder.setSpout(*BOLT_ID_KAFKA_READER*, new KafkaSpout<>(config), 1);
>>>
>>>        --------------------------------------------------------------
>>>
>>>        Log:
>>>
>>>        518728 2018-03-26 22:42:25.023 o.a.k.c.Metadata
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster
>>> metadata version 6 to Cluster(i       d = 0u-o-B0qTBGm7pCQ550QBw, nodes = [
>>> 170.0.0.46:9092 (id: 3 rack: null), 170.0.0.39:9092 (id: 2 rack: null),
>>> 170.0.0.38:9092 (id: 1 rack:        null)], partitions =
>>> [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1],
>>> isr = [1,2], offlineReplicas = [])])
>>>
>>> 518729 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
>>> response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1,
>>> disconnected=false, requ       estHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>>> apiVersion=1, clientId=consumer-1, correlationId=377894), responseBody=FindCoordinatorRe
>>> sponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=
>>> 170.0.0.39:9092 (id: 2 rack: null)))
>>>
>>> 518730 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>>> clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
>>> 170.0.0.39:9092 (id: 2147483645 <(214)%20748-3645> rack: null)
>>>
>>> 518731 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>>> clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
>>> 170.0.0.39:9092 (id: 2147483645 <(214)%20748-3645> rack: null) dead
>>>
>>> 518732 2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
>>> request to broker 170.0.0.46:9092 (id: 3 rack: null)
>>>
>>> 518733 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
>>> response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0,
>>> disconnected=false, requ       estHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>>> apiVersion=1, clientId=consumer-1, correlationId=377896), responseBody=FindCoordinatorRe
>>> sponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=
>>> 170.0.0.39:9092 (id: 2 rack: null)))
>>>
>>> 518734 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>>> clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
>>> 170.0.0.39:9092 (id: 2147483645 <(214)%20748-3645> rack: null)
>>>
>>> 518735 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>>> clientId=consumer-1, gr       oupId=matchgrp002] Marking the coordinator
>>> 170.0.0.39:9092 (id: 2147483645 <(214)%20748-3645> rack: null) dead
>>>
>>> 518736 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, g       roupId=matchgrp002] Sending GroupCoordinator
>>> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>>>
>>> 518737 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, g       roupId=matchgrp002] Received GroupCoordinator
>>> response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0,
>>> disconnected=false, requ       estHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>>> apiVersion=1, clientId=consumer-1, correlationId=377897), responseBody=FindCoordinatorRe
>>> sponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=
>>> 170.0.0.39:9092 (id: 2 rack: null)))
>>>
>>> 518738 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>>> clientId=consumer-1, gr       oupId=matchgrp002] Discovered coordinator
>>> 170.0.0.39:9092 (id: 2147483645 <(214)%20748-3645> rack: null)
>>>
>>> 518739 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, groupId=mat       chgrp002] Initiating connection to
>>> node 170.0.0.39:9092 (id: 2147483645 <(214)%20748-3645> rack: null)
>>>
>>> 518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, g       roupId=matchgrp002] Fetching committed offsets
>>> for partitions: [tradeMatch-0]
>>>
>>> 518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, groupId=matchg       rp002] Connection with /
>>> 170.0.0.39 disconnected
>>>
>>> 518742 java.net.ConnectException: Connection refused
>>>
>>> 518743         at sun.nio.ch.SocketChannelImpl.checkConnect(Native
>>> Method) ~[?:1.8.0_101]
>>>
>>> 518744         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>> ~[?:1.8.0_101]
>>>
>>> 518745         at org.apache.kafka.common.network.
>>> PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
>>> ~[kafka-clients-1.0.0.j       ar:?]
>>>
>>> 518746         at org.apache.kafka.common.network.KafkaChannel.
>>> finishConnect(KafkaChannel.java:106) ~[kafka-clients-1.0.0.jar:?]
>>>
>>> 518747         at org.apache.kafka.common.network.Selector.
>>> pollSelectionKeys(Selector.java:444) [kafka-clients-1.0.0.jar:?]
>>>
>>> 518748         at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
>>> [kafka-clients-1.0.0.jar:?]
>>>
>>> 518749         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
>>> [kafka-clients-1.0.0.jar:?]
>>>
>>> 518750         at org.apache.kafka.clients.consumer.internals.
>>> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
>>> [kafka-clients-1.0.0.ja       r:?]
>>>
>>> 518751         at org.apache.kafka.clients.consumer.internals.
>>> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
>>> [kafka-clients-1.0.0.ja       r:?]
>>>
>>> 518752         at org.apache.kafka.clients.consumer.internals.
>>> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
>>> [kafka-clients-1.0.0.ja       r:?]
>>>
>>> 518753         at org.apache.kafka.clients.consumer.internals.
>>> ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472)
>>> [kafka-cli       ents-1.0.0.jar:?]
>>>
>>> 518754         at org.apache.kafka.clients.consumer.KafkaConsumer.
>>> committed(KafkaConsumer.java:1441) [kafka-clients-1.0.0.jar:?]
>>>
>>> 518755         at org.apache.storm.kafka.spout.
>>> KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464)
>>> [storm-kafka-client-1.2.1.jar:1.2.1]
>>>
>>> 518756         at org.apache.storm.kafka.spout.KafkaSpout.
>>> emitIfWaitingNotEmitted(KafkaSpout.java:440)
>>> [storm-kafka-client-1.2.1.jar:1.2.1]
>>>
>>> 518757         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
>>> [storm-kafka-client-1.2.1.jar:1.2.1]
>>>
>>> 518758         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
>>> [storm-core-1.2.1.jar:1.2.1]
>>>
>>> 518759         at org.apache.storm.util$async_
>>> loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>
>>> 518760         at clojure.lang.AFn.run(AFn.java:22)
>>> [clojure-1.7.0.jar:?]
>>>
>>> 518761         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
>>>
>>> 518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Node 2147483645 disconnected.
>>>
>>> 518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
>>> Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Connection to node
>>> 2147483645 could not be established. Broker may not be available.
>>>
>>> 518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH
>>> request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3,
>>> clientId=consumer-1, correlationId=377898) with correlation id
>>> 377898 due to node 2147483645 being disconnected
>>>
>>> 518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Marking the coordinator
>>> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>>>
>>> 518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Sending metadata request
>>> (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id:
>>> 3 rack: null)
>>>
>>> 518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator
>>> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>>>
>>> 518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster
>>> metadata version 7 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [
>>> 170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null),
>>> 170.0.0.46:9092 (id: 3 rack: null)], partitions =
>>> [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1],
>>> isr = [1,2], offlineReplicas = [])])
>>>
>>> 518769 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator
>>> response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0,
>>> disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>>> apiVersion=1, clientId=consumer-1, correlationId=377900),
>>> responseBody=FindCoordinatorResponse(throttleTimeMs=0,
>>> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack:
>>> null)))
>>>
>>> 518770 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Discovered coordinator
>>> 170.0.0.39:9092 (id: 2147483645 rack: null)
>>>
>>> 518771 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Marking the coordinator
>>> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>>>
>>> 518772 2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator
>>> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>>>
>>> 518773 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator
>>> response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1,
>>> disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>>> apiVersion=1, clientId=consumer-1, correlationId=377901),
>>> responseBody=FindCoordinatorResponse(throttleTimeMs=0,
>>> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack:
>>> null)))
>>>
>>> 518774 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator
>>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>>> clientId=consumer-1, groupId=matchgrp002] Discovered coordinator
>>> 170.0.0.39:9092 (id: 2147483645 rack: null)
>>>
>>>
>>>
>>>
>>

Re: Re: Storm kafka Spout Stuck When Kafka leader is Down

Posted by Jungtaek Lim <ka...@gmail.com>.
Regarding the UI being stuck, I guess storm-kafka-monitor is called when
the topology page is opened, and it gets stuck waiting for a response. We
need a timeout when executing storm-kafka-monitor, and should forcibly shut
down the storm-kafka-monitor process once that timeout expires.
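
A minimal sketch of that kind of timeout, assuming the monitor is launched
as an external process. This is not the actual Storm UI code: the class
name BoundedCommand and its run method are hypothetical, and only standard
JDK calls (ProcessBuilder, Process.waitFor with a timeout,
Process.destroyForcibly) are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs an external command with an upper bound on its runtime.
final class BoundedCommand {

    private BoundedCommand() {
    }

    /**
     * Runs the command and returns its combined stdout/stderr, or
     * Optional.empty() if it did not finish within the timeout.
     * A process that is still alive afterwards is forcibly killed.
     */
    static Optional<String> run(List<String> command, long timeout, TimeUnit unit) {
        Path out = null;
        Process process = null;
        try {
            // Write output to a temp file so a full pipe buffer can never block the child.
            out = Files.createTempFile("bounded-command", ".out");
            process = new ProcessBuilder(command)
                    .redirectErrorStream(true)
                    .redirectOutput(out.toFile())
                    .start();
            if (!process.waitFor(timeout, unit)) {
                // Timed out: give up instead of blocking the caller.
                return Optional.empty();
            }
            return Optional.of(new String(Files.readAllBytes(out), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } finally {
            // Force shutdown of a process that is still running (timeout or interrupt).
            if (process != null && process.isAlive()) {
                process.destroyForcibly();
            }
            if (out != null) {
                out.toFile().delete();
            }
        }
    }
}
```

A caller would treat an empty result as "lag information unavailable" and
render the page without it, rather than waiting on the process
indefinitely.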

On Thu, Mar 29, 2018 at 12:43 AM, Stig Rohde Døssing <sr...@apache.org> wrote:

> Hi,
>
> I'm not aware of any incompatibility between storm-kafka-client and the
> Kafka 1.0.0 consumer. I took a quick look through the Kafka upgrade notes
> at https://kafka.apache.org/documentation/#upgrade, and don't see any
> notice that the consumer should be used differently.
>
> Could you elaborate on what you mean by " Storm Kafka Spout is stuck,
> there is no responseon UI website"? Kafka being down should have no
> effect on whether Storm UI can load.
>
> I would also try asking on the kafka-users mailing list (
> https://kafka.apache.org/contact), because it sounds like the consumer
> isn't picking back up once the leader failover has happened, which isn't
> something the spout has anything to do with.
>
> 2018-03-27 11:14 GMT+02:00 Han Jing <ha...@gmail.com>:
>
>> With storm-kafka-client 1.2.1, kafka-client 0.10.2.1, and Kafka server
>> 1.0.0, the Kafka spout keeps working when the leader goes down.
>>
>> But with storm-kafka-client 1.2.1, kafka-client 1.0.0 (the same as the
>> Kafka server version), and Kafka server 1.0.0, the Kafka spout gets stuck
>> when the Kafka leader goes down.
>>
>>
>>
>> *Is storm-kafka-client 1.2.1 really compatible with kafka-client
>> 1.0.0/1.0.1?*
>>
>> *I guess there is some version issue between storm-kafka-client 1.2.1 and
>> kafka-client 1.0.0/1.0.1.*
>>
>> *From:* Ajeesh [mailto:ajeeshreloaded@gmail.com]
>> *Sent:* March 27, 2018 16:52
>> *To:* user@storm.apache.org
>> *Subject:* Re: Storm kafka Spout Stuck When Kafka leader is Down
>>
>>
>>
>> Hi, use the same storm-kafka-client version as the Kafka server version.
>>
>>
>>
>> On Tue, Mar 27, 2018, 2:04 PM Han Jing <ha...@gmail.com> wrote:
>>
>> Hi All,
>>
>> I'm using storm-kafka-client 1.2.1 to read from a Kafka server (1.0.0,
>> 1.0.1). When the Kafka topic leader process goes down, the Storm Kafka
>> spout gets stuck and the UI website does not respond. Even after topic
>> leadership moves to another broker, it stays stuck until the Kafka server
>> process is restarted. Only then does Storm recover.
>>
>> *Is storm-kafka-client 1.2.1 compatible with kafka-client 1.0.0/1.0.1?*
>>
>>
>>
>> *Here is some code and the Storm log. Please help me with this issue.*
>>
>> *Thanks a lot.*
>>
>>        --------------------------------------------------------------
>>
>> *Code:*
>>
>> The kafka-client version is the same as the Kafka version (1.0.0, 1.0.1).
>>
>> Kafka is distributed across 3 brokers. Every Kafka topic has 2 replicas
>> and 1 partition.
>>
>> The KafkaSpout configuration is as below. The topology reads from just
>> one topic.
>>
>> TopologyBuilder builder = new TopologyBuilder();
>> // Kafka bootstrap servers
>> String bootstrapServers = properties.getProperty("bootstrap.servers");
>> // Topic the Kafka spout consumes from
>> String kafkaReaderTopic = properties.getProperty("storm.kafka.reader.topic");
>> // Kafka spout configuration
>> KafkaSpoutConfig<String, String> config = KafkaSpoutConfig.builder(bootstrapServers, kafkaReaderTopic)
>>         .setProp(properties)
>>         .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
>>         .build();
>> // Register the Kafka spout (KAFKA_READER) with a parallelism of 1
>> builder.setSpout(BOLT_ID_KAFKA_READER, new KafkaSpout<>(config), 1);
>>
>>        --------------------------------------------------------------
>>
>>        Log:
>>
>> 518728 2018-03-26 22:42:25.023 o.a.k.c.Metadata
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata
>> version 6 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [
>> 170.0.0.46:9092 (id: 3 rack: null), 170.0.0.39:9092 (id: 2 rack: null),
>> 170.0.0.38:9092 (id: 1 rack: null)], partitions =
>> [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1],
>> isr = [1,2], offlineReplicas = [])])
>>
>> 518729 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator
>> response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1,
>> disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>> apiVersion=1, clientId=consumer-1, correlationId=377894),
>> responseBody=FindCoordinatorResponse(throttleTimeMs=0,
>> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack:
>> null)))
>>
>> 518730 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Discovered coordinator
>> 170.0.0.39:9092 (id: 2147483645 rack: null)
>>
>> 518731 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Marking the coordinator
>> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>>
>> 518732 2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator
>> request to broker 170.0.0.46:9092 (id: 3 rack: null)
>>
>> 518733 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator
>> response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0,
>> disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>> apiVersion=1, clientId=consumer-1, correlationId=377896),
>> responseBody=FindCoordinatorResponse(throttleTimeMs=0,
>> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack:
>> null)))
>>
>> 518734 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Discovered coordinator
>> 170.0.0.39:9092 (id: 2147483645 rack: null)
>>
>> 518735 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Marking the coordinator
>> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>>
>> 518736 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator
>> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>>
>> 518737 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator
>> response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0,
>> disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>> apiVersion=1, clientId=consumer-1, correlationId=377897),
>> responseBody=FindCoordinatorResponse(throttleTimeMs=0,
>> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack:
>> null)))
>>
>> 518738 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Discovered coordinator
>> 170.0.0.39:9092 (id: 2147483645 rack: null)
>>
>> 518739 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Initiating connection to
>> node 170.0.0.39:9092 (id: 2147483645 rack: null)
>>
>> 518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Fetching committed offsets
>> for partitions: [tradeMatch-0]
>>
>> 518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Connection with /170.0.0.39
>> disconnected
>>
>> 518742 java.net.ConnectException: Connection refused
>>
>> 518743         at sun.nio.ch.SocketChannelImpl.checkConnect(Native
>> Method) ~[?:1.8.0_101]
>>
>> 518744         at
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>> ~[?:1.8.0_101]
>>
>> 518745         at
>> org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
>> ~[kafka-clients-1.0.0.jar:?]
>>
>> 518746         at
>> org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
>> ~[kafka-clients-1.0.0.jar:?]
>>
>> 518747         at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
>> [kafka-clients-1.0.0.jar:?]
>>
>> 518748         at
>> org.apache.kafka.common.network.Selector.poll(Selector.java:398)
>> [kafka-clients-1.0.0.jar:?]
>>
>> 518749         at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
>> [kafka-clients-1.0.0.jar:?]
>>
>> 518750         at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
>> [kafka-clients-1.0.0.jar:?]
>>
>> 518751         at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
>> [kafka-clients-1.0.0.jar:?]
>>
>> 518752         at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
>> [kafka-clients-1.0.0.jar:?]
>>
>> 518753         at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472)
>> [kafka-clients-1.0.0.jar:?]
>>
>> 518754         at
>> org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441)
>> [kafka-clients-1.0.0.jar:?]
>>
>> 518755         at
>> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464)
>> [storm-kafka-client-1.2.1.jar:1.2.1]
>>
>> 518756         at
>> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
>> [storm-kafka-client-1.2.1.jar:1.2.1]
>>
>> 518757         at
>> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
>> [storm-kafka-client-1.2.1.jar:1.2.1]
>>
>> 518758         at
>> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
>> [storm-core-1.2.1.jar:1.2.1]
>>
>> 518759         at
>> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
>> [storm-core-1.2.1.jar:1.2.1]
>>
>> 518760         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>
>> 518761         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
>>
>> 518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Node 2147483645 disconnected.
>>
>> 518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
>> Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Connection to node
>> 2147483645 could not be established. Broker may not be available.
>>
>> 518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH
>> request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3,
>> clientId=consumer-1, correlationId=377898) with correlation id
>> 377898 due to node 2147483645 being disconnected
>>
>> 518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Marking the coordinator
>> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>>
>> 518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Sending metadata request
>> (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id: 3
>> rack: null)
>>
>> 518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator
>> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>>
>> 518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata
>> version 7 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [
>> 170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null),
>> 170.0.0.46:9092 (id: 3 rack: null)], partitions =
>> [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1],
>> isr = [1,2], offlineReplicas = [])])
>>
>> 518769 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator
>> response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0,
>> disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>> apiVersion=1, clientId=consumer-1, correlationId=377900),
>> responseBody=FindCoordinatorResponse(throttleTimeMs=0,
>> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack:
>> null)))
>>
>> 518770 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Discovered coordinator
>> 170.0.0.39:9092 (id: 2147483645 rack: null)
>>
>> 518771 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Marking the coordinator
>> 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>>
>> 518772 2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator
>> request to broker 170.0.0.38:9092 (id: 1 rack: null)
>>
>> 518773 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator
>> response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1,
>> disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR,
>> apiVersion=1, clientId=consumer-1, correlationId=377901),
>> responseBody=FindCoordinatorResponse(throttleTimeMs=0,
>> errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack:
>> null)))
>>
>> 518774 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator
>> Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer
>> clientId=consumer-1, groupId=matchgrp002] Discovered coordinator
>> 170.0.0.39:9092 (id: 2147483645 rack: null)
>>
>>
>>
>>
>

Re: Re: Storm kafka Spout Stuck When Kafka leader is Down

Posted by Stig Rohde Døssing <sr...@apache.org>.
Hi,

I'm not aware of any incompatibility between storm-kafka-client and the
Kafka 1.0.0 consumer. I took a quick look through the Kafka upgrade notes
at https://kafka.apache.org/documentation/#upgrade, and don't see any
notice that the consumer should be used differently.

Could you elaborate on what you mean by " Storm Kafka Spout is stuck, there
is no responseon UI website"? Kafka being down should have no effect on
whether Storm UI can load.

I would also try asking on the kafka-users mailing list (
https://kafka.apache.org/contact), because it sounds like the consumer
isn't picking back up once the leader failover has happened, which isn't
something the spout has anything to do with.
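
A note for reading the log below: the coordinator is reported with id
2147483645 while the broker it points at has id 2. As far as I know, the
Kafka consumer derives the coordinator's connection id by subtracting the
broker id from Integer.MAX_VALUE, so both ids refer to broker 2, the one
that is down. The sketch below only illustrates that arithmetic. The class
and method names are made up for this example and are not Kafka API.

```java
// Hypothetical helper for reading coordinator ids in consumer logs.
// Assumption: connection id = Integer.MAX_VALUE - broker id.
final class CoordinatorIds {

    private CoordinatorIds() {
    }

    /** Connection id the consumer would log for the coordinator on this broker. */
    static int connectionIdOf(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    /** Broker id behind a coordinator connection id seen in the log. */
    static int brokerIdOf(int connectionId) {
        return Integer.MAX_VALUE - connectionId;
    }
}
```

With that mapping, CoordinatorIds.brokerIdOf(2147483645) gives 2, which
matches the "node=170.0.0.39:9092 (id: 2 rack: null)" field in the
FindCoordinator responses.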

2018-03-27 11:14 GMT+02:00 Han Jing <ha...@gmail.com>:

> With storm-kafka-client 1.2.1, kafka-client 0.10.2.1, and Kafka server
> 1.0.0, the Kafka spout keeps working when the leader goes down.
>
> But with storm-kafka-client 1.2.1, kafka-client 1.0.0 (the same as the
> Kafka server version), and Kafka server 1.0.0, the Kafka spout gets stuck
> when the Kafka leader goes down.
>
>
>
> *Is storm-kafka-client 1.2.1 really compatible with kafka-client
> 1.0.0/1.0.1?*
>
> *I guess there is some version issue between storm-kafka-client 1.2.1 and
> kafka-client 1.0.0/1.0.1.*
>
> *From:* Ajeesh [mailto:ajeeshreloaded@gmail.com]
> *Sent:* March 27, 2018 16:52
> *To:* user@storm.apache.org
> *Subject:* Re: Storm kafka Spout Stuck When Kafka leader is Down
>
>
>
> Hi, use the same storm-kafka-client version as the Kafka server version.
>
>
>
> On Tue, Mar 27, 2018, 2:04 PM Han Jing <ha...@gmail.com> wrote:
>
> Hi All,
>
> I'm using storm-kafka-client 1.2.1 to read from a Kafka server (1.0.0,
> 1.0.1). When the Kafka topic leader process goes down, the Storm Kafka
> spout gets stuck and the UI website does not respond. Even after topic
> leadership moves to another broker, it stays stuck until the Kafka server
> process is restarted. Only then does Storm recover.
>
> *Is storm-kafka-client 1.2.1 compatible with kafka-client 1.0.0/1.0.1?*
>
>
>
> *Here is some code and the Storm log. Please help me with this issue.*
>
> *Thanks a lot.*
>
>        --------------------------------------------------------------
>
> *Code:*
>
> The kafka-client version is the same as the Kafka version (1.0.0, 1.0.1).
>
> Kafka is distributed across 3 brokers. Every Kafka topic has 2 replicas
> and 1 partition.
>
> The KafkaSpout configuration is as below. The topology reads from just
> one topic.
>
> TopologyBuilder builder = new TopologyBuilder();
> // Kafka bootstrap servers (host:port list)
> String bootstrapServers = properties.getProperty("bootstrap.servers");
> // Topic the Kafka spout consumes from
> String kafkaReaderTopic = properties.getProperty("storm.kafka.reader.topic");
> // KafkaSpout configuration; the remaining consumer settings come from the properties object
> KafkaSpoutConfig<String, String> config = KafkaSpoutConfig.builder(bootstrapServers, kafkaReaderTopic)
>         .setProp(properties)
>         .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
>         .build();
> // Register the Kafka spout (KAFKA_READER) with a parallelism of 1
> builder.setSpout(BOLT_ID_KAFKA_READER, new KafkaSpout<>(config), 1);
>
>        --------------------------------------------------------------
>
>        Log:
>
> 518728 2018-03-26 22:42:25.023 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 6 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.46:9092 (id: 3 rack: null), 170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])])
>
> 518729 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345023, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377894), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518730 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518731 2018-03-26 22:42:25.024 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>
> 518732 2018-03-26 22:42:25.124 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.46:9092 (id: 3 rack: null)
>
> 518733 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345124, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377896), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518734 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518735 2018-03-26 22:42:25.125 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>
> 518736 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)
>
> 518737 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345225, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377897), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518738 2018-03-26 22:42:25.225 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518739 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Initiating connection to node 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518740 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Fetching committed offsets for partitions: [tradeMatch-0]
>
> 518741 2018-03-26 22:42:25.226 o.a.k.c.n.Selector Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Connection with /170.0.0.39 disconnected
>
> 518742 java.net.ConnectException: Connection refused
> 518743         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_101]
> 518744         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_101]
> 518745         at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-1.0.0.jar:?]
> 518746         at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106) ~[kafka-clients-1.0.0.jar:?]
> 518747         at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444) [kafka-clients-1.0.0.jar:?]
> 518748         at org.apache.kafka.common.network.Selector.poll(Selector.java:398) [kafka-clients-1.0.0.jar:?]
> 518749         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.0.jar:?]
> 518750         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) [kafka-clients-1.0.0.jar:?]
> 518751         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) [kafka-clients-1.0.0.jar:?]
> 518752         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174) [kafka-clients-1.0.0.jar:?]
> 518753         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:472) [kafka-clients-1.0.0.jar:?]
> 518754         at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1441) [kafka-clients-1.0.0.jar:?]
> 518755         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:464) [storm-kafka-client-1.2.1.jar:1.2.1]
> 518756         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) [storm-kafka-client-1.2.1.jar:1.2.1]
> 518757         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) [storm-kafka-client-1.2.1.jar:1.2.1]
> 518758         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) [storm-core-1.2.1.jar:1.2.1]
> 518759         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
> 518760         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> 518761         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
>
> 518762 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Node 2147483645 disconnected.
>
> 518763 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [WARN] [Consumer clientId=consumer-1, groupId=matchgrp002] Connection to node 2147483645 could not be established. Broker may not be available.
>
> 518764 2018-03-26 22:42:25.226 o.a.k.c.c.i.ConsumerNetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Cancelled OFFSET_FETCH request RequestHeader(apiKey=OFFSET_FETCH, apiVersion=3, clientId=consumer-1, correlationId=377898) with correlation id 377898 due to node 2147483645 being disconnected
>
> 518765 2018-03-26 22:42:25.226 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>
> 518766 2018-03-26 22:42:25.226 o.a.k.c.NetworkClient Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending metadata request (type=MetadataRequest, topics=tradeMatch) to node 170.0.0.46:9092 (id: 3 rack: null)
>
> 518767 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)
>
> 518768 2018-03-26 22:42:25.327 o.a.k.c.Metadata Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] Updated cluster metadata version 7 to Cluster(id = 0u-o-B0qTBGm7pCQ550QBw, nodes = [170.0.0.39:9092 (id: 2 rack: null), 170.0.0.38:9092 (id: 1 rack: null), 170.0.0.46:9092 (id: 3 rack: null)], partitions = [Partition(topic = tradeMatch, partition = 0, leader = 2, replicas = [2,1], isr = [1,2], offlineReplicas = [])])
>
> 518769 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345327, latencyMs=0, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377900), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518770 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
>
> 518771 2018-03-26 22:42:25.327 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Marking the coordinator 170.0.0.39:9092 (id: 2147483645 rack: null) dead
>
> 518772 2018-03-26 22:42:25.427 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Sending GroupCoordinator request to broker 170.0.0.38:9092 (id: 1 rack: null)
>
> 518773 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [DEBUG] [Consumer clientId=consumer-1, groupId=matchgrp002] Received GroupCoordinator response ClientResponse(receivedTimeMs=1522075345428, latencyMs=1, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=377901), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=170.0.0.39:9092 (id: 2 rack: null)))
>
> 518774 2018-03-26 22:42:25.428 o.a.k.c.c.i.AbstractCoordinator Thread-11-bolt-KafkaReader-executor[9 9] [INFO] [Consumer clientId=consumer-1, groupId=matchgrp002] Discovered coordinator 170.0.0.39:9092 (id: 2147483645 rack: null)
>
>
>
>
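A note on reading the quoted log: the coordinator shows up as 170.0.0.39:9092 (id: 2147483645), while the FindCoordinator response names the same host as broker id 2. As far as I know, the Kafka consumer keeps a separate connection to the group coordinator and labels it Integer.MAX_VALUE minus the broker id, so both ids refer to the same broker. The sketch below only does that arithmetic; the class and method names are made up for illustration and are not part of kafka-clients.

```java
// Hypothetical helper for reading consumer logs; not a kafka-clients API.
class CoordinatorNodeId {

    // Assumption: the consumer labels its coordinator connection as
    // Integer.MAX_VALUE - brokerId so it does not clash with the regular
    // connection to the same broker.
    static int toConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Inverse mapping: recover the broker id from the id printed in the log.
    static int toBrokerId(int connectionId) {
        return Integer.MAX_VALUE - connectionId;
    }

    public static void main(String[] args) {
        int loggedId = 2147483645; // id from "Discovered coordinator ... (id: 2147483645 ...)"
        System.out.println("coordinator connection id " + loggedId
                + " maps to broker id " + toBrokerId(loggedId));
    }
}
```

Read that way, every coordinator lookup in the log points back at broker 2 (170.0.0.39), which is the node refusing connections.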