You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Nick Cuneo (JIRA)" <ji...@apache.org> on 2016/11/18 19:26:58 UTC

[jira] [Commented] (STORM-2207) Kafka Spout NullPointerException during ack

    [ https://issues.apache.org/jira/browse/STORM-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677533#comment-15677533 ] 

Nick Cuneo commented on STORM-2207:
-----------------------------------

I believe this is a race condition between acking a message as soon as the topology comes online versus initialization of the kafka spout to insert the topic partitions in the ack topic map.

> Kafka Spout NullPointerException during ack
> -------------------------------------------
>
>                 Key: STORM-2207
>                 URL: https://issues.apache.org/jira/browse/STORM-2207
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 1.0.2
>            Reporter: Nick Cuneo
>            Priority: Critical
>
> This occurs on startup of the topology.  There should be some null check safeguards, but i'm not sure what's causing it to occur in the first place...my guess is  the topic partition is not found in the ack map.
> 2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
>     ... 7 more
> 2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR] 
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
>     ... 7 more
> 2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>     at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>     at org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765) [storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> The method and line number in question below:
> @Override
>     public void ack(Object messageId) {
>         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
>         if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
>             acked.get(msgId.getTopicPartition()).add(msgId);
>         }
>         emitted.remove(msgId);
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)