You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by "Mitchell Rathbun (BLOOMBERG/ 731 LEX)" <mr...@bloomberg.net> on 2018/03/28 17:45:19 UTC

Issue killing KafkaSpout with storm-kafka-client 1.1.1

We are using the KafkaSpout class provided by version 1.1.1 of storm-kafka-client, along with version 1.1.1 of Storm and version 0.10.0.0 of kafka-clients. In local mode, we start our topology using LocalCluster's submitTopology method, and bring down the topology by calling the killTopology method followed by the shutdown method. Every time killTopology is run, the following occurs:

ERROR Slot [SLOT_1024] - Error when processing event
java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
  at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360) ~[Engine-0.0.1-SNAPSHOT.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485) ~[Engine-0.0.1-SNAPSHOT.jar:?]
     at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472) ~[Engine-0.0.1-SNAPSHOT.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_162]
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
  at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
      at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855) ~[storm-core-1.1.1.jar:1.1.1]
 at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
     at org.apache.storm.daemon.executor$mk_executor$reify__4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
        at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
  at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
      at org.apache.storm.daemon.wor ... truncated

I did notice that updating just the version of kafka-clients to 1.0.1 made this issue disappear. Also, this issue only happens in local mode, not cluster mode. Is there something wrong with how we are bringing down the topology using LocalCluster? Or is this a known issue with version 1.1.1 of storm-kafka-client?



Re: Issue killing KafkaSpout with storm-kafka-client 1.1.1

Posted by Stig Rohde Døssing <sr...@apache.org>.
Looks like a bug in the consumer
https://github.com/apache/kafka/commit/031da889bc811200da67568c5779760dcb006238.
The spout closes the consumer both when the topology is deactivated, and
when the spout is closed. For consumers in pre-1.0.0 versions the consumer
close method happened to not be idempotent. I believe both deactivate and
close are called when a local cluster is shut down. When a regular worker
is shut down, I think the JVM running it is just killed, so close doesn't
get called on the spout.

If you need this fixed for older consumer versions, we could null the
consumer reference after closing it, so we don't close it more than once.

2018-03-28 19:45 GMT+02:00 Mitchell Rathbun (BLOOMBERG/ 731 LEX) <
mrathbun1@bloomberg.net>:

> We are using the KafkaSpout class provided by version 1.1.1 of
> storm-kafka-client, along with version 1.1.1 of Storm and version 0.10.0.0
> of kafka-clients. In local mode, we start our topology using LocalCluster's
> submitTopology method, and bring down the topology by calling the
> killTopology method followed by the shutdown method. Every time
> killTopology is run, the following occurs:
>
> ERROR Slot [SLOT_1024] - Error when processing event
> java.lang.IllegalStateException: This consumer has already been closed.
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> ensureNotClosed(KafkaConsumer.java:1416) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1427) ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
> ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.shutdown(KafkaSpout.java:485)
> ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at org.apache.storm.kafka.spout.KafkaSpout.close(KafkaSpout.java:472)
> ~[Engine-0.0.1-SNAPSHOT.jar:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_162]
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62) ~[?:1.8.0_162]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
> ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$fn__5104.invoke(executor.clj:855)
> ~[storm-core-1.1.1.jar:1.1.1]
> at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.executor$mk_executor$reify__
> 4901.shutdown(executor.clj:425) ~[storm-core-1.1.1.jar:1.1.1]
> at sun.reflect.GeneratedMethodAccessor128.invoke(Unknown Source) ~[?:?]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_162]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_162]
> at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
> ~[clojure-1.7.0.jar:?]
> at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313)
> ~[clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.wor ... truncated
>
> I did notice that updating just the version of kafka-clients to 1.0.1 made
> this issue disappear. Also, this issue only happens in local mode, not
> cluster mode. Is there something wrong with how we are bringing down the
> topology using LocalCluster? Or is this a known issue with version 1.1.1 of
> storm-kafka-client?
>
>
>