Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2018/03/28 20:19:00 UTC

[jira] [Comment Edited] (STORM-3013) Deactivated topology restarts if data flows into Kafka

    [ https://issues.apache.org/jira/browse/STORM-3013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418048#comment-16418048 ] 

Stig Rohde Døssing edited comment on STORM-3013 at 3/28/18 8:18 PM:
--------------------------------------------------------------------

[~Ajeesh]

Whoops, the issue is that we didn't expect Storm to ask the spout for metrics while the spout is deactivated. We close the consumer when the spout is deactivated. When Storm then asks for metrics, the spout tries to use the closed consumer to work out e.g. how far the consumer is behind the latest offset, and that call throws this exception.
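
To illustrate the failure mode and one possible guard, here is a minimal sketch. It is not the actual storm-kafka-client code or the eventual fix: FakeConsumer, OffsetMetric and Spout are hypothetical stand-ins, and the only things taken from the report are the "already been closed" IllegalStateException and the getValueAndReset method name seen in the trace. The idea is that the metric reads the consumer through a supplier and reports nothing when the spout has no live consumer.

```java
import java.util.function.Supplier;

public class DeactivatedMetricSketch {

    /** Hypothetical stand-in for a consumer that rejects calls after close(). */
    static class FakeConsumer {
        private boolean closed = false;

        long latestOffset() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
            return 42L; // fixed value, for illustration only
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical metric that looks up the spout's current consumer on every call. */
    static class OffsetMetric {
        private final Supplier<FakeConsumer> consumerSupplier;

        OffsetMetric(Supplier<FakeConsumer> consumerSupplier) {
            this.consumerSupplier = consumerSupplier;
        }

        Object getValueAndReset() {
            FakeConsumer current = consumerSupplier.get();
            if (current == null) {
                // Spout is deactivated: there is no consumer to query, so report nothing.
                return null;
            }
            return current.latestOffset();
        }
    }

    /** Hypothetical spout that closes and drops its consumer on deactivate. */
    static class Spout {
        private FakeConsumer consumer;
        final OffsetMetric metric = new OffsetMetric(() -> consumer);

        void activate() {
            consumer = new FakeConsumer();
        }

        void deactivate() {
            consumer.close();
            consumer = null; // lets the metric see that no consumer is available
        }
    }

    public static void main(String[] args) {
        Spout spout = new Spout();
        spout.activate();
        System.out.println(spout.metric.getValueAndReset()); // prints 42
        spout.deactivate();
        System.out.println(spout.metric.getValueAndReset()); // prints null
    }
}
```

Without the null check (or if deactivate only closed the consumer and kept the reference), the second call would throw the same IllegalStateException as in the trace below.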

I'll take a look at fixing this, unless you want to?


was (Author: srdo):
[~Ajeesh]

Whoops, the issue is that we didn't expect Storm to ask the spout for metrics while the spout is deactivated. When this happens, the spout tries to use the consumer to get e.g. how far behind the latest offset the consumer is, which throws this exception. 

I'll take a look at fixing this, unless you want to?

> Deactivated topology restarts if data flows into Kafka
> ------------------------------------------------------
>
>                 Key: STORM-3013
>                 URL: https://issues.apache.org/jira/browse/STORM-3013
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.2.1
>            Reporter: Ajeesh B
>            Priority: Major
>
> Hi, I deactivated the Storm topology, and if I then produce any records into Kafka, Storm throws an exception. The exception follows:
> {code:java}
> 2018-03-28 09:50:23.804 o.a.s.d.executor Thread-83-kafkaLogs-executor[130 130] [INFO] Deactivating spout kafkaLogs:(130)
> 2018-03-28 09:51:01.289 o.a.s.util Thread-17-kafkaLogs-executor[139 139] [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.IllegalStateException: This consumer has already been closed.
> at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:634) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
> Caused by: java.lang.IllegalStateException: This consumer has already been closed.
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787) ~[stormjar.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:1622) ~[stormjar.jar:?]
> at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:79) ~[stormjar.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick$fn__4899.invoke(executor.clj:345) ~[storm-core-1.2.1.jar:1.2.1]
> at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
> at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
> at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
> at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
> at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
> at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
> at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
> at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
> at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
> at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.executor$fn__4975$tuple_action_fn__4981.invoke(executor.clj:522) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:471) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
> ... 7 more
> {code}


