You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/12 21:01:40 UTC

[07/14] storm git commit: Refactored state handling code and addressed review comments

Refactored state handling code and addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d9a0698f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d9a0698f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d9a0698f

Branch: refs/heads/1.x-branch
Commit: d9a0698f079cd77fc7da40559eeffd7f9fc5b9a5
Parents: 40a1c7e
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Dec 23 18:18:13 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Jan 12 10:21:00 2016 +0530

----------------------------------------------------------------------
 conf/defaults.yaml                              |   1 +
 .../storm/redis/state/RedisKeyValueState.java   |   4 +-
 .../backtype/storm/spout/CheckPointState.java   | 106 +++++++++++++-
 .../backtype/storm/spout/CheckpointSpout.java   | 140 ++++++-------------
 .../topology/CheckpointTupleForwarder.java      |  18 ++-
 .../storm/topology/StatefulBoltExecutor.java    |  19 +--
 storm-core/src/jvm/org/apache/storm/Config.java |   4 +-
 .../storm/spout/CheckpointSpoutTest.java        |  21 ++-
 .../topology/StatefulBoltExecutorTest.java      |   9 +-
 9 files changed, 187 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 735a83e..c15f0b3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -253,6 +253,7 @@ topology.disruptor.wait.timeout.millis: 1000
 topology.disruptor.batch.size: 100
 topology.disruptor.batch.timeout.millis: 1
 topology.disable.loadaware: false
+topology.state.checkpoint.interval.ms: 1000
 
 # Configs for Resource Aware Scheduler
 # topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases).

http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index 57b30cc..29d33b7 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -40,8 +40,8 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
     private static final Logger LOG = LoggerFactory.getLogger(RedisKeyValueState.class);
-    private static final String COMMIT_TXID_KEY = "$commit";
-    private static final String PREPARE_TXID_KEY = "$prepare";
+    private static final String COMMIT_TXID_KEY = "commit";
+    private static final String PREPARE_TXID_KEY = "prepare";
 
     private final BASE64Encoder base64Encoder;
     private final BASE64Decoder base64Decoder;

http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java b/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java
index 5ad9772..ff0e088 100644
--- a/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java
+++ b/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java
@@ -17,13 +17,30 @@
  */
 package backtype.storm.spout;
 
+import static backtype.storm.spout.CheckPointState.State.COMMITTED;
+import static backtype.storm.spout.CheckPointState.State.COMMITTING;
+import static backtype.storm.spout.CheckPointState.State.PREPARING;
+
 /**
- * Captures the current state of the transaction in
- * {@link CheckpointSpout}
+ * Captures the current state of the transaction in {@link CheckpointSpout}. The state transitions are as follows.
+ * <pre>
+ *                  ROLLBACK(tx2)
+ *               <-------------                  PREPARE(tx2)                     COMMIT(tx2)
+ * COMMITTED(tx1)-------------> PREPARING(tx2) --------------> COMMITTING(tx2) -----------------> COMMITTED (tx2)
+ *
+ * </pre>
+ *
+ * During recovery, if a previous transaction is in PREPARING state, it is rolled back since all bolts in the topology
+ * might not have prepared (saved) the data for commit. If the previous transaction is in COMMITTING state, it is
+ * rolled forward (committed) since some bolts might have already committed the data.
+ * <p>
+ * During normal flow, the state transitions from PREPARING to COMMITTING to COMMITTED. In case of failures the
+ * prepare/commit operation is retried.
+ * </p>
  */
 public class CheckPointState {
-    public long txid;
-    public State state;
+    private long txid;
+    private State state;
 
     public enum State {
         /**
@@ -31,15 +48,36 @@ public class CheckPointState {
          */
         COMMITTED,
         /**
-         * The checkpoint spout has started committing the transaction.
+         * The checkpoint spout has started committing the transaction
+         * and the commit is in progress.
          */
         COMMITTING,
         /**
-         * The checkpoint spout has started preparing the transaction for commit.
+         * The checkpoint spout has started preparing the transaction for commit
+         * and the prepare is in progress.
          */
         PREPARING
     }
 
+    public enum Action {
+        /**
+         * prepare transaction for commit
+         */
+        PREPARE,
+        /**
+         * commit the previously prepared transaction
+         */
+        COMMIT,
+        /**
+         * rollback the previously prepared transaction
+         */
+        ROLLBACK,
+        /**
+         * initialize the state
+         */
+        INITSTATE
+    }
+
     public CheckPointState(long txid, State state) {
         this.txid = txid;
         this.state = state;
@@ -49,6 +87,62 @@ public class CheckPointState {
     public CheckPointState() {
     }
 
+    public long getTxid() {
+        return txid;
+    }
+
+    public State getState() {
+        return state;
+    }
+
+    /**
+     * Get the next state based on this checkpoint state.
+     *
+     * @param recovering if in recovering phase
+     * @return the next checkpoint state based on this state.
+     */
+    public CheckPointState nextState(boolean recovering) {
+        CheckPointState nextState;
+        switch (state) {
+            case PREPARING:
+                nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING);
+                break;
+            case COMMITTING:
+                nextState = new CheckPointState(txid, COMMITTED);
+                break;
+            case COMMITTED:
+                nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING);
+                break;
+            default:
+                throw new IllegalStateException("Unknown state " + state);
+        }
+        return nextState;
+    }
+
+    /**
+     * Get the next action to perform based on this checkpoint state.
+     *
+     * @param recovering if in recovering phase
+     * @return the next action to perform based on this state
+     */
+    public Action nextAction(boolean recovering) {
+        Action action;
+        switch (state) {
+            case PREPARING:
+                action = recovering ? Action.ROLLBACK : Action.PREPARE;
+                break;
+            case COMMITTING:
+                action = Action.COMMIT;
+                break;
+            case COMMITTED:
+                action = recovering ? Action.INITSTATE : Action.PREPARE;
+                break;
+            default:
+                throw new IllegalStateException("Unknown state " + state);
+        }
+        return action;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java b/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java
index 929dd12..32ed6e6 100644
--- a/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java
@@ -26,28 +26,22 @@ import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+
 import static backtype.storm.spout.CheckPointState.State.COMMITTED;
-import static backtype.storm.spout.CheckPointState.State.COMMITTING;
-import static backtype.storm.spout.CheckPointState.State.PREPARING;
+import static backtype.storm.spout.CheckPointState.Action;
 
 /**
  * Emits checkpoint tuples which is used to save the state of the {@link backtype.storm.topology.IStatefulComponent}
  * across the topology. If a topology contains Stateful bolts, Checkpoint spouts are automatically added
  * to the topology. There is only one Checkpoint task per topology.
- * <p/>
- * Checkpoint spout stores its internal state in a {@link KeyValueState}. The state transitions are as follows.
- * <p/>
- * <pre>
- *                  ROLLBACK(tx2)
- *               <-------------                  PREPARE(tx2)                     COMMIT(tx2)
- * COMMITTED(tx1)-------------> PREPARING(tx2) --------------> COMMITTING(tx2) -----------------> COMMITTED (tx2)
- *
+ * Checkpoint spout stores its internal state in a {@link KeyValueState}.
  *
- * </pre>
+ * @see CheckPointState
  */
 public class CheckpointSpout extends BaseRichSpout {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
@@ -56,22 +50,17 @@ public class CheckpointSpout extends BaseRichSpout {
     public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
     public static final String CHECKPOINT_FIELD_TXID = "txid";
     public static final String CHECKPOINT_FIELD_ACTION = "action";
-    public static final String CHECKPOINT_ACTION_PREPARE = "prepare";
-    public static final String CHECKPOINT_ACTION_COMMIT = "commit";
-    public static final String CHECKPOINT_ACTION_ROLLBACK = "rollback";
-    public static final String CHECKPOINT_ACTION_INITSTATE = "initstate";
-
     private static final String TX_STATE_KEY = "__state";
-    private static final int DEFAULT_CHECKPOINT_INTERVAL = 1000; // every sec
-
     private TopologyContext context;
     private SpoutOutputCollector collector;
     private long lastCheckpointTs;
     private int checkpointInterval;
+    private int sleepInterval;
     private boolean recoveryStepInProgress;
     private boolean checkpointStepInProgress;
     private boolean recovering;
     private KeyValueState<String, CheckPointState> checkpointState;
+    private CheckPointState curTxState;
 
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
@@ -84,7 +73,9 @@ public class CheckpointSpout extends BaseRichSpout {
         this.context = context;
         this.collector = collector;
         this.checkpointInterval = checkpointInterval;
+        this.sleepInterval = checkpointInterval / 10;
         this.checkpointState = checkpointState;
+        this.curTxState = checkpointState.get(TX_STATE_KEY);
         lastCheckpointTs = 0;
         recoveryStepInProgress = false;
         checkpointStepInProgress = false;
@@ -94,28 +85,27 @@ public class CheckpointSpout extends BaseRichSpout {
     @Override
     public void nextTuple() {
         if (shouldRecover()) {
-            LOG.debug("In recovery");
             handleRecovery();
             startProgress();
         } else if (shouldCheckpoint()) {
-            LOG.debug("In checkpoint");
             doCheckpoint();
             startProgress();
+        } else {
+            Utils.sleep(sleepInterval);
         }
     }
 
     @Override
     public void ack(Object msgId) {
-        CheckPointState txState = getTxState();
-        LOG.debug("Got ack with txid {}, current txState {}", msgId, txState);
-        if (txState.txid == ((Number) msgId).longValue()) {
+        LOG.debug("Got ack with txid {}, current txState {}", msgId, curTxState);
+        if (curTxState.getTxid() == ((Number) msgId).longValue()) {
             if (recovering) {
                 handleRecoveryAck();
             } else {
                 handleCheckpointAck();
             }
         } else {
-            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, txState.txid);
+            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, curTxState.getTxid());
         }
         resetProgress();
     }
@@ -131,13 +121,6 @@ public class CheckpointSpout extends BaseRichSpout {
         declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
     }
 
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.put(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 100);
-        return conf;
-    }
-
     public static boolean isCheckpoint(Tuple input) {
         return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
     }
@@ -161,12 +144,13 @@ public class CheckpointSpout extends BaseRichSpout {
     }
 
     private int loadCheckpointInterval(Map stormConf) {
-        int interval;
+        int interval = 0;
         if (stormConf.containsKey(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)) {
             interval = ((Number) stormConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
-        } else {
-            interval = DEFAULT_CHECKPOINT_INTERVAL;
         }
+        // ensure checkpoint interval is not less than a sane low value.
+        interval = Math.max(100, interval);
+        LOG.info("Checkpoint interval is {} millis", interval);
         return interval;
     }
 
@@ -175,91 +159,55 @@ public class CheckpointSpout extends BaseRichSpout {
     }
 
     private boolean shouldCheckpoint() {
-        return !recovering && !checkpointStepInProgress
-                && (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
-    }
-
-    private boolean shouldRollback(CheckPointState txState) {
-        return txState.state == PREPARING;
-    }
-
-    private boolean shouldCommit(CheckPointState txState) {
-        return txState.state == COMMITTING;
+        return !recovering && !checkpointStepInProgress &&
+                (curTxState.getState() != COMMITTED || checkpointIntervalElapsed());
     }
 
-    private boolean shouldInitState(CheckPointState txState) {
-        return txState.state == COMMITTED;
+    private boolean checkpointIntervalElapsed() {
+        return (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
     }
 
     private void handleRecovery() {
-        CheckPointState txState = getTxState();
-        LOG.debug("Current txState is {}", txState);
-        if (shouldRollback(txState)) {
-            LOG.debug("Emitting rollback with txid {}", txState.txid);
-            collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_ROLLBACK), txState.txid);
-        } else if (shouldCommit(txState)) {
-            LOG.debug("Emitting commit with txid {}", txState.txid);
-            collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_COMMIT), txState.txid);
-        } else if (shouldInitState(txState)) {
-            LOG.debug("Emitting init state with txid {}", txState.txid);
-            collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_INITSTATE), txState.txid);
-        }
-        startProgress();
+        LOG.debug("In recovery");
+        Action action = curTxState.nextAction(true);
+        emit(curTxState.getTxid(), action);
     }
 
     private void handleRecoveryAck() {
-        CheckPointState txState = getTxState();
-        if (shouldRollback(txState)) {
-            txState.state = COMMITTED;
-            --txState.txid;
-            saveTxState(txState);
-        } else if (shouldCommit(txState)) {
-            txState.state = COMMITTED;
-            saveTxState(txState);
-        } else if (shouldInitState(txState)) {
-            LOG.debug("Recovery complete, current state {}", txState);
+        CheckPointState nextState = curTxState.nextState(true);
+        if (curTxState != nextState) {
+            saveTxState(nextState);
+        } else {
+            LOG.debug("Recovery complete, current state {}", curTxState);
             recovering = false;
         }
     }
 
     private void doCheckpoint() {
-        CheckPointState txState = getTxState();
-        if (txState.state == COMMITTED) {
-            txState.txid++;
-            txState.state = PREPARING;
-            saveTxState(txState);
+        LOG.debug("In checkpoint");
+        if (curTxState.getState() == COMMITTED) {
+            saveTxState(curTxState.nextState(false));
             lastCheckpointTs = System.currentTimeMillis();
-            LOG.debug("Emitting prepare with txid {}", txState.txid);
-            collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_PREPARE), txState.txid);
-        } else if (txState.state == PREPARING) {
-            LOG.debug("Emitting prepare with txid {}", txState.txid);
-            collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_PREPARE), txState.txid);
-        } else if (txState.state == COMMITTING) {
-            LOG.debug("Emitting commit with txid {}", txState.txid);
-            collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_COMMIT), txState.txid);
         }
-        startProgress();
+        Action action = curTxState.nextAction(false);
+        emit(curTxState.getTxid(), action);
     }
 
     private void handleCheckpointAck() {
-        CheckPointState txState = getTxState();
-        if (txState.state == PREPARING) {
-            txState.state = COMMITTING;
-            LOG.debug("Prepare txid {} complete", txState.txid);
-        } else if (txState.state == COMMITTING) {
-            txState.state = COMMITTED;
-            LOG.debug("Commit txid {} complete", txState.txid);
-        }
-        saveTxState(txState);
+        CheckPointState nextState = curTxState.nextState(false);
+        saveTxState(nextState);
+    }
+
+    private void emit(long txid, Action action) {
+        LOG.debug("Current state {}, emitting txid {}, action {}", curTxState, txid, action);
+        collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
     }
 
     private void saveTxState(CheckPointState txState) {
+        LOG.debug("saveTxState, current state {} -> new state {}", curTxState, txState);
         checkpointState.put(TX_STATE_KEY, txState);
         checkpointState.commit();
-    }
-
-    private CheckPointState getTxState() {
-        return checkpointState.get(TX_STATE_KEY);
+        curTxState = txState;
     }
 
     private void startProgress() {

http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java
index 2d25e72..6a0a055 100644
--- a/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java
@@ -30,7 +30,11 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 
-import static backtype.storm.spout.CheckpointSpout.*;
+import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
+import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID;
+import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION;
+import static backtype.storm.spout.CheckPointState.Action;
+import static backtype.storm.spout.CheckPointState.Action.ROLLBACK;
 
 /**
  * Wraps {@link IRichBolt} and forwards checkpoint tuples in a
@@ -94,7 +98,7 @@ public class CheckpointTupleForwarder implements IRichBolt {
      * @param action the action (prepare, commit, rollback or initstate)
      * @param txid   the transaction id.
      */
-    protected void handleCheckpoint(Tuple input, String action, long txid) {
+    protected void handleCheckpoint(Tuple input, Action action, long txid) {
         collector.emit(CHECKPOINT_STREAM_ID, input, new Values(txid, action));
     }
 
@@ -117,14 +121,14 @@ public class CheckpointTupleForwarder implements IRichBolt {
      * all input checkpoint streams to this component.
      */
     private void processCheckpoint(Tuple input) {
-        String action = input.getStringByField(CHECKPOINT_FIELD_ACTION);
+        Action action = (Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
         long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
         if (shouldProcessTransaction(action, txid)) {
             LOG.debug("Processing action {}, txid {}", action, txid);
             try {
                 if (txid >= lastTxid) {
                     handleCheckpoint(input, action, txid);
-                    if (CHECKPOINT_ACTION_ROLLBACK.equals(action)) {
+                    if (action == ROLLBACK) {
                         lastTxid = txid - 1;
                     } else {
                         lastTxid = txid;
@@ -162,7 +166,7 @@ public class CheckpointTupleForwarder implements IRichBolt {
      * Checks if check points have been received from all tasks across
      * all input streams to this component
      */
-    private boolean shouldProcessTransaction(String action, long txid) {
+    private boolean shouldProcessTransaction(Action action, long txid) {
         TransactionRequest request = new TransactionRequest(action, txid);
         Integer count;
         if ((count = transactionRequestCount.get(request)) == null) {
@@ -179,10 +183,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
     }
 
     private static class TransactionRequest {
-        private final String action;
+        private final Action action;
         private final long txid;
 
-        TransactionRequest(String action, long txid) {
+        TransactionRequest(Action action, long txid) {
             this.action = action;
             this.txid = txid;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
index a7c5b2e..d44ad48 100644
--- a/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
@@ -31,10 +31,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_ACTION_COMMIT;
-import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_ACTION_PREPARE;
-import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_ACTION_ROLLBACK;
-import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_ACTION_INITSTATE;
+import static backtype.storm.spout.CheckPointState.Action;
+import static backtype.storm.spout.CheckPointState.Action.COMMIT;
+import static backtype.storm.spout.CheckPointState.Action.PREPARE;
+import static backtype.storm.spout.CheckPointState.Action.ROLLBACK;
+import static backtype.storm.spout.CheckPointState.Action.INITSTATE;
 
 /**
  * Wraps a {@link IStatefulBolt} and manages the state of the bolt.
@@ -65,18 +66,18 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     }
 
     @Override
-    protected void handleCheckpoint(Tuple input, String action, long txid) {
+    protected void handleCheckpoint(Tuple input, Action action, long txid) {
         LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", input, action, txid);
-        if (action.equals(CHECKPOINT_ACTION_PREPARE)) {
+        if (action == PREPARE) {
             bolt.prePrepare(txid);
             state.prepareCommit(txid);
-        } else if (action.equals(CHECKPOINT_ACTION_COMMIT)) {
+        } else if (action == COMMIT) {
             bolt.preCommit(txid);
             state.commit(txid);
-        } else if (action.equals(CHECKPOINT_ACTION_ROLLBACK)) {
+        } else if (action == ROLLBACK) {
             bolt.preRollback();
             state.rollback();
-        } else if (action.equals(CHECKPOINT_ACTION_INITSTATE)) {
+        } else if (action == INITSTATE) {
             bolt.initState((T) state);
             boltInitialized = true;
             LOG.debug("{} pending tuples to process", pendingTuples.size());

http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 1bd4a54..c18be2f 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1548,7 +1548,9 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * The configuration specific to the {@link backtype.storm.state.StateProvider} implementation.
-     * This can be overridden at the component level.
+     * This can be overridden at the component level. The value and the interpretation of this config
+     * is based on the state provider implementation. For e.g. this could be just a config file name
+     * which contains the config for the state provider implementation.
      */
     @isString
     public static final String TOPOLOGY_STATE_PROVIDER_CONFIG = "topology.state.provider.config";

http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java b/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java
index 15bba91..f341f70 100644
--- a/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java
+++ b/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java
@@ -26,14 +26,13 @@ import backtype.storm.utils.Utils;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
-import org.mockito.internal.matchers.Equals;
 
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static backtype.storm.spout.CheckPointState.Action;
 
 /**
  * Unit test for {@link CheckpointSpout}
@@ -56,7 +55,7 @@ public class CheckpointSpoutTest {
         spout.open(new HashMap(), mockTopologyContext, mockOutputCollector);
 
         spout.nextTuple();
-        Values expectedTuple = new Values(-1L, CheckpointSpout.CHECKPOINT_ACTION_INITSTATE);
+        Values expectedTuple = new Values(-1L, Action.INITSTATE);
         ArgumentCaptor<String> stream = ArgumentCaptor.forClass(String.class);
         ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
         ArgumentCaptor<Object> msgId = ArgumentCaptor.forClass(Object.class);
@@ -74,7 +73,7 @@ public class CheckpointSpoutTest {
                                                  values.capture(),
                                                  msgId.capture());
 
-        expectedTuple = new Values(-1L, CheckpointSpout.CHECKPOINT_ACTION_INITSTATE);
+        expectedTuple = new Values(-1L, Action.INITSTATE);
         assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
         assertEquals(expectedTuple, values.getValue());
         assertEquals(-1L, msgId.getValue());
@@ -95,7 +94,7 @@ public class CheckpointSpoutTest {
                                                  values.capture(),
                                                  msgId.capture());
 
-        Values expectedTuple = new Values(0L, CheckpointSpout.CHECKPOINT_ACTION_PREPARE);
+        Values expectedTuple = new Values(0L, Action.PREPARE);
         assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
         assertEquals(expectedTuple, values.getValue());
         assertEquals(0L, msgId.getValue());
@@ -124,7 +123,7 @@ public class CheckpointSpoutTest {
                                                                    values.capture(),
                                                                    msgId.capture());
 
-        Values expectedTuple = new Values(0L, CheckpointSpout.CHECKPOINT_ACTION_PREPARE);
+        Values expectedTuple = new Values(0L, Action.PREPARE);
         assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
         assertEquals(expectedTuple, values.getValue());
         assertEquals(0L, msgId.getValue());
@@ -153,7 +152,7 @@ public class CheckpointSpoutTest {
                                                                    values.capture(),
                                                                    msgId.capture());
 
-        Values expectedTuple = new Values(0L, CheckpointSpout.CHECKPOINT_ACTION_COMMIT);
+        Values expectedTuple = new Values(0L, Action.COMMIT);
         assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
         assertEquals(expectedTuple, values.getValue());
         assertEquals(0L, msgId.getValue());
@@ -179,7 +178,7 @@ public class CheckpointSpoutTest {
                                                                    values.capture(),
                                                                    msgId.capture());
 
-        Values expectedTuple = new Values(100L, CheckpointSpout.CHECKPOINT_ACTION_ROLLBACK);
+        Values expectedTuple = new Values(100L, Action.ROLLBACK);
         assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
         assertEquals(expectedTuple, values.getValue());
         assertEquals(100L, msgId.getValue());
@@ -209,7 +208,7 @@ public class CheckpointSpoutTest {
                                                                    values.capture(),
                                                                    msgId.capture());
 
-        Values expectedTuple = new Values(100L, CheckpointSpout.CHECKPOINT_ACTION_PREPARE);
+        Values expectedTuple = new Values(100L, Action.PREPARE);
         assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
         assertEquals(expectedTuple, values.getValue());
         assertEquals(100L, msgId.getValue());
@@ -235,7 +234,7 @@ public class CheckpointSpoutTest {
                                                                    values.capture(),
                                                                    msgId.capture());
 
-        Values expectedTuple = new Values(100L, CheckpointSpout.CHECKPOINT_ACTION_COMMIT);
+        Values expectedTuple = new Values(100L, Action.COMMIT);
         assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
         assertEquals(expectedTuple, values.getValue());
         assertEquals(100L, msgId.getValue());

http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
index 570258a..2959a10 100644
--- a/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
+++ b/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
@@ -36,6 +36,9 @@ import java.util.Map;
 import java.util.Set;
 
 import static backtype.storm.spout.CheckpointSpout.*;
+import static backtype.storm.spout.CheckPointState.Action.INITSTATE;
+import static backtype.storm.spout.CheckPointState.Action.ROLLBACK;
+import static backtype.storm.spout.CheckPointState.Action.COMMIT;
 
 /**
  * Unit tests for {@link StatefulBoltExecutor}
@@ -86,7 +89,7 @@ public class StatefulBoltExecutorTest {
         Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
         executor.execute(mockTuple);
         Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
-        Mockito.when(mockCheckpointTuple.getStringByField(CHECKPOINT_FIELD_ACTION)).thenReturn(CHECKPOINT_ACTION_INITSTATE);
+        Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(INITSTATE);
         Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
         Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
         executor.execute(mockCheckpointTuple);
@@ -99,7 +102,7 @@ public class StatefulBoltExecutorTest {
         Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
         executor.execute(mockTuple);
         Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
-        Mockito.when(mockCheckpointTuple.getStringByField(CHECKPOINT_FIELD_ACTION)).thenReturn(CHECKPOINT_ACTION_ROLLBACK);
+        Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(ROLLBACK);
         Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
         Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
         executor.execute(mockCheckpointTuple);
@@ -111,7 +114,7 @@ public class StatefulBoltExecutorTest {
         Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
         executor.execute(mockTuple);
         Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
-        Mockito.when(mockCheckpointTuple.getStringByField(CHECKPOINT_FIELD_ACTION)).thenReturn(CHECKPOINT_ACTION_COMMIT);
+        Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
         Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
         Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
         executor.execute(mockCheckpointTuple);