You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/04/20 10:23:44 UTC

[1/4] storm git commit: [STORM-1714] StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism

Repository: storm
Updated Branches:
  refs/heads/1.x-branch a7370a62f -> 3399a4a46


[STORM-1714] StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b879d62
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b879d62
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b879d62

Branch: refs/heads/1.x-branch
Commit: 3b879d62d68ac75310d12784d8d1ad4213fdd38e
Parents: a7370a6
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Fri Apr 15 17:36:07 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 17:18:12 2016 +0900

----------------------------------------------------------------------
 .../topology/CheckpointTupleForwarder.java      |  8 ++++
 .../apache/storm/topology/IStatefulBolt.java    | 19 +++++++-
 .../storm/topology/StatefulBoltExecutor.java    | 20 ++++++--
 .../apache/storm/topology/TopologyBuilder.java  | 48 ++++++++++++++++++++
 4 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index 11d0384..9d21c33 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -53,6 +53,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
     private long lastTxid = Long.MIN_VALUE;
     private AnchoringOutputCollector collector;
 
+    public CheckpointTupleForwarder() {
+        this(null);
+    }
+
     public CheckpointTupleForwarder(IRichBolt bolt) {
         this.bolt = bolt;
         transactionRequestCount = new HashMap<>();
@@ -86,6 +90,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         bolt.declareOutputFields(declarer);
+        declareCheckpointStream(declarer);
+    }
+
+    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
         declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
index ed55e1d..ef6c837 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/IStatefulBolt.java
@@ -18,6 +18,11 @@
 package org.apache.storm.topology;
 
 import org.apache.storm.state.State;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
 
 /**
  * A bolt abstraction for supporting stateful computation. The state of the bolt is
@@ -27,5 +32,17 @@ import org.apache.storm.state.State;
  * state updates. The stateful bolts are expected to anchor the tuples while emitting
  * and ack the input tuples once its processed.</p>
  */
-public interface IStatefulBolt<T extends State> extends IStatefulComponent<T>, IRichBolt {
+public interface IStatefulBolt<T extends State> extends IStatefulComponent<T> {
+    /**
+     * @see org.apache.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)
+     */
+    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
+    /**
+     * @see org.apache.storm.task.IBolt#execute(Tuple)
+     */
+    void execute(Tuple input);
+    /**
+     * @see org.apache.storm.task.IBolt#cleanup()
+     */
+    void cleanup();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index 237305e..9873084 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +39,6 @@ import static org.apache.storm.spout.CheckPointState.Action.COMMIT;
 import static org.apache.storm.spout.CheckPointState.Action.PREPARE;
 import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
 import static org.apache.storm.spout.CheckPointState.Action.INITSTATE;
-
 /**
  * Wraps a {@link IStatefulBolt} and manages the state of the bolt.
  */
@@ -54,7 +52,6 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     private AckTrackingOutputCollector collector;
 
     public StatefulBoltExecutor(IStatefulBolt<T> bolt) {
-        super(bolt);
         this.bolt = bolt;
     }
 
@@ -74,6 +71,23 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     }
 
     @Override
+    public void cleanup() {
+        bolt.cleanup();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        bolt.declareOutputFields(declarer);
+        super.declareCheckpointStream(declarer);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return bolt.getComponentConfiguration();
+    }
+
+
+    @Override
     protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
         LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid);
         if (action == PREPARE) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3b879d62/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
index 5b7d499..92cad77 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java
@@ -221,6 +221,20 @@ public class TopologyBuilder {
      *
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the windowed bolt
+     * @return use the returned object to declare the inputs to this component
+     * @throws IllegalArgumentException if the three-argument {@code setBolt} overload that this method delegates to rejects the arguments
+     */
+    public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
+        return setBolt(id, bolt, null);
+    }
+
+    /**
+     * Define a new bolt in this topology. This defines a windowed bolt, intended
+     * for windowing operations. The {@link IWindowedBolt#execute(TupleWindow)} method
+     * is triggered for each window interval with the list of current events in the window.
+     *
+     * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
+     * @param bolt the windowed bolt
      * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
      * @return use the returned object to declare the inputs to this component
      * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
@@ -240,6 +254,24 @@ public class TopologyBuilder {
      * </p>
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the stateful bolt
+     * @return use the returned object to declare the inputs to this component
+     * @throws IllegalArgumentException if the three-argument {@code setBolt} overload that this method delegates to rejects the arguments
+     */
+    public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException {
+        return setBolt(id, bolt, null);
+    }
+
+    /**
+     * Define a new bolt in this topology. This defines a stateful bolt, that requires its
+     * state (of computation) to be saved. When this bolt is initialized, the {@link IStatefulBolt#initState(State)} method
+     * is invoked after {@link IStatefulBolt#prepare(Map, TopologyContext, OutputCollector)} but before {@link IStatefulBolt#execute(Tuple)}
+     * with its previously saved state.
+     * <p>
+     * The framework provides at-least once guarantee for the state updates. Bolts (both stateful and non-stateful) in a stateful topology
+     * are expected to anchor the tuples while emitting and ack the input tuples once they are processed.
+     * </p>
+     * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
+     * @param bolt the stateful bolt
      * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
      * @return use the returned object to declare the inputs to this component
      * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
@@ -257,6 +289,22 @@ public class TopologyBuilder {
      *
      * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
      * @param bolt the stateful windowed bolt
+     * @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState})
+     * @return use the returned object to declare the inputs to this component
+     * @throws IllegalArgumentException if the three-argument {@code setBolt} overload that this method delegates to rejects the arguments
+     */
+    public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException {
+        return setBolt(id, bolt, null);
+    }
+
+    /**
+     * Define a new bolt in this topology. This defines a stateful windowed bolt, intended for stateful
+     * windowing operations. The {@link IStatefulWindowedBolt#execute(TupleWindow)} method is triggered
+     * for each window interval with the list of current events in the window. During initialization of
+     * this bolt {@link IStatefulWindowedBolt#initState(State)} is invoked with its previously saved state.
+     *
+     * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
+     * @param bolt the stateful windowed bolt
      * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
      * @param <T> the type of the state (e.g. {@link org.apache.storm.state.KeyValueState})
      * @return use the returned object to declare the inputs to this component


[4/4] storm git commit: add STORM-1714 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-1714 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3399a4a4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3399a4a4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3399a4a4

Branch: refs/heads/1.x-branch
Commit: 3399a4a46bffaa708c30c6f3e8b15e666c6c6151
Parents: 0ba4bb5
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 20 17:23:28 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 17:23:28 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3399a4a4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index dbf2345..96c3fe3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.0.1
+ * STORM-1714: StatefulBolts ends up as normal bolts while using TopologyBuilder.setBolt without parallelism
  * STORM-1683: only check non-system streams by default
  * STORM-1680: Provide configuration to set min fetch size in KafkaSpout
  * STORM-1649: Optimize Kryo instaces creation in HBaseWindowsStore


[3/4] storm git commit: Merge branch 'STORM-1714-1.x' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-1714-1.x' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0ba4bb51
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0ba4bb51
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0ba4bb51

Branch: refs/heads/1.x-branch
Commit: 0ba4bb5100efff705f3b62b78ad476c2591d7eb2
Parents: a7370a6 95e7afc
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 20 17:23:12 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 17:23:12 2016 +0900

----------------------------------------------------------------------
 .../topology/BaseStatefulBoltExecutor.java      | 209 +++++++++++++++++++
 .../topology/CheckpointTupleForwarder.java      | 159 +-------------
 .../apache/storm/topology/IStatefulBolt.java    |  19 +-
 .../storm/topology/StatefulBoltExecutor.java    |  22 +-
 .../apache/storm/topology/TopologyBuilder.java  |  48 +++++
 5 files changed, 301 insertions(+), 156 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: [STORM-1714] refactored common logic into BaseStatefulBoltExecutor

Posted by ka...@apache.org.
[STORM-1714] refactored common logic into BaseStatefulBoltExecutor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/95e7afc2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/95e7afc2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/95e7afc2

Branch: refs/heads/1.x-branch
Commit: 95e7afc29cde1b7f16c1b0ab8ad65bbd9ed9a2af
Parents: 3b879d6
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Mon Apr 18 23:17:10 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 17:18:17 2016 +0900

----------------------------------------------------------------------
 .../topology/BaseStatefulBoltExecutor.java      | 209 +++++++++++++++++++
 .../topology/CheckpointTupleForwarder.java      | 165 +--------------
 .../storm/topology/StatefulBoltExecutor.java    |   4 +-
 3 files changed, 218 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
new file mode 100644
index 0000000..b93a061
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.spout.CheckPointState;
+import org.apache.storm.spout.CheckpointSpout;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID;
+import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
+
+/**
+ * Base class that abstracts the common logic for executing bolts in a stateful topology.
+ */
+public abstract class BaseStatefulBoltExecutor implements IRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(BaseStatefulBoltExecutor.class);
+    private final Map<TransactionRequest, Integer> transactionRequestCount;
+    private int checkPointInputTaskCount;
+    private long lastTxid = Long.MIN_VALUE;
+    protected OutputCollector collector;
+
+    public BaseStatefulBoltExecutor() {
+        transactionRequestCount = new HashMap<>();
+    }
+
+    protected void init(TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
+    }
+
+    /**
+     * Returns the total number of tasks, summed over every source component
+     * that feeds this component on the checkpoint stream.
+     */
+    private int getCheckpointInputTaskCount(TopologyContext context) {
+        int count = 0;
+        for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
+            if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
+                count += context.getComponentTasks(inputStream.get_componentId()).size();
+            }
+        }
+        return count;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (CheckpointSpout.isCheckpoint(input)) {
+            processCheckpoint(input);
+        } else {
+            handleTuple(input);
+        }
+    }
+
+    /**
+     * Invokes handleCheckpoint once checkpoint tuple is received on
+     * all input checkpoint streams to this component.
+     */
+    private void processCheckpoint(Tuple input) {
+        CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
+        long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
+        if (shouldProcessTransaction(action, txid)) {
+            LOG.debug("Processing action {}, txid {}", action, txid);
+            try {
+                if (txid >= lastTxid) {
+                    handleCheckpoint(input, action, txid);
+                    if (action == ROLLBACK) {
+                        lastTxid = txid - 1;
+                    } else {
+                        lastTxid = txid;
+                    }
+                } else {
+                    LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
+                    collector.ack(input);
+                }
+            } catch (Throwable th) {
+                LOG.error("Got error while processing checkpoint tuple", th);
+                collector.fail(input);
+                collector.reportError(th);
+            }
+        } else {
+            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
+                              "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
+            collector.ack(input);
+        }
+    }
+
+    /**
+     * Checks if check points have been received from all tasks across
+     * all input streams to this component
+     */
+    private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
+        TransactionRequest request = new TransactionRequest(action, txid);
+        Integer count;
+        if ((count = transactionRequestCount.get(request)) == null) {
+            transactionRequestCount.put(request, 1);
+            count = 1;
+        } else {
+            transactionRequestCount.put(request, ++count);
+        }
+        if (count == checkPointInputTaskCount) {
+            transactionRequestCount.remove(request);
+            return true;
+        }
+        return false;
+    }
+
+    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
+        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
+    }
+
+    /**
+     * Sub-classes must implement the logic for handling a non-checkpoint tuple.
+     *
+     * @param input the input tuple
+     */
+    protected abstract void handleTuple(Tuple input);
+
+    /**
+     * Sub-classes must implement the logic for handling a checkpoint tuple.
+     *
+     * @param checkpointTuple the checkpoint tuple
+     * @param action          the action (prepare, commit, rollback or initstate)
+     * @param txid            the transaction id.
+     */
+    protected abstract void handleCheckpoint(Tuple checkpointTuple, CheckPointState.Action action, long txid);
+
+    protected static class AnchoringOutputCollector extends OutputCollector {
+        AnchoringOutputCollector(IOutputCollector delegate) {
+            super(delegate);
+        }
+
+        @Override
+        public List<Integer> emit(String streamId, List<Object> tuple) {
+            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
+        }
+
+        @Override
+        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
+        }
+    }
+
+    private static class TransactionRequest {
+        private final CheckPointState.Action action;
+
+        private final long txid;
+
+        TransactionRequest(CheckPointState.Action action, long txid) {
+            this.action = action;
+            this.txid = txid;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TransactionRequest that = (TransactionRequest) o;
+
+            if (txid != that.txid) return false;
+            return !(action != null ? !action.equals(that.action) : that.action != null);
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = action != null ? action.hashCode() : 0;
+            result = 31 * result + (int) (txid ^ (txid >>> 32));
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "TransactionRequest{" +
+                    "action='" + action + '\'' +
+                    ", txid=" + txid +
+                    '}';
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
index 9d21c33..a510c0c 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
@@ -45,41 +45,18 @@ import static org.apache.storm.spout.CheckpointSpout.*;
  * can flow through the entire topology DAG.
  * </p>
  */
-public class CheckpointTupleForwarder implements IRichBolt {
+public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointTupleForwarder.class);
     private final IRichBolt bolt;
-    private final Map<TransactionRequest, Integer> transactionRequestCount;
-    private int checkPointInputTaskCount;
-    private long lastTxid = Long.MIN_VALUE;
-    private AnchoringOutputCollector collector;
-
-    public CheckpointTupleForwarder() {
-        this(null);
-    }
 
     public CheckpointTupleForwarder(IRichBolt bolt) {
         this.bolt = bolt;
-        transactionRequestCount = new HashMap<>();
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        init(context, collector);
-        bolt.prepare(stormConf, context, this.collector);
-    }
-
-    protected void init(TopologyContext context, OutputCollector collector) {
-        this.collector = new AnchoringOutputCollector(collector);
-        this.checkPointInputTaskCount = getCheckpointInputTaskCount(context);
     }
 
     @Override
-    public void execute(Tuple input) {
-        if (CheckpointSpout.isCheckpoint(input)) {
-            processCheckpoint(input);
-        } else {
-            handleTuple(input);
-        }
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector outputCollector) {
+        init(context, new AnchoringOutputCollector(outputCollector));
+        bolt.prepare(stormConf, context, collector);
     }
 
     @Override
@@ -93,18 +70,13 @@ public class CheckpointTupleForwarder implements IRichBolt {
         declareCheckpointStream(declarer);
     }
 
-    protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
-        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
-    }
-
     @Override
     public Map<String, Object> getComponentConfiguration() {
         return bolt.getComponentConfiguration();
     }
 
     /**
-     * Forwards the checkpoint tuple downstream. Sub-classes can override
-     * with the logic for handling checkpoint tuple.
+     * Forwards the checkpoint tuple downstream.
      *
      * @param checkpointTuple  the checkpoint tuple
      * @param action the action (prepare, commit, rollback or initstate)
@@ -116,8 +88,8 @@ public class CheckpointTupleForwarder implements IRichBolt {
     }
 
     /**
-     * Hands off tuple to the wrapped bolt to execute. Sub-classes can
-     * override the behavior.
+     * Hands off tuple to the wrapped bolt to execute.
+     *
      * <p>
      * Right now tuples continue to get forwarded while waiting for checkpoints to arrive on other streams
      * after checkpoint arrives on one of the streams. This can cause duplicates but still at least once.
@@ -128,127 +100,4 @@ public class CheckpointTupleForwarder implements IRichBolt {
     protected void handleTuple(Tuple input) {
         bolt.execute(input);
     }
-
-    /**
-     * Invokes handleCheckpoint once checkpoint tuple is received on
-     * all input checkpoint streams to this component.
-     */
-    private void processCheckpoint(Tuple input) {
-        Action action = (Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
-        long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
-        if (shouldProcessTransaction(action, txid)) {
-            LOG.debug("Processing action {}, txid {}", action, txid);
-            try {
-                if (txid >= lastTxid) {
-                    handleCheckpoint(input, action, txid);
-                    if (action == ROLLBACK) {
-                        lastTxid = txid - 1;
-                    } else {
-                        lastTxid = txid;
-                    }
-                } else {
-                    LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
-                    collector.ack(input);
-                }
-            } catch (Throwable th) {
-                LOG.error("Got error while processing checkpoint tuple", th);
-                collector.fail(input);
-                collector.reportError(th);
-            }
-        } else {
-            LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
-                              "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
-            collector.ack(input);
-        }
-    }
-
-    /**
-     * returns the total number of input checkpoint streams across
-     * all input tasks to this component.
-     */
-    private int getCheckpointInputTaskCount(TopologyContext context) {
-        int count = 0;
-        for (GlobalStreamId inputStream : context.getThisSources().keySet()) {
-            if (CHECKPOINT_STREAM_ID.equals(inputStream.get_streamId())) {
-                count += context.getComponentTasks(inputStream.get_componentId()).size();
-            }
-        }
-        return count;
-    }
-
-    /**
-     * Checks if check points have been received from all tasks across
-     * all input streams to this component
-     */
-    private boolean shouldProcessTransaction(Action action, long txid) {
-        TransactionRequest request = new TransactionRequest(action, txid);
-        Integer count;
-        if ((count = transactionRequestCount.get(request)) == null) {
-            transactionRequestCount.put(request, 1);
-            count = 1;
-        } else {
-            transactionRequestCount.put(request, ++count);
-        }
-        if (count == checkPointInputTaskCount) {
-            transactionRequestCount.remove(request);
-            return true;
-        }
-        return false;
-    }
-
-    private static class TransactionRequest {
-        private final Action action;
-        private final long txid;
-
-        TransactionRequest(Action action, long txid) {
-            this.action = action;
-            this.txid = txid;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            TransactionRequest that = (TransactionRequest) o;
-
-            if (txid != that.txid) return false;
-            return !(action != null ? !action.equals(that.action) : that.action != null);
-
-        }
-
-        @Override
-        public int hashCode() {
-            int result = action != null ? action.hashCode() : 0;
-            result = 31 * result + (int) (txid ^ (txid >>> 32));
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return "TransactionRequest{" +
-                    "action='" + action + '\'' +
-                    ", txid=" + txid +
-                    '}';
-        }
-    }
-
-
-    protected static class AnchoringOutputCollector extends OutputCollector {
-        AnchoringOutputCollector(IOutputCollector delegate) {
-            super(delegate);
-        }
-
-        @Override
-        public List<Integer> emit(String streamId, List<Object> tuple) {
-            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
-        }
-
-        @Override
-        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-            throw new UnsupportedOperationException("Bolts in a stateful topology must emit anchored tuples.");
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/95e7afc2/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
index 9873084..01b04ab 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
@@ -42,7 +42,7 @@ import static org.apache.storm.spout.CheckPointState.Action.INITSTATE;
 /**
  * Wraps a {@link IStatefulBolt} and manages the state of the bolt.
  */
-public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwarder {
+public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
     private static final Logger LOG = LoggerFactory.getLogger(StatefulBoltExecutor.class);
     private final IStatefulBolt<T> bolt;
     private State state;
@@ -78,7 +78,7 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         bolt.declareOutputFields(declarer);
-        super.declareCheckpointStream(declarer);
+        declareCheckpointStream(declarer);
     }
 
     @Override