You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/03/14 20:31:43 UTC

[1/3] storm git commit: Remove auto acking/anchoring for bolts in a stateful topology

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 8922f2037 -> 80213bae7


Remove auto acking/anchoring for bolts in a stateful topology

Signed-off-by: P. Taylor Goetz <pt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2ff57a54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2ff57a54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2ff57a54

Branch: refs/heads/1.x-branch
Commit: 2ff57a541f0a38a2b3d537d45e6a2c8955f23b15
Parents: dd15771
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Mar 9 16:45:17 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Mar 14 15:23:38 2016 -0400

----------------------------------------------------------------------
 .../src/jvm/storm/starter/StatefulTopology.java |  1 +
 .../topology/CheckpointTupleForwarder.java      | 21 ++++-----
 .../apache/storm/topology/IStatefulBolt.java    |  7 ++-
 .../storm/topology/StatefulBoltExecutor.java    | 46 ++++++++++++++++----
 .../apache/storm/topology/TopologyBuilder.java  |  5 ++-
 .../topology/StatefulBoltExecutorTest.java      |  1 +
 6 files changed, 58 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
index d09ceea..ba513dd 100644
--- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java
@@ -90,6 +90,7 @@ public class StatefulTopology {
             LOG.debug("{} sum = {}", name, sum);
             kvState.put("sum", sum);
             collector.emit(input, new Values(sum));
+            collector.ack(input);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index cbb3215..11d0384 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -51,7 +51,7 @@ public class CheckpointTupleForwarder implements IRichBolt {
     private final Map<TransactionRequest, Integer> transactionRequestCount;
     private int checkPointInputTaskCount;
     private long lastTxid = Long.MIN_VALUE;
-    protected AnchoringOutputCollector collector;
+    private AnchoringOutputCollector collector;
 
     public CheckpointTupleForwarder(IRichBolt bolt) {
         this.bolt = bolt;
@@ -60,9 +60,13 @@ public class CheckpointTupleForwarder implements IRichBolt {
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = new AnchoringOutputCollector(collector);
+        init(context, collector);
         bolt.prepare(stormConf, context, this.collector);
-        checkPointInputTaskCount = getCheckpointInputTaskCount(context);
+    }
+
+    protected void init(TopologyContext context, OutputCollector collector) {
+        this.collector = new AnchoringOutputCollector(collector);
+        this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
     }
 
     @Override
@@ -114,7 +118,6 @@ public class CheckpointTupleForwarder implements IRichBolt {
      * @param input the input tuple
      */
     protected void handleTuple(Tuple input) {
-        collector.setContext(input);
         bolt.execute(input);
     }
 
@@ -224,24 +227,18 @@ public class CheckpointTupleForwarder implements IRichBolt {
 
 
     protected static class AnchoringOutputCollector extends OutputCollector {
-        private Tuple inputTuple;
-
         AnchoringOutputCollector(IOutputCollector delegate) {
             super(delegate);
         }
 
-        void setContext(Tuple inputTuple) {
-            this.inputTuple = inputTuple;
-        }
-
         @Override
         public List<Integer> emit(String streamId, List<Object> tuple) {
-            return emit(streamId, inputTuple, tuple);
+            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
         }
 
         @Override
         public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-            emitDirect(taskId, streamId, inputTuple, tuple);
+            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
         }
 
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
index 1c2c5fc..ed55e1d 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
@@ -20,7 +20,12 @@ package org.apache.storm.topology;
 import org.apache.storm.state.State;
 
 /**
- * A bolt abstraction for supporting stateful computation.
+ * A bolt abstraction for supporting stateful computation. The state of the bolt is
+ * periodically checkpointed.
+ *
+ * <p>The framework provides at-least once guarantee for the
+ * state updates. The stateful bolts are expected to anchor the tuples while emitting
+ * and ack the input tuples once its processed.</p>
  */
 public interface IStatefulBolt<T extends State> extends IStatefulComponent<T>, IRichBolt {
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index c9c36ee..237305e 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -28,8 +28,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.apache.storm.spout.CheckPointState.Action;
 import static org.apache.storm.spout.CheckPointState.Action.COMMIT;
@@ -47,7 +51,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     private boolean boltInitialized = false;
     private List<Tuple> pendingTuples = new ArrayList<>();
     private List<Tuple> preparedTuples = new ArrayList<>();
-    private List<Tuple> executedTuples = new ArrayList<>();
+    private AckTrackingOutputCollector collector;
 
     public StatefulBoltExecutor(IStatefulBolt<T> bolt) {
         super(bolt);
@@ -63,7 +67,9 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
 
     // package access for unit tests
     void prepare(Map stormConf, TopologyContext context, OutputCollector collector, State state) {
-        super.prepare(stormConf, context, collector);
+        init(context, collector);
+        this.collector = new AckTrackingOutputCollector(collector);
+        bolt.prepare(stormConf, context, this.collector);
         this.state = state;
     }
 
@@ -74,8 +80,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
             if (boltInitialized) {
                 bolt.prePrepare(txid);
                 state.prepareCommit(txid);
-                preparedTuples.addAll(executedTuples);
-                executedTuples.clear();
+                preparedTuples.addAll(collector.ackedTuples());
             } else {
                 /*
                  * May be the task restarted in the middle and the state needs be initialized.
@@ -93,7 +98,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
             bolt.preRollback();
             state.rollback();
             fail(preparedTuples);
-            fail(executedTuples);
+            fail(collector.ackedTuples());
         } else if (action == INITSTATE) {
             if (!boltInitialized) {
                 bolt.initState((T) state);
@@ -109,7 +114,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
             }
         }
         collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
-        collector.ack(checkpointTuple);
+        collector.delegate.ack(checkpointTuple);
     }
 
     @Override
@@ -123,16 +128,14 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     }
 
     private void doExecute(Tuple tuple) {
-        collector.setContext(tuple);
         bolt.execute(tuple);
-        executedTuples.add(tuple);
     }
 
     private void ack(List<Tuple> tuples) {
         if (!tuples.isEmpty()) {
             LOG.debug("Acking {} tuples", tuples.size());
             for (Tuple tuple : tuples) {
-                collector.ack(tuple);
+                collector.delegate.ack(tuple);
             }
             tuples.clear();
         }
@@ -148,4 +151,29 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
         }
     }
 
+    private static class AckTrackingOutputCollector extends AnchoringOutputCollector {
+        private final OutputCollector delegate;
+        private final Queue<Tuple> ackedTuples;
+
+        AckTrackingOutputCollector(OutputCollector delegate) {
+            super(delegate);
+            this.delegate = delegate;
+            this.ackedTuples = new ConcurrentLinkedQueue<>();
+        }
+
+        List<Tuple> ackedTuples() {
+            List<Tuple> result = new ArrayList<>();
+            Iterator<Tuple> it = ackedTuples.iterator();
+            while(it.hasNext()) {
+                result.add(it.next());
+                it.remove();
+            }
+            return result;
+        }
+
+        @Override
+        public void ack(Tuple input) {
+            ackedTuples.add(input);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index af41553..5b7d499 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -234,7 +234,10 @@ public class TopologyBuilder {
      * state (of computation) to be saved. When this bolt is initialized, the {@link IStatefulBolt#initState(State)} method
      * is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)}
      * with its previously saved state.
-     *
+     * <p>
+     * The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
+     * are expected to anchor the tuples while emitting and ack the input tuples once its processed.
+     * </p>
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the stateful bolt
      * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.

http://git-wip-us.apache.org/repos/asf/storm/blob/2ff57a54/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
index 69c541b..6606491 100644
--- a/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
+++ b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
@@ -170,6 +170,7 @@ public class StatefulBoltExecutorTest {
         Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
         Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
         executor.execute(mockCheckpointTuple);
+        mockOutputCollector.ack(mockTuple);
         Mockito.verify(mockState, Mockito.times(1)).commit(new Long(100));
         Mockito.verify(mockBolt, Mockito.times(2)).execute(mockTuple);
         Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple);


[2/3] storm git commit: Fix stateful topology acking behavior

Posted by pt...@apache.org.
Fix stateful topology acking behavior

Currently, acking is handled automatically for the non-stateful bolts in a stateful topology.
This leads to double acking if BaseRichBolts that ack their own input tuples are part of the topology, since the framework acks those tuples as well.
For the non-stateful bolts, it's better to let the bolt do the acking rather than acking automatically.

Signed-off-by: P. Taylor Goetz <pt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dd157719
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dd157719
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dd157719

Branch: refs/heads/1.x-branch
Commit: dd1577193d991489a7c46c50dcfa86db9fac5af2
Parents: 8922f20
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Mar 7 11:30:35 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Mar 14 15:23:38 2016 -0400

----------------------------------------------------------------------
 .../storm/starter/spout/RandomIntegerSpout.java      | 15 ++++++++++++++-
 .../storm/topology/CheckpointTupleForwarder.java     |  1 -
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dd157719/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
index f6a35bf..e031f6e 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomIntegerSpout.java
@@ -24,6 +24,8 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Random;
@@ -33,6 +35,7 @@ import java.util.Random;
  * every 100 ms. The ts field can be used in tuple time based windowing.
  */
 public class RandomIntegerSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(RandomIntegerSpout.class);
     private SpoutOutputCollector collector;
     private Random rand;
     private long msgId = 0;
@@ -51,6 +54,16 @@ public class RandomIntegerSpout extends BaseRichSpout {
     @Override
     public void nextTuple() {
         Utils.sleep(100);
-        collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000), ++msgId));
+        collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000), ++msgId), msgId);
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        LOG.debug("Got ACK for msgId : " + msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        LOG.debug("Got FAIL for msgId : " + msgId);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dd157719/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index 675be57..cbb3215 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -116,7 +116,6 @@ public class CheckpointTupleForwarder implements IRichBolt {
     protected void handleTuple(Tuple input) {
         collector.setContext(input);
         bolt.execute(input);
-        collector.ack(input);
     }
 
     /**


[3/3] storm git commit: add STORM-1608 to changelog

Posted by pt...@apache.org.
add STORM-1608 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/80213bae
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/80213bae
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/80213bae

Branch: refs/heads/1.x-branch
Commit: 80213bae74d2ca3c9079f26e3070d71b1256c70a
Parents: 2ff57a5
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Mar 14 15:24:34 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Mar 14 15:24:34 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/80213bae/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fa32a0a..48254bd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.0
+ * STORM-1608: Fix stateful topology acking behavior
  * STORM-1609: Netty Client is not best effort delivery on failed Connection
  * STORM-1620: Update curator to fix CURATOR-209
  * STORM-1469: Decommission SimpleTransportPlugin and configuration