You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/03/14 20:31:29 UTC

[2/4] storm git commit: Remove auto acking/anchoring for bolts in a stateful topology

Remove auto acking/anchoring for bolts in a stateful topology


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b2a98f1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b2a98f1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b2a98f1

Branch: refs/heads/master
Commit: 2b2a98f14383f7d50b359964fd89e6ea31a9a673
Parents: c0bce3e
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Mar 9 16:45:17 2016 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Wed Mar 9 22:52:09 2016 +0530

----------------------------------------------------------------------
 .../src/jvm/storm/starter/StatefulTopology.java |  1 +
 .../topology/CheckpointTupleForwarder.java      | 21 ++++-----
 .../apache/storm/topology/IStatefulBolt.java    |  7 ++-
 .../storm/topology/StatefulBoltExecutor.java    | 46 ++++++++++++++++----
 .../apache/storm/topology/TopologyBuilder.java  |  5 ++-
 .../topology/StatefulBoltExecutorTest.java      |  1 +
 6 files changed, 58 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
index d09ceea..ba513dd 100644
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
@@ -90,6 +90,7 @@ public class StatefulTopology {
             LOG.debug("{} sum = {}", name, sum);
             kvState.put("sum", sum);
             collector.emit(input, new Values(sum));
+            collector.ack(input);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index cbb3215..11d0384 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -51,7 +51,7 @@ public class CheckpointTupleForwarder implements IRichBolt {
     private final Map<TransactionRequest, Integer> transactionRequestCount;
     private int checkPointInputTaskCount;
     private long lastTxid = Long.MIN_VALUE;
-    protected AnchoringOutputCollector collector;
+    private AnchoringOutputCollector collector;
 
     public CheckpointTupleForwarder(IRichBolt bolt) {
         this.bolt = bolt;
@@ -60,9 +60,13 @@ public class CheckpointTupleForwarder implements IRichBolt {
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = new AnchoringOutputCollector(collector);
+        init(context, collector);
         bolt.prepare(stormConf, context, this.collector);
-        checkPointInputTaskCount = getCheckpointInputTaskCount(context);
+    }
+
+    protected void init(TopologyContext context, OutputCollector collector) {
+        this.collector = new AnchoringOutputCollector(collector);
+        this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
     }
 
     @Override
@@ -114,7 +118,6 @@ public class CheckpointTupleForwarder implements IRichBolt {
      * @param input the input tuple
      */
     protected void handleTuple(Tuple input) {
-        collector.setContext(input);
         bolt.execute(input);
     }
 
@@ -224,24 +227,18 @@ public class CheckpointTupleForwarder implements IRichBolt {
 
 
     protected static class AnchoringOutputCollector extends OutputCollector {
-        private Tuple inputTuple;
-
         AnchoringOutputCollector(IOutputCollector delegate) {
             super(delegate);
         }
 
-        void setContext(Tuple inputTuple) {
-            this.inputTuple = inputTuple;
-        }
-
         @Override
         public List<Integer> emit(String streamId, List<Object> tuple) {
-            return emit(streamId, inputTuple, tuple);
+            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
         }
 
         @Override
         public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-            emitDirect(taskId, streamId, inputTuple, tuple);
+            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
         }
 
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
index 1c2c5fc..ed55e1d 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
@@ -20,7 +20,12 @@ package org.apache.storm.topology;
 import org.apache.storm.state.State;
 
 /**
- * A bolt abstraction for supporting stateful computation.
+ * A bolt abstraction for supporting stateful computation. The state of the bolt is
+ * periodically checkpointed.
+ *
+ * <p>The framework provides at-least once guarantee for the
+ * state updates. The stateful bolts are expected to anchor the tuples while emitting
+ * and ack the input tuples once its processed.</p>
  */
 public interface IStatefulBolt<T extends State> extends IStatefulComponent<T>, IRichBolt {
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index c9c36ee..237305e 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -28,8 +28,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.apache.storm.spout.CheckPointState.Action;
 import static org.apache.storm.spout.CheckPointState.Action.COMMIT;
@@ -47,7 +51,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     private boolean boltInitialized = false;
     private List<Tuple> pendingTuples = new ArrayList<>();
     private List<Tuple> preparedTuples = new ArrayList<>();
-    private List<Tuple> executedTuples = new ArrayList<>();
+    private AckTrackingOutputCollector collector;
 
     public StatefulBoltExecutor(IStatefulBolt<T> bolt) {
         super(bolt);
@@ -63,7 +67,9 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
 
     // package access for unit tests
     void prepare(Map stormConf, TopologyContext context, OutputCollector collector, State state) {
-        super.prepare(stormConf, context, collector);
+        init(context, collector);
+        this.collector = new AckTrackingOutputCollector(collector);
+        bolt.prepare(stormConf, context, this.collector);
         this.state = state;
     }
 
@@ -74,8 +80,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
             if (boltInitialized) {
                 bolt.prePrepare(txid);
                 state.prepareCommit(txid);
-                preparedTuples.addAll(executedTuples);
-                executedTuples.clear();
+                preparedTuples.addAll(collector.ackedTuples());
             } else {
                 /*
                  * May be the task restarted in the middle and the state needs be initialized.
@@ -93,7 +98,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
             bolt.preRollback();
             state.rollback();
             fail(preparedTuples);
-            fail(executedTuples);
+            fail(collector.ackedTuples());
         } else if (action == INITSTATE) {
             if (!boltInitialized) {
                 bolt.initState((T) state);
@@ -109,7 +114,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
             }
         }
         collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
-        collector.ack(checkpointTuple);
+        collector.delegate.ack(checkpointTuple);
     }
 
     @Override
@@ -123,16 +128,14 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     }
 
     private void doExecute(Tuple tuple) {
-        collector.setContext(tuple);
         bolt.execute(tuple);
-        executedTuples.add(tuple);
     }
 
     private void ack(List<Tuple> tuples) {
         if (!tuples.isEmpty()) {
             LOG.debug("Acking {} tuples", tuples.size());
             for (Tuple tuple : tuples) {
-                collector.ack(tuple);
+                collector.delegate.ack(tuple);
             }
             tuples.clear();
         }
@@ -148,4 +151,29 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
         }
     }
 
+    private static class AckTrackingOutputCollector extends AnchoringOutputCollector {
+        private final OutputCollector delegate;
+        private final Queue<Tuple> ackedTuples;
+
+        AckTrackingOutputCollector(OutputCollector delegate) {
+            super(delegate);
+            this.delegate = delegate;
+            this.ackedTuples = new ConcurrentLinkedQueue<>();
+        }
+
+        List<Tuple> ackedTuples() {
+            List<Tuple> result = new ArrayList<>();
+            Iterator<Tuple> it = ackedTuples.iterator();
+            while(it.hasNext()) {
+                result.add(it.next());
+                it.remove();
+            }
+            return result;
+        }
+
+        @Override
+        public void ack(Tuple input) {
+            ackedTuples.add(input);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index af41553..5b7d499 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -234,7 +234,10 @@ public class TopologyBuilder {
      * state (of computation) to be saved. When this bolt is initialized, the {@link IStatefulBolt#initState(State)} method
      * is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)}
      * with its previously saved state.
-     *
+     * <p>
+     * The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
+     * are expected to anchor the tuples while emitting and ack the input tuples once its processed.
+     * </p>
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the stateful bolt
      * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.

http://git-wip-us.apache.org/repos/asf/storm/blob/2b2a98f1/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
index 69c541b..6606491 100644
--- a/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
@@ -170,6 +170,7 @@ public class StatefulBoltExecutorTest {
         Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
         Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
         executor.execute(mockCheckpointTuple);
+        mockOutputCollector.ack(mockTuple);
         Mockito.verify(mockState, Mockito.times(1)).commit(new Long(100));
         Mockito.verify(mockBolt, Mockito.times(2)).execute(mockTuple);
         Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple);