Posted to dev@storm.apache.org by Alexandre Vermeerbergen <av...@gmail.com> on 2018/11/12 20:56:38 UTC

NullPointerException in KafkaOffsetMetric.getValueAndReset causing worker to die

Hello,

Using a Storm 1.2.3-SNAPSHOT build from November 3rd, 2018, with all libraries
(storm-core & storm-kafka-client) built from the same Git revision, we get the
following crash caused by a NullPointerException in
KafkaOffsetMetric.getValueAndReset:

2018-11-12 19:31:30.496 o.a.s.util Thread-9-metricsFromKafka-executor[13 13] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at org.apache.storm.daemon.executor$fn__9620$fn__9635$fn__9666.invoke(executor.clj:634) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at org.apache.storm.util$async_loop$fn__561.invoke(util.clj:484) [storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
Caused by: java.lang.NullPointerException
        at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:89) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$metrics_tick$fn__9544.invoke(executor.clj:345) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
        at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
        at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
        at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
        at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
        at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
        at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
        at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
        at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
        at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
        at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
        at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
        at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
        at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at org.apache.storm.daemon.executor$fn__9620$tuple_action_fn__9626.invoke(executor.clj:522) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__9609.invoke(executor.clj:471) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at org.apache.storm.disruptor$clojure_handler$reify__9120.onEvent(disruptor.clj:41) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
        ... 7 more


In the source code, the NullPointerException comes from the following
line of KafkaOffsetMetric.java:

long earliestTimeOffset = beginningOffsets.get(topicPartition);
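
For illustration, the same failure mode can be reproduced outside Storm with a
minimal, self-contained sketch (the topic name and partition numbers below are
made up): Map.get() returns null for a missing key, and auto-unboxing that null
into a primitive long throws the NullPointerException.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Minimal sketch (hypothetical topic/partitions) of the failure mode:
// get() returns null for a partition that is absent from the map, and
// auto-unboxing null into a primitive long throws NullPointerException.
public class UnboxingNpeSketch {
    public static void main(String[] args) {
        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(new TopicPartition("events", 0), 42L);

        // Partition 1 was never added, so get() returns null.
        TopicPartition missing = new TopicPartition("events", 1);
        long earliestTimeOffset = beginningOffsets.get(missing); // NPE on unboxing
        System.out.println(earliestTimeOffset);
    }
}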

The NullPointerException crashes the worker process hosting the Spout,
which leads to countless Netty error messages until the Spout is
restarted on another worker.

Note: we are using the Storm Kafka client with Kafka Client 2.0.0 and
Scala 2.12, on a cluster with 7 Supervisor nodes; the topology that gets
these crashes consumes a very high volume of data from a Kafka topic with
16 partitions.
All of this runs on Oracle Java 8 update 192 on CentOS 7.

Any idea why beginningOffsets could be null?

Kind regards,
Alexandre Vermeerbergen

Re: NullPointerException in KafkaOffsetMetric.getValueAndReset causing worker to die

Posted by Stig Rohde Døssing <st...@gmail.com>.
It looks like KAFKA-7044 affects Kafka 1.1.0 and up, so people on earlier
versions aren't affected. I think we should either work around the issue by
skipping the metrics when the bug occurs, or add a link to KAFKA-7044 to the
documentation.

Re: NullPointerException in KafkaOffsetMetric.getValueAndReset causing worker to die

Posted by Alexandre Vermeerbergen <av...@gmail.com>.
Hello Stig,

Thank you very much for your answer: I have tested our many topologies
using Kafka Client 2.0.1 instead of Kafka Client 2.0.0, let them run at
full load for a couple of days, and I can confirm that this exception no
longer occurs!

May I suggest that the storm-kafka-client documentation mention that this
version of the Kafka client is recommended?

Kind regards,
Alexandre Vermeerbergen

Re: NullPointerException in KafkaOffsetMetric.getValueAndReset causing worker to die

Posted by Stig Rohde Døssing <st...@gmail.com>.
I don't think beginningOffsets is null. I think it is missing one of the
partitions, which would mean the right-hand side of that line is null, and
assigning null to a primitive long gives an NPE.

I think this could be due to
https://issues.apache.org/jira/browse/KAFKA-7044, going by the commit
message for the fix
https://github.com/apache/kafka/commit/e2ec2d79c8d5adefc0c764583cec47144dbc5705#diff-b45245913eaae46aa847d2615d62cde0.
Specifically, part (2) sounds a lot like what I think might be happening here.

"

`ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()`
assumed that endOffsets()/beginningOffsets() which eventually call
Fetcher.fetchOffsetsByTimes(), would return a map with all the topic
partitions passed to endOffsets()/beginningOffsets() and that values
are not null. Because of (1), null values were possible if some of the
topic partitions were already known (in metadata cache) and some not
(metadata cache did not have entries for some of the topic
partitions). However, even with fixing (1),
endOffsets()/beginningOffsets() may return a map with some topic
partitions missing, when list offset request returns a non-retriable
error.

"

Basically, KafkaOffsetMetric also assumes that when beginningOffsets(topics)
is called, the returned map will contain a value for every requested
partition. Could you try upgrading to Kafka 2.0.1?

If necessary, we can also work around this on the Storm side by skipping the
metrics when the requested partition isn't present in the return values of
beginningOffsets/endOffsets. Feel free to raise an issue for this.
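
As a rough sketch of what that Storm-side guard could look like (class, method,
and metric names below are illustrative only, not the actual KafkaOffsetMetric
code), the metric computation would simply skip any partition whose offsets are
missing:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only -- not the real KafkaOffsetMetric implementation.
// Skips any partition missing from the maps returned by
// beginningOffsets()/endOffsets() (possible per KAFKA-7044) instead of
// unboxing a null Long and killing the worker.
final class KafkaOffsetMetricGuardSketch {

    static Map<String, Long> spoutLagPerPartition(Set<TopicPartition> assignment,
                                                  Map<TopicPartition, Long> beginningOffsets,
                                                  Map<TopicPartition, Long> endOffsets,
                                                  Map<TopicPartition, Long> committedOffsets) {
        Map<String, Long> metrics = new HashMap<>();
        for (TopicPartition tp : assignment) {
            Long earliest = beginningOffsets.get(tp);
            Long latest = endOffsets.get(tp);
            Long committed = committedOffsets.get(tp);
            if (earliest == null || latest == null || committed == null) {
                // Offsets are incomplete for this partition in this metrics
                // interval; skip it rather than throw a NullPointerException.
                continue;
            }
            metrics.put(tp.topic() + "/partition_" + tp.partition() + "/spoutLag",
                        latest - committed);
        }
        return metrics;
    }
}

Whether the skipped partitions should also be logged is a separate question;
the important part is that the metrics path no longer throws and kills the
worker.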
