Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2018/04/27 19:21:00 UTC

[jira] [Commented] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

    [ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456942#comment-16456942 ] 

Stig Rohde Døssing commented on STORM-3046:
-------------------------------------------

This is caused by a bad assumption I made here https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L161.

I assumed that the last batch metadata would only be null for the first batch, or for reemits of the first batch. I forgot to account for the case where the first batch contains no tuples: in that case we set the metadata for the first batch to null (see https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L125). On the following call to emitBatch, we then hit this line with a null lastBatchMeta: https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L181. This is very likely to happen if you use e.g. LATEST as the FirstPollOffsetStrategy and, as you observed, it doesn't happen if you start at the beginning of a partition and there are messages to emit.
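
To make the failure mode concrete, here is a rough, self-contained sketch. It is not the emitter code and not a committed fix; BatchMeta, metaFor, seekUnguarded and seekGuarded are hypothetical stand-ins that use no Storm or Kafka classes. It only models the situation described above: an empty batch produces null metadata, and a later seek that assumes non-null metadata throws. The guard shown (keep the consumer's current position when the metadata is null) is just one possible approach and is an assumption on my part.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NullMetaSeekSketch {

    /** Hypothetical stand-in for the batch metadata; only the last emitted offset is modelled. */
    public static final class BatchMeta {
        public final long lastOffset;

        public BatchMeta(long lastOffset) {
            this.lastOffset = lastOffset;
        }
    }

    /** Models the behaviour described above: an empty batch yields null metadata. */
    public static BatchMeta metaFor(List<Long> emittedOffsets) {
        if (emittedOffsets.isEmpty()) {
            return null;
        }
        return new BatchMeta(emittedOffsets.get(emittedOffsets.size() - 1));
    }

    /** The bad assumption: once the first poll is done, the metadata is never null. */
    public static long seekUnguarded(BatchMeta lastBatchMeta, boolean firstPoll, long strategyOffset) {
        if (firstPoll) {
            return strategyOffset;
        }
        return lastBatchMeta.lastOffset + 1; // throws NullPointerException if the previous batch was empty
    }

    /** One possible guard (an assumption, not the actual fix): stay at the current position. */
    public static long seekGuarded(BatchMeta lastBatchMeta, boolean firstPoll,
                                   long strategyOffset, long currentPosition) {
        if (firstPoll) {
            return strategyOffset;
        }
        if (lastBatchMeta == null) {
            return currentPosition; // nothing has been emitted yet, so there is no offset to resume from
        }
        return lastBatchMeta.lastOffset + 1;
    }

    public static void main(String[] args) {
        // First batch was empty, e.g. when starting at LATEST with no new messages.
        BatchMeta emptyFirstBatch = metaFor(Collections.<Long>emptyList());

        System.out.println("guarded, empty first batch: "
            + seekGuarded(emptyFirstBatch, false, 0L, 42L));
        System.out.println("guarded, after offsets 5 and 6: "
            + seekGuarded(metaFor(Arrays.asList(5L, 6L)), false, 0L, 42L));

        try {
            seekUnguarded(emptyFirstBatch, false, 0L);
        } catch (NullPointerException e) {
            System.out.println("unguarded, empty first batch: NullPointerException");
        }
    }
}
```

Whether staying at the current position is the right behaviour for the real emitter would need to be checked against how the first poll is tracked per partition; the sketch only shows where the null check has to go.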

Let me know if you'd like to try to fix it. If not, I'll be happy to give it a shot, and would appreciate it if you would try out the potential fix.



> Getting a NPE leading worker to die when starting a topology.
> -------------------------------------------------------------
>
>                 Key: STORM-3046
>                 URL: https://issues.apache.org/jira/browse/STORM-3046
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client, trident
>    Affects Versions: 1.2.1
>            Reporter: Kush Khandelwal
>            Priority: Blocker
>              Labels: kafka, storm-kafka-client, trident
>         Attachments: TestTopology.java
>
>
> I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients version 1.1.0.
> We have an external kafka from where we get the messages.
>  Whenever I try to run the topology, I get an NPE, which causes the worker to die.
> If I set poll strategy to earliest and the topic already contains some messages, it works fine.
>  I have used a custom record translator which is working fine.
>  Can someone please help me fix the issue?
> Thanks.
>  
> Error - 
> 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died!
>  java.lang.RuntimeException: java.lang.NullPointerException
>  at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
>  Caused by: java.lang.NullPointerException
>  at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
>  ... 6 more
>  
>  
> Topology class - 
>  
>  
>  
>  
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.*;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
> import org.apache.storm.trident.Stream;
> import org.apache.storm.trident.TridentState;
> import org.apache.storm.trident.TridentTopology;
> import org.apache.storm.tuple.Fields;
> import java.util.Properties;
>  
> public class TestTopology {
>
>     private static StormTopology buildTopology(Properties stormProperties) {
>         Properties kafkaProperties = getProperties("/kafka.properties");
>         TridentTopology topology = new TridentTopology();
>         Fields stageArguments = new Fields("test", "issue");
>         KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
>             .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
>             .setRecordTranslator(new RecordTranslator(), stageArguments)
>             .build();
>         KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);
>         Grouping partitionGroup = getPartitionGroup("test");
>         log.info("Creating Opaque-Trident-Kafka-Spout");
>         final Stream kafkaSpout = topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1);
>
>         TridentState testUpdate = kafkaSpout.partition(partitionGroup).name("testUpdate").partitionPersist(new MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(), stageArguments).parallelismHint(1);
>
>         Stream viewUpdate = testUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup).each(stageArguments, new UpdateView(), new Fields()).parallelismHint(2);
>         return topology.build();
>     }
>
>     public static void main(String[] args) {
>         Config conf = new Config();
>         log.info("Topology config: " + conf);
>         Properties properties = getProperties("/storm-cluster.properties");
>         conf.setMessageTimeoutSecs(600);
>         log.info("Building Topology");
>         StormTopology topology = buildTopology(properties);
>         log.info(topology.toString());
>         log.info("Submitting handle-rule Topology");
>         try {
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("handle-rule", conf, topology);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> }


