You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2017/08/24 05:45:01 UTC

[jira] [Resolved] (STORM-2207) Kafka Spout NullPointerException during ack

     [ https://issues.apache.org/jira/browse/STORM-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim resolved STORM-2207.
---------------------------------
    Resolution: Duplicate

> Kafka Spout NullPointerException during ack
> -------------------------------------------
>
>                 Key: STORM-2207
>                 URL: https://issues.apache.org/jira/browse/STORM-2207
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2
>            Reporter: Nick Cuneo
>            Assignee: Stig Rohde Døssing
>            Priority: Critical
>
> This occurs on startup of the topology.  There should be some null check safeguards, but i'm not sure what's causing it to occur in the first place...my guess is  the topic partition is not found in the ack map.
> 2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
>     ... 7 more
> 2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR] 
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
>     ... 7 more
> 2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>     at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>     at org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765) [storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> The method and line number in question below:
> @Override
>     public void ack(Object messageId) {
>         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
>         if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
>             acked.get(msgId.getTopicPartition()).add(msgId);
>         }
>         emitted.remove(msgId);
> }



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)