You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/12 21:01:39 UTC
[06/14] storm git commit: Refactoring for accommodating windowing with
state
Refactoring for accommodating windowing with state
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/40a1c7e0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/40a1c7e0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/40a1c7e0
Branch: refs/heads/1.x-branch
Commit: 40a1c7e05962de6e767fb3b37db4eb759fdc9b2d
Parents: 3b59ec5
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Dec 22 12:28:30 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Jan 12 10:21:00 2016 +0530
----------------------------------------------------------------------
.../storm/redis/state/RedisKeyValueState.java | 17 +++++++----------
.../storm/state/InMemoryKeyValueState.java | 10 +++++-----
.../storm/state/InMemoryKeyValueStateProvider.java | 12 +++++++++++-
.../storm/topology/IStatefulComponent.java | 14 ++++++++++----
.../storm/topology/StatefulBoltExecutor.java | 5 +++--
.../storm/topology/base/BaseStatefulBolt.java | 9 +++++++--
.../storm/topology/StatefulBoltExecutorTest.java | 3 +--
7 files changed, 44 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index db54989..57b30cc 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* A redis based implementation that persists the state in Redis.
@@ -76,7 +77,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.jedisContainer = jedisContainer;
- this.pendingPrepare = new HashMap<>();
+ this.pendingPrepare = new ConcurrentHashMap<>();
initTxids();
initPendingCommit();
}
@@ -101,10 +102,10 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
commands = jedisContainer.getInstance();
if (commands.exists(prepareNamespace)) {
LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
- pendingCommit = commands.hgetAll(prepareNamespace);
+ pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace));
} else {
LOG.debug("No previously prepared commits.");
- pendingCommit = new HashMap<>();
+ pendingCommit = Collections.emptyMap();
}
} finally {
jedisContainer.returnInstance(commands);
@@ -169,7 +170,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
commands.hmset(txidNamespace, txIds);
pendingCommit = Collections.unmodifiableMap(pendingPrepare);
- pendingPrepare = new HashMap<>();
+ pendingPrepare = new ConcurrentHashMap<>();
} finally {
jedisContainer.returnInstance(commands);
}
@@ -206,7 +207,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
} else {
LOG.debug("Nothing to save for commit");
}
- pendingPrepare = new HashMap<>();
+ pendingPrepare = new ConcurrentHashMap<>();
} finally {
jedisContainer.returnInstance(commands);
}
@@ -231,7 +232,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
}
commands.hmset(txidNamespace, txIds);
pendingCommit = Collections.emptyMap();
- pendingPrepare = new HashMap<>();
+ pendingPrepare = new ConcurrentHashMap<>();
} finally {
jedisContainer.returnInstance(commands);
}
@@ -248,10 +249,6 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
throw new RuntimeException("Invalid txid '" + txid + "' for prepare. Txid '" + committedTxid +
"' is already committed");
}
- if (txid > committedTxid + 1) {
- throw new RuntimeException("Cannot prepare a txn with id '" + txid +
- "' when last committed txid is '" + committedTxid + "'");
- }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java b/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java
index a516d34..394cf5d 100644
--- a/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java
+++ b/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java
@@ -20,8 +20,8 @@ package backtype.storm.state;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* An in-memory implementation of the {@link State}
@@ -31,7 +31,7 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
private static final long DEFAULT_TXID = -1;
private TxIdState<K, V> commitedState;
private TxIdState<K, V> preparedState;
- private Map<K, V> state = new HashMap<>();
+ private Map<K, V> state = new ConcurrentHashMap<>();
private static class TxIdState<K, V> {
private long txid;
@@ -69,7 +69,7 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
@Override
public void commit() {
- commitedState = new TxIdState<>(DEFAULT_TXID, new HashMap<K, V>(state));
+ commitedState = new TxIdState<>(DEFAULT_TXID, new ConcurrentHashMap<>(state));
}
@Override
@@ -78,7 +78,7 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
if (preparedState != null && txid > preparedState.txid) {
throw new RuntimeException("Cannot prepare a new txn while there is a pending txn");
}
- preparedState = new TxIdState<>(txid, new HashMap<K, V>(state));
+ preparedState = new TxIdState<>(txid, new ConcurrentHashMap<K, V>(state));
}
@Override
@@ -99,7 +99,7 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
if (commitedState != null) {
state = commitedState.state;
} else {
- state = new HashMap<>();
+ state = new ConcurrentHashMap<>();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java b/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java
index 1a79e72..aaedbb6 100644
--- a/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java
+++ b/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java
@@ -20,14 +20,24 @@ package backtype.storm.state;
import backtype.storm.task.TopologyContext;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Provides {@link InMemoryKeyValueState}
*/
public class InMemoryKeyValueStateProvider implements StateProvider {
+ private final ConcurrentHashMap<String, State> states = new ConcurrentHashMap<>();
@Override
public State newState(String namespace, Map stormConf, TopologyContext context) {
- return new InMemoryKeyValueState();
+ State state = states.get(namespace);
+ if (state == null) {
+ State newState = new InMemoryKeyValueState<>();
+ state = states.putIfAbsent(namespace, newState);
+ if (state == null) {
+ state = newState;
+ }
+ }
+ return state;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java b/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java
index 887c91a..ea88d90 100644
--- a/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java
+++ b/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java
@@ -40,11 +40,17 @@ public interface IStatefulComponent<T extends State> extends IComponent {
* This is a hook for the component to perform some actions just before the
* framework commits its state.
*/
- void preCommit();
+ void preCommit(long txid);
/**
- * This is a hook for the component to perform some actions just after the
- * framework commits its state.
+ * This is a hook for the component to perform some actions just before the
+ * framework prepares its state.
+ */
+ void prePrepare(long txid);
+
+ /**
+ * This is a hook for the component to perform some actions just before the
+ * framework rolls back the prepared state.
*/
- void postCommit();
+ void preRollback();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
index bdb45e3..a7c5b2e 100644
--- a/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
@@ -68,12 +68,13 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
protected void handleCheckpoint(Tuple input, String action, long txid) {
LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", input, action, txid);
if (action.equals(CHECKPOINT_ACTION_PREPARE)) {
+ bolt.prePrepare(txid);
state.prepareCommit(txid);
} else if (action.equals(CHECKPOINT_ACTION_COMMIT)) {
- bolt.preCommit();
+ bolt.preCommit(txid);
state.commit(txid);
- bolt.postCommit();
} else if (action.equals(CHECKPOINT_ACTION_ROLLBACK)) {
+ bolt.preRollback();
state.rollback();
} else if (action.equals(CHECKPOINT_ACTION_INITSTATE)) {
bolt.initState((T) state);
http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java
index 93bcd57..80734ab 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java
@@ -48,12 +48,17 @@ public abstract class BaseStatefulBolt<T extends State> implements IStatefulBolt
}
@Override
- public void preCommit() {
+ public void preCommit(long txid) {
// NOOP
}
@Override
- public void postCommit() {
+ public void prePrepare(long txid) {
+ // NOOP
+ }
+
+ @Override
+ public void preRollback() {
// NOOP
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
index 72d2ac7..570258a 100644
--- a/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
+++ b/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
@@ -115,8 +115,7 @@ public class StatefulBoltExecutorTest {
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
executor.execute(mockCheckpointTuple);
- Mockito.verify(mockBolt, Mockito.times(1)).preCommit();
+ Mockito.verify(mockBolt, Mockito.times(1)).preCommit(new Long(0));
Mockito.verify(mockState, Mockito.times(1)).commit(new Long(0));
- Mockito.verify(mockBolt, Mockito.times(1)).postCommit();
}
}
\ No newline at end of file