You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "iryoung jeong (JIRA)" <ji...@apache.org> on 2016/12/29 03:39:58 UTC

[jira] [Commented] (STORM-2207) Kafka Spout NullPointerException during ack

    [ https://issues.apache.org/jira/browse/STORM-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15784408#comment-15784408 ] 

iryoung jeong commented on STORM-2207:
--------------------------------------

I also experienced the same error! And my topology in production failed to start because of many errors like this.
So I tested using small sets such as 2 worker or 5 spout, and it failed as well.
(FYI - The topic has almost hundred parittions)

> Kafka Spout NullPointerException during ack
> -------------------------------------------
>
>                 Key: STORM-2207
>                 URL: https://issues.apache.org/jira/browse/STORM-2207
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 1.0.2
>            Reporter: Nick Cuneo
>            Priority: Critical
>
> This occurs on startup of the topology.  There should be some null check safeguards, but i'm not sure what's causing it to occur in the first place...my guess is  the topic partition is not found in the ack map.
> 2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
>     ... 7 more
> 2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR] 
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
>     ... 7 more
> 2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>     at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>     at org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765) [storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> The method and line number in question below:
> @Override
>     public void ack(Object messageId) {
>         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
>         if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
>             acked.get(msgId.getTopicPartition()).add(msgId);
>         }
>         emitted.remove(msgId);
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)