You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/12 21:01:39 UTC

[06/14] storm git commit: Refactoring for accommodating windowing with state

Refactoring for accommodating windowing with state


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/40a1c7e0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/40a1c7e0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/40a1c7e0

Branch: refs/heads/1.x-branch
Commit: 40a1c7e05962de6e767fb3b37db4eb759fdc9b2d
Parents: 3b59ec5
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Tue Dec 22 12:28:30 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Tue Jan 12 10:21:00 2016 +0530

----------------------------------------------------------------------
 .../storm/redis/state/RedisKeyValueState.java      | 17 +++++++----------
 .../storm/state/InMemoryKeyValueState.java         | 10 +++++-----
 .../storm/state/InMemoryKeyValueStateProvider.java | 12 +++++++++++-
 .../storm/topology/IStatefulComponent.java         | 14 ++++++++++----
 .../storm/topology/StatefulBoltExecutor.java       |  5 +++--
 .../storm/topology/base/BaseStatefulBolt.java      |  9 +++++++--
 .../storm/topology/StatefulBoltExecutorTest.java   |  3 +--
 7 files changed, 44 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index db54989..57b30cc 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * A redis based implementation that persists the state in Redis.
@@ -76,7 +77,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
         this.keySerializer = keySerializer;
         this.valueSerializer = valueSerializer;
         this.jedisContainer = jedisContainer;
-        this.pendingPrepare = new HashMap<>();
+        this.pendingPrepare = new ConcurrentHashMap<>();
         initTxids();
         initPendingCommit();
     }
@@ -101,10 +102,10 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
             commands = jedisContainer.getInstance();
             if (commands.exists(prepareNamespace)) {
                 LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
-                pendingCommit = commands.hgetAll(prepareNamespace);
+                pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace));
             } else {
                 LOG.debug("No previously prepared commits.");
-                pendingCommit = new HashMap<>();
+                pendingCommit = Collections.emptyMap();
             }
         } finally {
             jedisContainer.returnInstance(commands);
@@ -169,7 +170,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
             txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
             commands.hmset(txidNamespace, txIds);
             pendingCommit = Collections.unmodifiableMap(pendingPrepare);
-            pendingPrepare = new HashMap<>();
+            pendingPrepare = new ConcurrentHashMap<>();
         } finally {
             jedisContainer.returnInstance(commands);
         }
@@ -206,7 +207,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
             } else {
                 LOG.debug("Nothing to save for commit");
             }
-            pendingPrepare = new HashMap<>();
+            pendingPrepare = new ConcurrentHashMap<>();
         } finally {
             jedisContainer.returnInstance(commands);
         }
@@ -231,7 +232,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
             }
             commands.hmset(txidNamespace, txIds);
             pendingCommit = Collections.emptyMap();
-            pendingPrepare = new HashMap<>();
+            pendingPrepare = new ConcurrentHashMap<>();
         } finally {
             jedisContainer.returnInstance(commands);
         }
@@ -248,10 +249,6 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
                 throw new RuntimeException("Invalid txid '" + txid + "' for prepare. Txid '" + committedTxid +
                                                    "' is already committed");
             }
-            if (txid > committedTxid + 1) {
-                throw new RuntimeException("Cannot prepare a txn with id '" + txid +
-                                                   "' when last committed txid is '" + committedTxid + "'");
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java b/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java
index a516d34..394cf5d 100644
--- a/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java
+++ b/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueState.java
@@ -20,8 +20,8 @@ package backtype.storm.state;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * An in-memory implementation of the {@link State}
@@ -31,7 +31,7 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
     private static final long DEFAULT_TXID = -1;
     private TxIdState<K, V> commitedState;
     private TxIdState<K, V> preparedState;
-    private Map<K, V> state = new HashMap<>();
+    private Map<K, V> state = new ConcurrentHashMap<>();
 
     private static class TxIdState<K, V> {
         private long txid;
@@ -69,7 +69,7 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
 
     @Override
     public void commit() {
-        commitedState = new TxIdState<>(DEFAULT_TXID, new HashMap<K, V>(state));
+        commitedState = new TxIdState<>(DEFAULT_TXID, new ConcurrentHashMap<>(state));
     }
 
     @Override
@@ -78,7 +78,7 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
         if (preparedState != null && txid > preparedState.txid) {
             throw new RuntimeException("Cannot prepare a new txn while there is a pending txn");
         }
-        preparedState = new TxIdState<>(txid, new HashMap<K, V>(state));
+        preparedState = new TxIdState<>(txid, new ConcurrentHashMap<K, V>(state));
     }
 
     @Override
@@ -99,7 +99,7 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
         if (commitedState != null) {
             state = commitedState.state;
         } else {
-            state = new HashMap<>();
+            state = new ConcurrentHashMap<>();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java b/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java
index 1a79e72..aaedbb6 100644
--- a/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java
+++ b/storm-core/src/jvm/backtype/storm/state/InMemoryKeyValueStateProvider.java
@@ -20,14 +20,24 @@ package backtype.storm.state;
 import backtype.storm.task.TopologyContext;
 
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Provides {@link InMemoryKeyValueState}
  */
 public class InMemoryKeyValueStateProvider implements StateProvider {
+    private final ConcurrentHashMap<String, State> states = new ConcurrentHashMap<>();
 
     @Override
     public State newState(String namespace, Map stormConf, TopologyContext context) {
-        return new InMemoryKeyValueState();
+        State state = states.get(namespace);
+        if (state == null) {
+            State newState = new InMemoryKeyValueState<>();
+            state = states.putIfAbsent(namespace, newState);
+            if (state == null) {
+                state = newState;
+            }
+        }
+        return state;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java b/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java
index 887c91a..ea88d90 100644
--- a/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java
+++ b/storm-core/src/jvm/backtype/storm/topology/IStatefulComponent.java
@@ -40,11 +40,17 @@ public interface IStatefulComponent<T extends State> extends IComponent {
      * This is a hook for the component to perform some actions just before the
      * framework commits its state.
      */
-    void preCommit();
+    void preCommit(long txid);
 
     /**
-     * This is a hook for the component to perform some actions just after the
-     * framework commits its state.
+     * This is a hook for the component to perform some actions just before the
+     * framework prepares its state.
+     */
+    void prePrepare(long txid);
+
+    /**
+     * This is a hook for the component to perform some actions just before the
+     * framework rolls back the prepared state.
      */
-    void postCommit();
+    void preRollback();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java b/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
index bdb45e3..a7c5b2e 100644
--- a/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/topology/StatefulBoltExecutor.java
@@ -68,12 +68,13 @@ public class StatefulBoltExecutor<T extends State> extends CheckpointTupleForwar
     protected void handleCheckpoint(Tuple input, String action, long txid) {
         LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", input, action, txid);
         if (action.equals(CHECKPOINT_ACTION_PREPARE)) {
+            bolt.prePrepare(txid);
             state.prepareCommit(txid);
         } else if (action.equals(CHECKPOINT_ACTION_COMMIT)) {
-            bolt.preCommit();
+            bolt.preCommit(txid);
             state.commit(txid);
-            bolt.postCommit();
         } else if (action.equals(CHECKPOINT_ACTION_ROLLBACK)) {
+            bolt.preRollback();
             state.rollback();
         } else if (action.equals(CHECKPOINT_ACTION_INITSTATE)) {
             bolt.initState((T) state);

http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java b/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java
index 93bcd57..80734ab 100644
--- a/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java
+++ b/storm-core/src/jvm/backtype/storm/topology/base/BaseStatefulBolt.java
@@ -48,12 +48,17 @@ public abstract class BaseStatefulBolt<T extends State> implements IStatefulBolt
     }
 
     @Override
-    public void preCommit() {
+    public void preCommit(long txid) {
         // NOOP
     }
 
     @Override
-    public void postCommit() {
+    public void prePrepare(long txid) {
+        // NOOP
+    }
+
+    @Override
+    public void preRollback() {
         // NOOP
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/40a1c7e0/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java b/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
index 72d2ac7..570258a 100644
--- a/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
+++ b/storm-core/test/jvm/backtype/storm/topology/StatefulBoltExecutorTest.java
@@ -115,8 +115,7 @@ public class StatefulBoltExecutorTest {
         Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
         Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
         executor.execute(mockCheckpointTuple);
-        Mockito.verify(mockBolt, Mockito.times(1)).preCommit();
+        Mockito.verify(mockBolt, Mockito.times(1)).preCommit(new Long(0));
         Mockito.verify(mockState, Mockito.times(1)).commit(new Long(0));
-        Mockito.verify(mockBolt, Mockito.times(1)).postCommit();
     }
 }
\ No newline at end of file