Posted to issues@storm.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2017/07/03 15:10:00 UTC

[jira] [Updated] (STORM-2440) Kafka outage can lead to lockup of topology

     [ https://issues.apache.org/jira/browse/STORM-2440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim updated STORM-2440:
--------------------------------
    Fix Version/s: 1.2.0

> Kafka outage can lead to lockup of topology
> -------------------------------------------
>
>                 Key: STORM-2440
>                 URL: https://issues.apache.org/jira/browse/STORM-2440
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core, storm-kafka
>    Affects Versions: 0.10.1, 1.0.1, 1.0.2, 1.1.0
>            Reporter: Nico Meyer
>            Assignee: Nico Meyer
>             Fix For: 1.0.4, 1.1.1, 1.2.0
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> During two somewhat extended outages of our Kafka cluster, we experienced a problem with our Storm topologies consuming data from that Kafka cluster.
> Almost all of our topologies just silently stopped processing data from some of the topics/partitions, and the only way to fix this situation was to restart those topologies.
> I tracked down one occurrence of the failure to this worker, which was running one of the KafkaSpouts:
> {noformat}
> 2017-03-18 04:06:15.389 o.a.s.k.KafkaUtils [ERROR] Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
> 2017-03-18 04:06:15.389 o.a.s.k.KafkaSpout [WARN] Fetch failed
> org.apache.storm.kafka.FailedFetchException: Error fetching data from [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}] for topic [tagging_log]: [NOT_LEADER_FOR_PARTITION]
>         at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:213) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
>         at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
>         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> 2017-03-18 04:06:15.390 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Refreshing partition manager connections
> 2017-03-18 04:06:15.395 o.a.s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=tagging_log, partitionMap={0=kafka-03:9092, 1=kafka-12:9092, 2=kafka-08:9092, 3=kafka-05:9092}}
> 2017-03-18 04:06:15.395 o.a.s.k.KafkaUtils [INFO] Task [1/1] assigned [Partition{host=kafka-03:9092, topic=tagging_log, partition=0}, Partition{host=kafka-12:9092, topic=tagging_log, partition=1}, Partition{host=kafka-08:9092, topic=tagging_log, partition=2}, Partition{host=kafka-05:9092, topic=tagging_log, partition=3}]
> 2017-03-18 04:06:15.395 o.a.s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: [Partition{host=kafka-08:9092, topic=tagging_log, partition=1}]
> 2017-03-18 04:06:15.396 o.a.s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=kafka-12:9092, topic=tagging_log, partition=1}]
> 2017-03-18 04:06:15.398 o.a.s.k.PartitionManager [INFO] Read partition information from: /log_processing/tagging/kafka-tagging-spout/partition_1  --> {"partition":1,"offset":40567174332,"topology":{"name":"tagging-aerospike-1","id":"tagging-aerospike-1-3-1489587827"},"topic":"tagging_log","broker":{"port":9092,"host":"kafka-08"}}
> 2017-03-18 04:06:25.408 k.c.SimpleConsumer [INFO] Reconnect due to error:
> java.net.SocketTimeoutException
>         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
>         at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86) [stormjar.jar:?]
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [stormjar.jar:?]
>         at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [stormjar.jar:?]
>         at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) [stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) [stormjar.jar:?]
>         at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) [stormjar.jar:?]
>         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) [stormjar.jar:?]
>         at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
>         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> 2017-03-18 04:06:35.416 o.a.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.net.SocketTimeoutException
>         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
>         at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
>         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.net.SocketTimeoutException
>         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
>         at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
>         at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
>         at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
>         ... 5 more
> 2017-03-18 04:06:35.419 o.a.s.d.executor [ERROR] 
> java.lang.RuntimeException: java.net.SocketTimeoutException
>         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:144) ~[stormjar.jar:?]
>         at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
>         at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.net.SocketTimeoutException
>         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_121]
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_121]
>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_121]
>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
>         at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
>         at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[stormjar.jar:?]
>         at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:?]
>         ... 5 more
> 2017-03-18 04:06:35.442 o.a.s.d.executor [INFO] Got interrupted excpetion shutting thread down...
> {noformat}
> There was no further output in the log after that until the topology was manually killed.
> As you can see, the {{java.net.SocketTimeoutException}} escapes the storm-kafka code (probably a problem in and of itself), but the worker is not killed. The thread that calls the spout's {{nextTuple}} method exits, on the other hand.
> This is the culprit line: https://github.com/apache/storm/blob/v1.1.0/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L270
> I see that this has been fixed in the Java port of the executor code by explicitly excluding {{java.net.SocketTimeoutException}} from the condition.
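> My reading of the failure mode, based on the "Got interrupted excpetion shutting thread down..." log line above and on the fix in the Java port: {{java.net.SocketTimeoutException}} is a subclass of {{java.io.InterruptedIOException}}, so a condition that treats interrupt-style exceptions as a benign shutdown also swallows a plain Kafka read timeout, and the worker stays alive with a dead spout thread. A minimal, self-contained sketch of the buggy versus fixed shape of that check (hypothetical names, not the actual executor code):
> {code:java}
> import java.io.InterruptedIOException;
> import java.net.SocketTimeoutException;
> 
> // Sketch only: illustrates the class-hierarchy trap, not Storm's real implementation.
> public class ShutdownCheckSketch {
> 
>     // Walk the cause chain looking for an instance of the given class,
>     // similar in spirit to the check the executor performs on errors.
>     static boolean causeIsInstanceOf(Class<? extends Throwable> klass, Throwable t) {
>         for (Throwable cur = t; cur != null; cur = cur.getCause()) {
>             if (klass.isInstance(cur)) {
>                 return true;
>             }
>         }
>         return false;
>     }
> 
>     // Buggy shape: every InterruptedIOException looks like a shutdown interrupt.
>     // SocketTimeoutException extends InterruptedIOException, so a Kafka read timeout
>     // is logged as an interruption and the spout thread exits without killing the worker.
>     static boolean looksLikeShutdownBuggy(Throwable error) {
>         return causeIsInstanceOf(InterruptedException.class, error)
>             || causeIsInstanceOf(InterruptedIOException.class, error);
>     }
> 
>     // Fixed shape: exclude SocketTimeoutException explicitly, so the error is treated
>     // as fatal, the worker is killed, and the supervisor can restart it.
>     static boolean looksLikeShutdownFixed(Throwable error) {
>         return causeIsInstanceOf(InterruptedException.class, error)
>             || (causeIsInstanceOf(InterruptedIOException.class, error)
>                 && !causeIsInstanceOf(SocketTimeoutException.class, error));
>     }
> }
> {code}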
> I will open a pull request with a backport tomorrow.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)