Posted to dev@storm.apache.org by "James Xu (JIRA)" <ji...@apache.org> on 2013/12/15 06:58:06 UTC

[jira] [Created] (STORM-136) [Trident] Bug when working with two spouts with stateful operator

James Xu created STORM-136:
------------------------------

             Summary: [Trident] Bug when working with two spouts with stateful operator
                 Key: STORM-136
                 URL: https://issues.apache.org/jira/browse/STORM-136
             Project: Apache Storm (Incubating)
          Issue Type: Bug
            Reporter: James Xu
            Priority: Minor


https://github.com/nathanmarz/storm/issues/711

Hi,

I'm working with Trident, and what I am trying to do is a windowed join across batches using partitionPersist and stateQuery on two different streams that come from TWO DIFFERENT SPOUTS.

Both spouts implement the IBatchSpout interface.

The error I get is an NPE in StateQueryProcessor or in PartitionPersistProcessor, depending on which of the two spouts starts first.

I tried to debug this, and what I have understood is that Trident uses the same BatchID (txid) for the two different spouts, and this leads to a wrong initialization of state in the core processing nodes.

If I use the same throughput for the two spouts and make one spout start with a delay, the problem doesn't occur (there is no overlap between the BatchIDs (txids) of the different spouts).

----------
liesrock: Sorry for the delay.
Here is the source code:

https://dl.dropboxusercontent.com/u/49470846/TridentBug.tar.gz

In this example I am using two spouts:
1) the first one starts emitting tuples at a certain rate.
2) the second one is delayed by 5 seconds and emits at a higher rate than the first one.

I print some debug info on stderr; the most important is the batch ID (txid).
I print info about the state window on stdout.

You can see that this is exactly the case I explained in my first message.

Thank you for your attention. I can't figure this out, so I left Trident and started working with Storm (I had no problem implementing this on Storm).

Let me know your opinion on it.

Luca Muratore

----------
xumingming: @liesrock, please minimize your test case so that it still reproduces the issue but uses as few source files (preferably only one) and as few lines of code as possible. That makes it easier for us to see whether the issue is in your app code or in Storm core.

----------
liesrock: As requested, at the following link you can find the test case in a single source file with the minimum number of lines of code:

https://dl.dropboxusercontent.com/u/49470846/SimpleTridentBug.zip

Thanks for your patience.

----------
xumingming: Pasting the test case source code here to make it easier for others to follow:

package storm.starter.trident;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import storm.trident.state.BaseQueryFunction;
import storm.trident.state.BaseStateUpdater;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.tuple.TridentTuple;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.task.IMetricsContext;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;


public class BugTest {

    //Spout that simulates an ATM
    @SuppressWarnings("serial")
    public static class ATMSpout implements IBatchSpout {   
        private int batchSize;
        private int initialSleepMilliTime;
        private int rate;
        private String name;
        private List<String> currentCCIDList;
        private long withdrawalID = 0;
        private final static String[] LOCATIONS = { "Madrid", "Barcelona", "Siviglia", "Granada", "Toledo", "Ibiza", "Valladolid", "Valencia" };


        public ATMSpout(int batchSize, int initialSleepMilliTime, int rate, String name) throws IOException {
            this.batchSize = batchSize;
            this.initialSleepMilliTime = initialSleepMilliTime;
            this.rate = rate;
            this.name = name;
        }

        //generate a list of "size" CCID
        private List<String> generateCCID(int size){
            //check the input parameter
            if(size < 0){
                System.err.println("Negative CCID list size");
                return null;
            }
            //initialize some variables
            List<String> aux = new ArrayList<String>();
            Integer randDigit = 0;
            String currentCCID = "";
            //create a random CCID
            for(int i = 0; i < size; i++){
                for(int j = 0; j < 16; j++){
                    randDigit = (int)(Math.random() * 10);
                    currentCCID +=  randDigit;
                }
                aux.add(currentCCID);
                currentCCID = "";
            }
            return aux;
        }


        @SuppressWarnings("rawtypes")
        @Override
        public void open(Map conf, TopologyContext context) {
            System.err.println("Open Spout instance");
            this.currentCCIDList = generateCCID(10000);
            //initial delay
            try {
                Thread.sleep(initialSleepMilliTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

        @Override
        public void emitBatch(long batchId, TridentCollector collector) {
            System.err.println(name + " ---> Starting emitting Batch number : " + batchId);
            for(int i = 0; i < batchSize; i++) {
                try {
                    Thread.sleep(rate);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                collector.emit(getNextWithdrawal());
            }
        }

        @Override
        public void ack(long batchId) {
        }

        @Override
        public void close() {
        }

        @SuppressWarnings("rawtypes")
        @Override
        public Map getComponentConfiguration() {
            return new Config();
        }

        @Override
        public Fields getOutputFields() {
            return new Fields("withdrawalID", "ccID", "location", "amount", "timestamp");
        }

        private Values getNextWithdrawal(){
            int randIndexCCID = (int) ((Math.random()) * this.currentCCIDList.size());
            int randAmount = (int) ((Math.random()) * 1000);
            int randIndexLocation = (int) ((Math.random()) * LOCATIONS.length);
            return new Values(++withdrawalID, 
                              this.currentCCIDList.get(randIndexCCID), 
                              LOCATIONS[randIndexLocation],
                              randAmount,
                              System.currentTimeMillis());
        }

    }


    //state updater
    @SuppressWarnings("serial")
    private static class Updater extends BaseStateUpdater<DB>  {
        private String streamName;

        public Updater(String streamName) {
            this.streamName = streamName;
        }

        @Override
        public void updateState(DB state, List<TridentTuple> tuples, TridentCollector collector) {
            System.err.println(streamName + " Update State");
        }
    }

    //query
    @SuppressWarnings("serial")
    public static class Query extends BaseQueryFunction<DB, TridentTuple> {
        private String streamName;

        public Query(String streamName) {
            this.streamName = streamName;
        }

        @Override
        public List<TridentTuple> batchRetrieve(DB state, List<TridentTuple> inputs) {
            System.err.println(streamName + " Query");

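            // batchRetrieve must return one result per input tuple; null means "no match"
            List<TridentTuple> retList = new ArrayList<TridentTuple>(inputs.size()); //return list
            for(int i = 0; i < inputs.size(); i++){
                retList.add(null);
            }
            return retList;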
        }

        @Override
        public void execute(TridentTuple tuple, TridentTuple result, TridentCollector collector) {

        }

    }

    //state
    public static class DB implements State {

        @Override
        public void beginCommit(Long txid) {

        }

        @Override
        public void commit(Long txid) {

        }

    }

    //factory
    @SuppressWarnings("serial")
    public static class Factory implements StateFactory {

        @SuppressWarnings("rawtypes")
        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            return new DB();
        }       
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();

        final int BATCH_SIZE = 5;
        final int DELAY_1 = 100;
        final int RATE_1 = 300;
        final int DELAY_2 = 5000;
        final int RATE_2 = 100;

        ATMSpout spout1 = new ATMSpout(BATCH_SIZE, DELAY_1, RATE_1, "SpoutLowRate");
        ATMSpout spout2 = new ATMSpout(BATCH_SIZE, DELAY_2, RATE_2, "SpoutHighRate");   
        TridentTopology topology = new TridentTopology();

        Stream s1 = topology.newStream("stream1", spout1);
        Stream s2 = topology.newStream("stream2", spout2);

        TridentState leftState = s1.partitionPersist(new Factory(), 
                                                     s1.getOutputFields(),
                                                     new Updater("left"));
        s2.stateQuery(leftState,
                      s1.getOutputFields(),
                      new Query("right"),
                      new Fields("out"));

        cluster.submitTopology("BugTest", conf, topology.build());

        Thread.sleep(20000);

        cluster.shutdown();
    }
}

----------
xumingming: stacktrace:

5320 [Thread-27-b-0] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.daemon.executor$fn__3496$fn__3508$fn__3555.invoke(executor.clj:730) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc3.jar:na]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:680) [na:1.6.0_29]
Caused by: java.lang.NullPointerException: null
    at storm.trident.planner.processor.StateQueryProcessor.execute(StateQueryProcessor.java:69) ~[storm-core-0.9.0-rc3.jar:na]
    at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:194) ~[storm-core-0.9.0-rc3.jar:na]
    at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:130) ~[storm-core-0.9.0-rc3.jar:na]
    at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:355) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.daemon.executor$fn__3496$tuple_action_fn__3498.invoke(executor.clj:615) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3419.invoke(executor.clj:383) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0-rc3.jar:na]
    ... 6 common frames omitted
5320 [Thread-27-b-0] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.daemon.executor$fn__3496$fn__3508$fn__3555.invoke(executor.clj:730) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc3.jar:na]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
    at java.lang.Thread.run(Thread.java:680) [na:1.6.0_29]
Caused by: java.lang.NullPointerException: null
    at storm.trident.planner.processor.StateQueryProcessor.execute(StateQueryProcessor.java:69) ~[storm-core-0.9.0-rc3.jar:na]
    at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:194) ~[storm-core-0.9.0-rc3.jar:na]
    at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:130) ~[storm-core-0.9.0-rc3.jar:na]
    at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:355) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.daemon.executor$fn__3496$tuple_action_fn__3498.invoke(executor.clj:615) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3419.invoke(executor.clj:383) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43) ~[storm-core-0.9.0-rc3.jar:na]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0-rc3.jar:na]
    ... 6 common frames omitted
5334 [Thread-27-b-0] INFO  backtype.storm.util - Halting process: ("Worker died")

The NullPointerException is thrown because state is null:

https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/planner/processor/StateQueryProcessor.java?source=cc#L57-L61

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
        state.tuples.add(tuple);
        state.args.add(_projection.create(tuple));
    }
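To make the failure mode of the execute method above concrete, here is a minimal, self-contained Java model of the processor lifecycle. It is a sketch, not Trident code: the names LifecycleModel and ToyQueryProcessor are hypothetical, and a plain Object[] stands in for processorContext.state. As in StateQueryProcessor, startBatch fills a state slot and execute dereferences it, so calling execute for a batch whose startBatch never ran throws a NullPointerException.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleModel {
    // Hypothetical stand-in for StateQueryProcessor's per-batch BatchState
    static class BatchState {
        final List<String> tuples = new ArrayList<String>();
    }

    // Hypothetical processor that keeps its per-batch state in one slot of a shared array
    static class ToyQueryProcessor {
        private final int stateIndex;

        ToyQueryProcessor(int stateIndex) {
            this.stateIndex = stateIndex;
        }

        // Models startBatch: allocate the per-batch state
        void startBatch(Object[] state) {
            state[stateIndex] = new BatchState();
        }

        // Models execute: assumes startBatch already ran for this batch
        void execute(Object[] state, String tuple) {
            BatchState s = (BatchState) state[stateIndex];
            s.tuples.add(tuple); // NullPointerException if the slot was never initialized
        }
    }

    public static void main(String[] args) {
        ToyQueryProcessor p = new ToyQueryProcessor(0);

        Object[] initialized = new Object[1];
        p.startBatch(initialized);
        p.execute(initialized, "t1");
        System.out.println("with startBatch: ok");

        Object[] skipped = new Object[1];
        try {
            p.execute(skipped, "t1");
        } catch (NullPointerException e) {
            System.out.println("without startBatch: NullPointerException");
        }
    }
}
```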
Why is state null? It is initialized in the startBatch method:
https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/planner/processor/StateQueryProcessor.java?source=cc#L52-L54

    public void startBatch(ProcessorContext processorContext) {
        processorContext.state[_context.getStateIndex()] =  new BatchState();
    }
In this case, startBatch is not called before execute is called.

Why is startBatch not called? startBatch is called by initBatchState, which should be called by TridentBoltExecutor.execute:

        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
In the code above, initBatchState is called with a batchGroup argument, but the TrackedBatch is stored under id.getId() alone. When multiple batchGroups are processing the same batch id, the TrackedBatch is created only once, so initBatchState is called only for the first batchGroup. The startBatch methods of the processors in the remaining batchGroups are never called, hence the NullPointerException. @nathanmarz, is this a bug, or are we just not using Trident correctly?
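The analysis above can be checked with a small simulation. This is a hypothetical model, not TridentBoltExecutor itself: the stream names stand in for batch group labels, and recording "group@txid" stands in for a call to initBatchState. The first method mirrors the quoted snippet (one entry per txid, initialized only for the group that created it); the second is one possible alternative, keying by (batchGroup, txid), shown only as an illustration. The quoted snippet also passes _coordConditions.get(batchGroup) into the TrackedBatch, so a real fix would likely need per-group coordination tracking too; the model ignores that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BatchTrackingModel {
    // One tuple arriving at the bolt: its batch group (spout) and its txid
    static class Arrival {
        final String batchGroup;
        final long txid;

        Arrival(String batchGroup, long txid) {
            this.batchGroup = batchGroup;
            this.txid = txid;
        }
    }

    // Mirrors the quoted snippet: entries keyed by txid alone, so only the
    // group whose tuple creates the entry gets its batch state initialized.
    static Set<String> initializedKeyedByTxid(List<Arrival> arrivals) {
        Map<Long, String> batches = new HashMap<Long, String>();
        Set<String> initialized = new LinkedHashSet<String>();
        for (Arrival a : arrivals) {
            if (!batches.containsKey(a.txid)) {
                batches.put(a.txid, a.batchGroup);
                initialized.add(a.batchGroup + "@" + a.txid); // stands in for initBatchState
            }
        }
        return initialized;
    }

    // Hypothetical alternative: key by (batchGroup, txid), so each group is initialized once
    static Set<String> initializedKeyedByGroupAndTxid(List<Arrival> arrivals) {
        Set<String> initialized = new LinkedHashSet<String>();
        for (Arrival a : arrivals) {
            initialized.add(a.batchGroup + "@" + a.txid); // set ignores repeats of the same key
        }
        return initialized;
    }

    public static void main(String[] args) {
        // Two spouts in different batch groups whose txids overlap
        List<Arrival> arrivals = new ArrayList<Arrival>();
        arrivals.add(new Arrival("stream1", 1));
        arrivals.add(new Arrival("stream2", 1));
        arrivals.add(new Arrival("stream1", 2));

        System.out.println(initializedKeyedByTxid(arrivals));         // prints [stream1@1, stream1@2]
        System.out.println(initializedKeyedByGroupAndTxid(arrivals)); // prints [stream1@1, stream2@1, stream1@2]
    }
}
```

In the first result, stream2@1 is missing: its processors would reach execute without a startBatch, which is the NullPointerException in the stack trace.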

----------
nathanmarz: Looks like a bug.



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)