Posted to dev@storm.apache.org by "Han Jia (JIRA)" <ji...@apache.org> on 2014/06/13 09:11:01 UTC

[jira] [Commented] (STORM-136) [Trident] Bug when working with two spouts with stateful operator

    [ https://issues.apache.org/jira/browse/STORM-136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14030337#comment-14030337 ] 

Han Jia commented on STORM-136:
-------------------------------

Hi guys,

Are there any updates on this issue? I have a similar use case and am getting the same exception.

Thanks,
Han

> [Trident] Bug when working with two spouts with stateful operator
> -----------------------------------------------------------------
>
>                 Key: STORM-136
>                 URL: https://issues.apache.org/jira/browse/STORM-136
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: James Xu
>            Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/711
> Hi,
> I'm working with Trident, and what I am trying to do is essentially a windowed join across batches, using partitionPersist and stateQuery on two different streams that come from TWO DIFFERENT SPOUTS.
> Both spouts implement the IBatchSpout interface.
> The error I get is an NPE in StateQueryProcessor or in PartitionPersistProcessor, depending on which of the two spouts starts first.
> I tried to debug this, and what I understand is that Trident uses the same BatchID (txid) for the two different spouts, and this leads to a wrong initialization of state in the core processing nodes.
> If I use the same throughput for the two spouts and make one spout start with a delay, the problem does not occur (there is no overlap between the BatchIDs (txid) of the different spouts).
> ----------
> liesrock: Sorry for the delay.
> I am sending you the source code.
> https://dl.dropboxusercontent.com/u/49470846/TridentBug.tar.gz
> In this example I am using two spouts:
> 1) the first one starts emitting tuples at a certain rate.
> 2) the second one is delayed by 5 seconds and emits at a higher rate than the first one.
> I print some debug info on stderr; the most important piece is the Batch ID (txid).
> I print info about the state window on stdout.
> You can see that this is exactly the case I explained in my first message.
> Thank you for your attention. I can't figure this out, so I left Trident and started working with plain Storm (I didn't have problems implementing this in Storm).
> Let me know your opinion about it.
> Luca Muratore
> ----------
> xumingming: @liesrock try to minimize your test case so that it still reproduces the issue but contains the fewest source files (preferably only one source file) and the fewest lines of code --- that makes it easier for us to see whether the issue is in your app code or in storm core.
> ----------
> liesrock: As requested, at the following link you can find the test case in one source file with the minimum number of lines of code:
> https://dl.dropboxusercontent.com/u/49470846/SimpleTridentBug.zip
> Thanks for your patience.
> ----------
> xumingming: pasting the test case source code here to make it easier for others to follow:
> package storm.starter.trident;
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import storm.trident.Stream;
> import storm.trident.TridentState;
> import storm.trident.TridentTopology;
> import storm.trident.operation.TridentCollector;
> import storm.trident.spout.IBatchSpout;
> import storm.trident.state.BaseQueryFunction;
> import storm.trident.state.BaseStateUpdater;
> import storm.trident.state.State;
> import storm.trident.state.StateFactory;
> import storm.trident.tuple.TridentTuple;
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.task.IMetricsContext;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> public class BugTest {
>     //Spout that simulates an ATM
>     @SuppressWarnings("serial")
>     public static class ATMSpout implements IBatchSpout {   
>         private int batchSize;
>         private int initialSleepMilliTime;
>         private int rate;
>         private String name;
>         private List<String> currentCCIDList;
>         private long withdrawalID = 0;
>         private final static String[] LOCATIONS = { "Madrid", "Barcelona", "Siviglia", "Granada", "Toledo", "Ibiza", "Valladolid", "Valencia" };
>         public ATMSpout(int batchSize, int initialSleepMilliTime, int rate, String name) throws IOException {
>             this.batchSize = batchSize;
>             this.initialSleepMilliTime = initialSleepMilliTime;
>             this.rate = rate;
>             this.name = name;
>         }
>         //generate a list of "size" CCID
>         private List<String> generateCCID(int size){
>             //check the input parameter
>             if(size < 0){
>                 System.err.println("Negative CCID list size");
>                 return null;
>             }
>             //initialize some variables
>             List<String> aux = new ArrayList<String>();
>             Integer randDigit = 0;
>             String currentCCID = "";
>             //create a random CCID
>             for(int i = 0; i < size; i++){
>                 for(int j = 0; j < 16; j++){
>                     randDigit = (int)(Math.random() * 10);
>                     currentCCID +=  randDigit;
>                 }
>                 aux.add(currentCCID);
>                 currentCCID = "";
>             }
>             return aux;
>         }
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void open(Map conf, TopologyContext context) {
>             System.err.println("Open Spout instance");
>             this.currentCCIDList = generateCCID(10000);
>             //initial delay
>             try {
>                 Thread.sleep(initialSleepMilliTime);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
>         @Override
>         public void emitBatch(long batchId, TridentCollector collector) {
>             System.err.println(name + " ---> Starting emitting Batch number : " + batchId);
>             for(int i = 0; i < batchSize; i++) {
>                 try {
>                     Thread.sleep(rate);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 collector.emit(getNextWithdrawal());
>             }
>         }
>         @Override
>         public void ack(long batchId) {
>         }
>         @Override
>         public void close() {
>         }
>         @SuppressWarnings("rawtypes")
>         @Override
>         public Map getComponentConfiguration() {
>             return new Config();
>         }
>         @Override
>         public Fields getOutputFields() {
>             return new Fields("withdrawalID", "ccID", "location", "amount", "timestamp");
>         }
>         private Values getNextWithdrawal(){
>             int randIndexCCID = (int) ((Math.random()) * this.currentCCIDList.size());
>             int randAmmount = (int) ((Math.random()) * 1000);
>             int randIndexLocation = (int) ((Math.random()) * LOCATIONS.length);
>             return new Values(++withdrawalID, 
>                               this.currentCCIDList.get(randIndexCCID), 
>                               LOCATIONS[randIndexLocation],
>                               randAmmount,
>                               System.currentTimeMillis());
>         }
>     }
>     //state updater
>     @SuppressWarnings("serial")
>     private static class Updater extends BaseStateUpdater<DB>  {
>         private String streamName;
>         public Updater(String streamName) {
>             this.streamName = streamName;
>         }
>         @Override
>         public void updateState(DB state, List<TridentTuple> tuples, TridentCollector collector) {
>             System.err.println(streamName + " Update State");
>         }
>     }
>     //query
>     @SuppressWarnings("serial")
>     public static class Query extends BaseQueryFunction<DB, TridentTuple> {
>         private String streamName;
>         public Query(String streamName) {
>             this.streamName = streamName;
>         }
>         @Override
>         public List<TridentTuple> batchRetrieve(DB state, List<TridentTuple> inputs) {
>             System.err.println(streamName + " Query");
>             List<TridentTuple> retList = new ArrayList<TridentTuple>(); //return list
>             for(int i = 0; i < 5; i++){
>                 retList.add(null);
>             }
>             return retList;
>         }
>         @Override
>         public void execute(TridentTuple tuple, TridentTuple result, TridentCollector collector) {
>         }
>     }
>     //state
>     public static class DB implements State {
>         @Override
>         public void beginCommit(Long txid) {
>         }
>         @Override
>         public void commit(Long txid) {
>         }
>     }
>     //factory
>     @SuppressWarnings("serial")
>     public static class Factory implements StateFactory {
>         @SuppressWarnings("rawtypes")
>         @Override
>         public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
>             return new DB();
>         }       
>     }
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         LocalCluster cluster = new LocalCluster();
>         final int BATCH_SIZE = 5;
>         final int DELAY_1 = 100;
>         final int RATE_1 = 300;
>         final int DELAY_2 = 5000;
>         final int RATE_2 = 100;
>         ATMSpout spout1 = new ATMSpout(BATCH_SIZE, DELAY_1, RATE_1, "SpoutLowRate");
>         ATMSpout spout2 = new ATMSpout(BATCH_SIZE, DELAY_2, RATE_2, "SpoutHighRate");   
>         TridentTopology topology = new TridentTopology();
>         Stream s1 = topology.newStream("stream1", spout1);
>         Stream s2 = topology.newStream("stream2", spout2);
>         TridentState leftState = s1.partitionPersist(new Factory(), 
>                                                      s1.getOutputFields(),
>                                                      new Updater("left"));
>         s2.stateQuery(leftState,
>                       s1.getOutputFields(),
>                       new Query("right"),
>                       new Fields("out"));
>         cluster.submitTopology("BugTest", conf, topology.build());
>         Thread.sleep(20000);
>         cluster.shutdown();
>     }
> }
> ----------
> xumingming: stacktrace:
> 5320 [Thread-27-b-0] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$fn__3508$fn__3555.invoke(executor.clj:730) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc3.jar:na]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>     at java.lang.Thread.run(Thread.java:680) [na:1.6.0_29]
> Caused by: java.lang.NullPointerException: null
>     at storm.trident.planner.processor.StateQueryProcessor.execute(StateQueryProcessor.java:69) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:194) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:130) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:355) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$tuple_action_fn__3498.invoke(executor.clj:615) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3419.invoke(executor.clj:383) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0-rc3.jar:na]
>     ... 6 common frames omitted
> 5320 [Thread-27-b-0] ERROR backtype.storm.daemon.executor -
> java.lang.RuntimeException: java.lang.NullPointerException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$fn__3508$fn__3555.invoke(executor.clj:730) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc3.jar:na]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>     at java.lang.Thread.run(Thread.java:680) [na:1.6.0_29]
> Caused by: java.lang.NullPointerException: null
>     at storm.trident.planner.processor.StateQueryProcessor.execute(StateQueryProcessor.java:69) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:194) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:130) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:355) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$tuple_action_fn__3498.invoke(executor.clj:615) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3419.invoke(executor.clj:383) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0-rc3.jar:na]
>     ... 6 common frames omitted
> 5334 [Thread-27-b-0] INFO  backtype.storm.util - Halting process: ("Worker died")
> The NullPointerException is thrown because state is null:
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/planner/processor/StateQueryProcessor.java?source=cc#L57-L61
>     @Override
>     public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
>         BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
>         state.tuples.add(tuple);
>         state.args.add(_projection.create(tuple));
>     }
> Why is state null? state is initialized in the startBatch method:
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/planner/processor/StateQueryProcessor.java?source=cc#L52-L54
>     public void startBatch(ProcessorContext processorContext) {
>         processorContext.state[_context.getStateIndex()] =  new BatchState();
>     }
> In this case, startBatch is not called before execute is called.
> Why is startBatch not called? startBatch is called by initBatchState, and initBatchState should be called by TridentBoltExecutor.execute:
>         if(tracked==null) {
>             tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
>             _batches.put(id.getId(), tracked);
>         }
> In the code above, initBatchState is called with a batchGroup arg. When there are multiple batchGroups in one batch, the TrackedBatch will only be created once, so initBatchState will only be called for the first batchGroup; the startBatch methods of the processors in the remaining batchGroups will then never be called, hence the NullPointerException. @nathanmarz, is this a bug, or just not using Trident in the correct way?
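> The following stripped-down, runnable model of that tracking logic (hypothetical class, field, and method names, not actual Storm code) illustrates why the second batch group never gets its state initialized when both groups arrive with the same txid:
> import java.util.HashMap;
> import java.util.Map;
> public class BatchTrackingSketch {
>     // one state slot per batch group, standing in for ProcessorContext.state
>     static Map<String, Object> stateByGroup = new HashMap<String, Object>();
>     // batches tracked by txid only, standing in for the executor's tracked-batch map
>     static Map<Long, Boolean> trackedBatches = new HashMap<Long, Boolean>();
>     static void receiveTuple(String batchGroup, long txid) {
>         if (!trackedBatches.containsKey(txid)) {
>             // the tracked batch is created once per txid, so the init path
>             // (initBatchState -> startBatch) runs only for the group seen first
>             trackedBatches.put(txid, Boolean.TRUE);
>             stateByGroup.put(batchGroup, new Object());
>         }
>         // a second batch group sharing the same txid finds no state here,
>         // which in real Storm surfaces as the NPE in StateQueryProcessor.execute
>         System.out.println("group=" + batchGroup + " txid=" + txid
>                 + " state=" + stateByGroup.get(batchGroup));
>     }
>     public static void main(String[] args) {
>         receiveTuple("group-for-spout1", 1L); // state gets initialized for this group
>         receiveTuple("group-for-spout2", 1L); // same txid, different group: state is null
>     }
> }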
> ----------
> nathanmarz: Looks like a bug.



--
This message was sent by Atlassian JIRA
(v6.2#6252)