You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/12 21:01:40 UTC
[07/14] storm git commit: Refactored state handling code and
addressed review comments
Refactored state handling code and addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d9a0698f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d9a0698f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d9a0698f
Branch: refs/heads/1.x-branch
Commit: d9a0698f079cd77fc7da40559eeffd7f9fc5b9a5
Parents: 40a1c7e
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Wed Dec 23 18:18:13 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Jan 12 10:21:00 2016 +0530
----------------------------------------------------------------------
conf/defaults.yaml | 1 +
.../storm/redis/state/RedisKeyValueState.java | 4 +-
.../backtype/storm/spout/CheckPointState.java | 106 +++++++++++++-
.../backtype/storm/spout/CheckpointSpout.java | 140 ++++++-------------
.../topology/CheckpointTupleForwarder.java | 18 ++-
.../storm/topology/StatefulBoltExecutor.java | 19 +--
storm-core/src/jvm/org/apache/storm/Config.java | 4 +-
.../storm/spout/CheckpointSpoutTest.java | 21 ++-
.../topology/StatefulBoltExecutorTest.java | 9 +-
9 files changed, 187 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 735a83e..c15f0b3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -253,6 +253,7 @@ topology.disruptor.wait.timeout.millis: 1000
topology.disruptor.batch.size: 100
topology.disruptor.batch.timeout.millis: 1
topology.disable.loadaware: false
+topology.state.checkpoint.interval.ms: 1000
# Configs for Resource Aware Scheduler
# topology priority describing the importance of the topology, in decreasing order of importance starting from 0 (i.e. 0 is the highest priority and the importance decreases as the priority number increases).
http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index 57b30cc..29d33b7 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -40,8 +40,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(RedisKeyValueState.class);
- private static final String COMMIT_TXID_KEY = "$commit";
- private static final String PREPARE_TXID_KEY = "$prepare";
+ private static final String COMMIT_TXID_KEY = "commit";
+ private static final String PREPARE_TXID_KEY = "prepare";
private final BASE64Encoder base64Encoder;
private final BASE64Decoder base64Decoder;
http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java b/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java
index 5ad9772..ff0e088 100644
--- a/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java
+++ b/storm-core/src/jvm/backtype/storm/spout/CheckPointState.java
@@ -17,13 +17,30 @@
*/
package backtype.storm.spout;
+import static backtype.storm.spout.CheckPointState.State.COMMITTED;
+import static backtype.storm.spout.CheckPointState.State.COMMITTING;
+import static backtype.storm.spout.CheckPointState.State.PREPARING;
+
/**
- * Captures the current state of the transaction in
- * {@link CheckpointSpout}
+ * Captures the current state of the transaction in {@link CheckpointSpout}. The state transitions are as follows.
+ * <pre>
+ * ROLLBACK(tx2)
+ * <------------- PREPARE(tx2) COMMIT(tx2)
+ * COMMITTED(tx1)-------------> PREPARING(tx2) --------------> COMMITTING(tx2) -----------------> COMMITTED (tx2)
+ *
+ * </pre>
+ *
+ * During recovery, if a previous transaction is in PREPARING state, it is rolled back since all bolts in the topology
+ * might not have prepared (saved) the data for commit. If the previous transaction is in COMMITTING state, it is
+ * rolled forward (committed) since some bolts might have already committed the data.
+ * <p>
+ * During normal flow, the state transitions from PREPARING to COMMITTING to COMMITTED. In case of failures the
+ * prepare/commit operation is retried.
+ * </p>
*/
public class CheckPointState {
- public long txid;
- public State state;
+ private long txid;
+ private State state;
public enum State {
/**
@@ -31,15 +48,36 @@ public class CheckPointState {
*/
COMMITTED,
/**
- * The checkpoint spout has started committing the transaction.
+ * The checkpoint spout has started committing the transaction
+ * and the commit is in progress.
*/
COMMITTING,
/**
- * The checkpoint spout has started preparing the transaction for commit.
+ * The checkpoint spout has started preparing the transaction for commit
+ * and the prepare is in progress.
*/
PREPARING
}
+ public enum Action {
+ /**
+ * prepare transaction for commit
+ */
+ PREPARE,
+ /**
+ * commit the previously prepared transaction
+ */
+ COMMIT,
+ /**
+ * rollback the previously prepared transaction
+ */
+ ROLLBACK,
+ /**
+ * initialize the state
+ */
+ INITSTATE
+ }
+
public CheckPointState(long txid, State state) {
this.txid = txid;
this.state = state;
@@ -49,6 +87,62 @@ public class CheckPointState {
public CheckPointState() {
}
+ public long getTxid() {
+ return txid;
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ /**
+ * Get the next state based on this checkpoint state.
+ *
+ * @param recovering if in recovering phase
+ * @return the next checkpoint state based on this state.
+ */
+ public CheckPointState nextState(boolean recovering) {
+ CheckPointState nextState;
+ switch (state) {
+ case PREPARING:
+ nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING);
+ break;
+ case COMMITTING:
+ nextState = new CheckPointState(txid, COMMITTED);
+ break;
+ case COMMITTED:
+ nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING);
+ break;
+ default:
+ throw new IllegalStateException("Unknown state " + state);
+ }
+ return nextState;
+ }
+
+ /**
+ * Get the next action to perform based on this checkpoint state.
+ *
+ * @param recovering if in recovering phase
+ * @return the next action to perform based on this state
+ */
+ public Action nextAction(boolean recovering) {
+ Action action;
+ switch (state) {
+ case PREPARING:
+ action = recovering ? Action.ROLLBACK : Action.PREPARE;
+ break;
+ case COMMITTING:
+ action = Action.COMMIT;
+ break;
+ case COMMITTED:
+ action = recovering ? Action.INITSTATE : Action.PREPARE;
+ break;
+ default:
+ throw new IllegalStateException("Unknown state " + state);
+ }
+ return action;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java b/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java
index 929dd12..32ed6e6 100644
--- a/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java
@@ -26,28 +26,22 @@ import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+
import static backtype.storm.spout.CheckPointState.State.COMMITTED;
-import static backtype.storm.spout.CheckPointState.State.COMMITTING;
-import static backtype.storm.spout.CheckPointState.State.PREPARING;
+import static backtype.storm.spout.CheckPointState.Action;
/**
* Emits checkpoint tuples which are used to save the state of the {@link backtype.storm.topology.IStatefulComponent}
* across the topology. If a topology contains Stateful bolts, Checkpoint spouts are automatically added
* to the topology. There is only one Checkpoint task per topology.
- * <p/>
- * Checkpoint spout stores its internal state in a {@link KeyValueState}. The state transitions are as follows.
- * <p/>
- * <pre>
- * ROLLBACK(tx2)
- * <------------- PREPARE(tx2) COMMIT(tx2)
- * COMMITTED(tx1)-------------> PREPARING(tx2) --------------> COMMITTING(tx2) -----------------> COMMITTED (tx2)
- *
+ * Checkpoint spout stores its internal state in a {@link KeyValueState}.
*
- * </pre>
+ * @see CheckPointState
*/
public class CheckpointSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
@@ -56,22 +50,17 @@ public class CheckpointSpout extends BaseRichSpout {
public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
public static final String CHECKPOINT_FIELD_TXID = "txid";
public static final String CHECKPOINT_FIELD_ACTION = "action";
- public static final String CHECKPOINT_ACTION_PREPARE = "prepare";
- public static final String CHECKPOINT_ACTION_COMMIT = "commit";
- public static final String CHECKPOINT_ACTION_ROLLBACK = "rollback";
- public static final String CHECKPOINT_ACTION_INITSTATE = "initstate";
-
private static final String TX_STATE_KEY = "__state";
- private static final int DEFAULT_CHECKPOINT_INTERVAL = 1000; // every sec
-
private TopologyContext context;
private SpoutOutputCollector collector;
private long lastCheckpointTs;
private int checkpointInterval;
+ private int sleepInterval;
private boolean recoveryStepInProgress;
private boolean checkpointStepInProgress;
private boolean recovering;
private KeyValueState<String, CheckPointState> checkpointState;
+ private CheckPointState curTxState;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
@@ -84,7 +73,9 @@ public class CheckpointSpout extends BaseRichSpout {
this.context = context;
this.collector = collector;
this.checkpointInterval = checkpointInterval;
+ this.sleepInterval = checkpointInterval / 10;
this.checkpointState = checkpointState;
+ this.curTxState = checkpointState.get(TX_STATE_KEY);
lastCheckpointTs = 0;
recoveryStepInProgress = false;
checkpointStepInProgress = false;
@@ -94,28 +85,27 @@ public class CheckpointSpout extends BaseRichSpout {
@Override
public void nextTuple() {
if (shouldRecover()) {
- LOG.debug("In recovery");
handleRecovery();
startProgress();
} else if (shouldCheckpoint()) {
- LOG.debug("In checkpoint");
doCheckpoint();
startProgress();
+ } else {
+ Utils.sleep(sleepInterval);
}
}
@Override
public void ack(Object msgId) {
- CheckPointState txState = getTxState();
- LOG.debug("Got ack with txid {}, current txState {}", msgId, txState);
- if (txState.txid == ((Number) msgId).longValue()) {
+ LOG.debug("Got ack with txid {}, current txState {}", msgId, curTxState);
+ if (curTxState.getTxid() == ((Number) msgId).longValue()) {
if (recovering) {
handleRecoveryAck();
} else {
handleCheckpointAck();
}
} else {
- LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, txState.txid);
+ LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, curTxState.getTxid());
}
resetProgress();
}
@@ -131,13 +121,6 @@ public class CheckpointSpout extends BaseRichSpout {
declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
}
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Config conf = new Config();
- conf.put(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 100);
- return conf;
- }
-
public static boolean isCheckpoint(Tuple input) {
return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
}
@@ -161,12 +144,13 @@ public class CheckpointSpout extends BaseRichSpout {
}
private int loadCheckpointInterval(Map stormConf) {
- int interval;
+ int interval = 0;
if (stormConf.containsKey(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)) {
interval = ((Number) stormConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
- } else {
- interval = DEFAULT_CHECKPOINT_INTERVAL;
}
+ // ensure checkpoint interval is not less than a sane low value.
+ interval = Math.max(100, interval);
+ LOG.info("Checkpoint interval is {} millis", interval);
return interval;
}
@@ -175,91 +159,55 @@ public class CheckpointSpout extends BaseRichSpout {
}
private boolean shouldCheckpoint() {
- return !recovering && !checkpointStepInProgress
- && (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
- }
-
- private boolean shouldRollback(CheckPointState txState) {
- return txState.state == PREPARING;
- }
-
- private boolean shouldCommit(CheckPointState txState) {
- return txState.state == COMMITTING;
+ return !recovering && !checkpointStepInProgress &&
+ (curTxState.getState() != COMMITTED || checkpointIntervalElapsed());
}
- private boolean shouldInitState(CheckPointState txState) {
- return txState.state == COMMITTED;
+ private boolean checkpointIntervalElapsed() {
+ return (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
}
private void handleRecovery() {
- CheckPointState txState = getTxState();
- LOG.debug("Current txState is {}", txState);
- if (shouldRollback(txState)) {
- LOG.debug("Emitting rollback with txid {}", txState.txid);
- collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_ROLLBACK), txState.txid);
- } else if (shouldCommit(txState)) {
- LOG.debug("Emitting commit with txid {}", txState.txid);
- collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_COMMIT), txState.txid);
- } else if (shouldInitState(txState)) {
- LOG.debug("Emitting init state with txid {}", txState.txid);
- collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_INITSTATE), txState.txid);
- }
- startProgress();
+ LOG.debug("In recovery");
+ Action action = curTxState.nextAction(true);
+ emit(curTxState.getTxid(), action);
}
private void handleRecoveryAck() {
- CheckPointState txState = getTxState();
- if (shouldRollback(txState)) {
- txState.state = COMMITTED;
- --txState.txid;
- saveTxState(txState);
- } else if (shouldCommit(txState)) {
- txState.state = COMMITTED;
- saveTxState(txState);
- } else if (shouldInitState(txState)) {
- LOG.debug("Recovery complete, current state {}", txState);
+ CheckPointState nextState = curTxState.nextState(true);
+ if (curTxState != nextState) {
+ saveTxState(nextState);
+ } else {
+ LOG.debug("Recovery complete, current state {}", curTxState);
recovering = false;
}
}
private void doCheckpoint() {
- CheckPointState txState = getTxState();
- if (txState.state == COMMITTED) {
- txState.txid++;
- txState.state = PREPARING;
- saveTxState(txState);
+ LOG.debug("In checkpoint");
+ if (curTxState.getState() == COMMITTED) {
+ saveTxState(curTxState.nextState(false));
lastCheckpointTs = System.currentTimeMillis();
- LOG.debug("Emitting prepare with txid {}", txState.txid);
- collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_PREPARE), txState.txid);
- } else if (txState.state == PREPARING) {
- LOG.debug("Emitting prepare with txid {}", txState.txid);
- collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_PREPARE), txState.txid);
- } else if (txState.state == COMMITTING) {
- LOG.debug("Emitting commit with txid {}", txState.txid);
- collector.emit(CHECKPOINT_STREAM_ID, new Values(txState.txid, CHECKPOINT_ACTION_COMMIT), txState.txid);
}
- startProgress();
+ Action action = curTxState.nextAction(false);
+ emit(curTxState.getTxid(), action);
}
private void handleCheckpointAck() {
- CheckPointState txState = getTxState();
- if (txState.state == PREPARING) {
- txState.state = COMMITTING;
- LOG.debug("Prepare txid {} complete", txState.txid);
- } else if (txState.state == COMMITTING) {
- txState.state = COMMITTED;
- LOG.debug("Commit txid {} complete", txState.txid);
- }
- saveTxState(txState);
+ CheckPointState nextState = curTxState.nextState(false);
+ saveTxState(nextState);
+ }
+
+ private void emit(long txid, Action action) {
+ LOG.debug("Current state {}, emitting txid {}, action {}", curTxState, txid, action);
+ collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
}
private void saveTxState(CheckPointState txState) {
+ LOG.debug("saveTxState, current state {} -> new state {}", curTxState, txState);
checkpointState.put(TX_STATE_KEY, txState);
checkpointState.commit();
- }
-
- private CheckPointState getTxState() {
- return checkpointState.get(TX_STATE_KEY);
+ curTxState = txState;
}
private void startProgress() {
http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java b/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java
index 2d25e72..6a0a055 100644
--- a/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/CheckpointTupleForwarder.java
@@ -30,7 +30,11 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
-import static backtype.storm.spout.CheckpointSpout.*;
+import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_STREAM_ID;
+import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID;
+import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION;
+import static backtype.storm.spout.CheckPointState.Action;
+import static backtype.storm.spout.CheckPointState.Action.ROLLBACK;
/**
* Wraps {@link IRichBolt} and forwards checkpoint tuples in a
@@ -94,7 +98,7 @@ public class CheckpointTupleForwarder implements IRichBolt {
* @param action the action (prepare, commit, rollback or initstate)
* @param txid the transaction id.
*/
- protected void handleCheckpoint(Tuple input, String action, long txid) {
+ protected void handleCheckpoint(Tuple input, Action action, long txid) {
collector.emit(CHECKPOINT_STREAM_ID, input, new Values(txid, action));
}
@@ -117,14 +121,14 @@ public class CheckpointTupleForwarder implements IRichBolt {
* all input checkpoint streams to this component.
*/
private void processCheckpoint(Tuple input) {
- String action = input.getStringByField(CHECKPOINT_FIELD_ACTION);
+ Action action = (Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
if (shouldProcessTransaction(action, txid)) {
LOG.debug("Processing action {}, txid {}", action, txid);
try {
if (txid >= lastTxid) {
handleCheckpoint(input, action, txid);
- if (CHECKPOINT_ACTION_ROLLBACK.equals(action)) {
+ if (action == ROLLBACK) {
lastTxid = txid - 1;
} else {
lastTxid = txid;
@@ -162,7 +166,7 @@ public class CheckpointTupleForwarder implements IRichBolt {
* Checks if check points have been received from all tasks across
* all input streams to this component
*/
- private boolean shouldProcessTransaction(String action, long txid) {
+ private boolean shouldProcessTransaction(Action action, long txid) {
TransactionRequest request = new TransactionRequest(action, txid);
Integer count;
if ((count = transactionRequestCount.get(request)) == null) {
@@ -179,10 +183,10 @@ public class CheckpointTupleForwarder implements IRichBolt {
}
private static class TransactionRequest {
- private final String action;
+ private final Action action;
private final long txid;
- TransactionRequest(String action, long txid) {
+ TransactionRequest(Action action, long txid) {
this.action = action;
this.txid = txid;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
index a7c5b2e..d44ad48 100644
--- a/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
@@ -31,10 +31,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_ACTION_COMMIT;
-import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_ACTION_PREPARE;
-import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_ACTION_ROLLBACK;
-import static backtype.storm.spout.CheckpointSpout.CHECKPOINT_ACTION_INITSTATE;
+import static backtype.storm.spout.CheckPointState.Action;
+import static backtype.storm.spout.CheckPointState.Action.COMMIT;
+import static backtype.storm.spout.CheckPointState.Action.PREPARE;
+import static backtype.storm.spout.CheckPointState.Action.ROLLBACK;
+import static backtype.storm.spout.CheckPointState.Action.INITSTATE;
/**
* Wraps a {@link IStatefulBolt} and manages the state of the bolt.
@@ -65,18 +66,18 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
}
@Override
- protected void handleCheckpoint(Tuple input, String action, long txid) {
+ protected void handleCheckpoint(Tuple input, Action action, long txid) {
LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", input, action, txid);
- if (action.equals(CHECKPOINT_ACTION_PREPARE)) {
+ if (action == PREPARE) {
bolt.prePrepare(txid);
state.prepareCommit(txid);
- } else if (action.equals(CHECKPOINT_ACTION_COMMIT)) {
+ } else if (action == COMMIT) {
bolt.preCommit(txid);
state.commit(txid);
- } else if (action.equals(CHECKPOINT_ACTION_ROLLBACK)) {
+ } else if (action == ROLLBACK) {
bolt.preRollback();
state.rollback();
- } else if (action.equals(CHECKPOINT_ACTION_INITSTATE)) {
+ } else if (action == INITSTATE) {
bolt.initState((T) state);
boltInitialized = true;
LOG.debug("{} pending tuples to process", pendingTuples.size());
http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 1bd4a54..c18be2f 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1548,7 +1548,9 @@ public class Config extends HashMap<String, Object> {
/**
* The configuration specific to the {@link backtype.storm.state.StateProvider} implementation.
- * This can be overridden at the component level.
+ * This can be overridden at the component level. The value and the interpretation of this config
+ * depend on the state provider implementation. For example, this could be just the name of a config file
+ * which contains the config for the state provider implementation.
*/
@isString
public static final String TOPOLOGY_STATE_PROVIDER_CONFIG = "topology.state.provider.config";
http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java b/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java
index 15bba91..f341f70 100644
--- a/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java
+++ b/storm-core/test/jvm/backtype/storm/spout/CheckpointSpoutTest.java
@@ -26,14 +26,13 @@ import backtype.storm.utils.Utils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
-import org.mockito.internal.matchers.Equals;
import java.util.HashMap;
import java.util.Map;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static backtype.storm.spout.CheckPointState.Action;
/**
* Unit test for {@link CheckpointSpout}
@@ -56,7 +55,7 @@ public class CheckpointSpoutTest {
spout.open(new HashMap(), mockTopologyContext, mockOutputCollector);
spout.nextTuple();
- Values expectedTuple = new Values(-1L, CheckpointSpout.CHECKPOINT_ACTION_INITSTATE);
+ Values expectedTuple = new Values(-1L, Action.INITSTATE);
ArgumentCaptor<String> stream = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<Object> msgId = ArgumentCaptor.forClass(Object.class);
@@ -74,7 +73,7 @@ public class CheckpointSpoutTest {
values.capture(),
msgId.capture());
- expectedTuple = new Values(-1L, CheckpointSpout.CHECKPOINT_ACTION_INITSTATE);
+ expectedTuple = new Values(-1L, Action.INITSTATE);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(-1L, msgId.getValue());
@@ -95,7 +94,7 @@ public class CheckpointSpoutTest {
values.capture(),
msgId.capture());
- Values expectedTuple = new Values(0L, CheckpointSpout.CHECKPOINT_ACTION_PREPARE);
+ Values expectedTuple = new Values(0L, Action.PREPARE);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(0L, msgId.getValue());
@@ -124,7 +123,7 @@ public class CheckpointSpoutTest {
values.capture(),
msgId.capture());
- Values expectedTuple = new Values(0L, CheckpointSpout.CHECKPOINT_ACTION_PREPARE);
+ Values expectedTuple = new Values(0L, Action.PREPARE);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(0L, msgId.getValue());
@@ -153,7 +152,7 @@ public class CheckpointSpoutTest {
values.capture(),
msgId.capture());
- Values expectedTuple = new Values(0L, CheckpointSpout.CHECKPOINT_ACTION_COMMIT);
+ Values expectedTuple = new Values(0L, Action.COMMIT);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(0L, msgId.getValue());
@@ -179,7 +178,7 @@ public class CheckpointSpoutTest {
values.capture(),
msgId.capture());
- Values expectedTuple = new Values(100L, CheckpointSpout.CHECKPOINT_ACTION_ROLLBACK);
+ Values expectedTuple = new Values(100L, Action.ROLLBACK);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(100L, msgId.getValue());
@@ -209,7 +208,7 @@ public class CheckpointSpoutTest {
values.capture(),
msgId.capture());
- Values expectedTuple = new Values(100L, CheckpointSpout.CHECKPOINT_ACTION_PREPARE);
+ Values expectedTuple = new Values(100L, Action.PREPARE);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(100L, msgId.getValue());
@@ -235,7 +234,7 @@ public class CheckpointSpoutTest {
values.capture(),
msgId.capture());
- Values expectedTuple = new Values(100L, CheckpointSpout.CHECKPOINT_ACTION_COMMIT);
+ Values expectedTuple = new Values(100L, Action.COMMIT);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(100L, msgId.getValue());
http://git-wip-us.apache.org/repos/asf/storm/blob/d9a0698f/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
index 570258a..2959a10 100644
--- a/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
+++ b/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
@@ -36,6 +36,9 @@ import java.util.Map;
import java.util.Set;
import static backtype.storm.spout.CheckpointSpout.*;
+import static backtype.storm.spout.CheckPointState.Action.INITSTATE;
+import static backtype.storm.spout.CheckPointState.Action.ROLLBACK;
+import static backtype.storm.spout.CheckPointState.Action.COMMIT;
/**
* Unit tests for {@link StatefulBoltExecutor}
@@ -86,7 +89,7 @@ public class StatefulBoltExecutorTest {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
executor.execute(mockTuple);
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
- Mockito.when(mockCheckpointTuple.getStringByField(CHECKPOINT_FIELD_ACTION)).thenReturn(CHECKPOINT_ACTION_INITSTATE);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(INITSTATE);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
executor.execute(mockCheckpointTuple);
@@ -99,7 +102,7 @@ public class StatefulBoltExecutorTest {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
executor.execute(mockTuple);
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
- Mockito.when(mockCheckpointTuple.getStringByField(CHECKPOINT_FIELD_ACTION)).thenReturn(CHECKPOINT_ACTION_ROLLBACK);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(ROLLBACK);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
executor.execute(mockCheckpointTuple);
@@ -111,7 +114,7 @@ public class StatefulBoltExecutorTest {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
executor.execute(mockTuple);
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
- Mockito.when(mockCheckpointTuple.getStringByField(CHECKPOINT_FIELD_ACTION)).thenReturn(CHECKPOINT_ACTION_COMMIT);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
executor.execute(mockCheckpointTuple);