You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by na...@apache.org on 2014/03/25 02:54:50 UTC
[1/2] git commit: adds capability to remove keys from memorymapstate
(maintains proper opaque semantics)
Repository: incubator-storm
Updated Branches:
refs/heads/master d5dee0ef5 -> c621a6c1b
adds capability to remove keys from memorymapstate (maintains proper opaque semantics)
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/69594018
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/69594018
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/69594018
Branch: refs/heads/master
Commit: 69594018082333f6dc168a3e5e07ee2eb9a75c8f
Parents: a2be522
Author: nathanmarz <na...@nathanmarz.com>
Authored: Mon Mar 17 14:07:45 2014 -0700
Committer: nathanmarz <na...@nathanmarz.com>
Committed: Mon Mar 17 14:07:45 2014 -0700
----------------------------------------------------------------------
.../jvm/storm/trident/state/map/OpaqueMap.java | 6 +++-
.../trident/state/map/RemovableMapState.java | 8 +++++
.../storm/trident/testing/MemoryMapState.java | 27 +++++++++++++++-
.../test/clj/storm/trident/state_test.clj | 33 +++++++++++++++++++-
4 files changed, 71 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/69594018/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java b/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java
index cd6766d..12f3083 100644
--- a/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java
+++ b/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java
@@ -43,7 +43,11 @@ public class OpaqueMap<T> implements MapState<T> {
for(CachedBatchReadsMap.RetVal<OpaqueValue> retval: curr) {
OpaqueValue val = retval.val;
if(val!=null) {
- ret.add((T) val.get(_currTx));
+ if(retval.cached) {
+ ret.add((T) val.getCurr());
+ } else {
+ ret.add((T) val.get(_currTx));
+ }
} else {
ret.add(null);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/69594018/storm-core/src/jvm/storm/trident/state/map/RemovableMapState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/state/map/RemovableMapState.java b/storm-core/src/jvm/storm/trident/state/map/RemovableMapState.java
new file mode 100644
index 0000000..cf34f05
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/state/map/RemovableMapState.java
@@ -0,0 +1,8 @@
+package storm.trident.state.map;
+
+import java.util.List;
+import storm.trident.state.State;
+
+public interface RemovableMapState<T> extends State {
+ void multiRemove(List<List<Object>> keys);
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/69594018/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java b/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java
index 5df99f7..fd38900 100644
--- a/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java
+++ b/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java
@@ -30,10 +30,13 @@ import storm.trident.state.ValueUpdater;
import storm.trident.state.map.*;
import storm.trident.state.snapshot.Snapshottable;
-public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, MapState<T> {
+public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, MapState<T>, RemovableMapState<T> {
MemoryMapStateBacking<OpaqueValue> _backing;
SnapshottableMap<T> _delegate;
+ List<List<Object>> _removed = new ArrayList();
+ Long _currTx = null;
+
public MemoryMapState(String id) {
_backing = new MemoryMapStateBacking(id);
@@ -54,6 +57,11 @@ public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, Ma
public void beginCommit(Long txid) {
_delegate.beginCommit(txid);
+ if(txid==null || !txid.equals(_currTx)) {
+ _backing.multiRemove(_removed);
+ }
+ _removed = new ArrayList();
+ _currTx = txid;
}
public void commit(Long txid) {
@@ -76,6 +84,17 @@ public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, Ma
return _delegate.multiGet(keys);
}
+ @Override
+ public void multiRemove(List<List<Object>> keys) {
+ List nulls = new ArrayList();
+ for(int i=0; i<keys.size(); i++) {
+ nulls.add(null);
+ }
+ // first just set the keys to null, then flag to remove them at beginning of next commit when we know the current and last value are both null
+ multiPut(keys, nulls);
+ _removed.addAll(keys);
+ }
+
public static class Factory implements StateFactory {
String _id;
@@ -106,6 +125,12 @@ public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection, Ma
this.db = (Map<List<Object>, T>) _dbs.get(id);
}
+ public void multiRemove(List<List<Object>> keys) {
+ for(List<Object> key: keys) {
+ db.remove(key);
+ }
+ }
+
@Override
public List<T> multiGet(List<List<Object>> keys) {
List<T> ret = new ArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/69594018/storm-core/test/clj/storm/trident/state_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/storm/trident/state_test.clj b/storm-core/test/clj/storm/trident/state_test.clj
index 6e091e3..63e38ca 100644
--- a/storm-core/test/clj/storm/trident/state_test.clj
+++ b/storm-core/test/clj/storm/trident/state_test.clj
@@ -20,10 +20,16 @@
(:import [storm.trident.state OpaqueValue])
(:import [storm.trident.state CombinerValueUpdater])
(:import [storm.trident.state.map TransactionalMap OpaqueMap])
- (:import [storm.trident.testing MemoryBackingMap])
+ (:import [storm.trident.testing MemoryBackingMap MemoryMapState])
(:use [storm.trident testing])
(:use [backtype.storm util]))
+(defn single-remove [map key]
+ (-> map (.multiRemove [[key]])))
+
+(defn single-put [map key val]
+ (-> map (.multiPut [[key]] [val])))
+
(defn single-get [map key]
(-> map (.multiGet [[key]]) first))
@@ -61,7 +67,9 @@
(is (= nil (single-get map "a")))
;; tests that intra-batch caching works
(is (= 1 (single-update map "a" 1)))
+ (is (= 1 (single-get map "a")))
(is (= 3 (single-update map "a" 2)))
+ (is (= 3 (single-get map "a")))
(.commit map 1)
(.beginCommit map 1)
(is (= nil (single-get map "a")))
@@ -94,3 +102,26 @@
(is (= 7 (single-update map "a" 1)))
(.commit map 2)
))
+
+
+(deftest test-memory-map-state-remove
+ (let [map (MemoryMapState. (uuid))]
+ (.beginCommit map 1)
+ (single-put map "a" 1)
+ (single-put map "b" 2)
+ (.commit map 1)
+ (.beginCommit map 2)
+ (single-remove map "a")
+ (is (nil? (single-get map "a")))
+ (is (= 2 (single-get map "b")))
+ (.commit map 2)
+ (.beginCommit map 2)
+ (is (= 1 (single-get map "a")))
+ (is (= 2 (single-get map "b")))
+ (single-remove map "a")
+ (.commit map 2)
+ (.beginCommit map 3)
+ (is (nil? (single-get map "a")))
+ (is (= 2 (single-get map "b")))
+ (.commit map 3)
+ ))
[2/2] git commit: Merge branch 'removable-memory-map-state'
Posted by na...@apache.org.
Merge branch 'removable-memory-map-state'
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c621a6c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c621a6c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c621a6c1
Branch: refs/heads/master
Commit: c621a6c1b8a4f77aac5d48030eabfbb89851fe48
Parents: d5dee0e 6959401
Author: nathanmarz <na...@nathanmarz.com>
Authored: Mon Mar 24 18:56:46 2014 -0700
Committer: nathanmarz <na...@nathanmarz.com>
Committed: Mon Mar 24 18:56:46 2014 -0700
----------------------------------------------------------------------
.../jvm/storm/trident/state/map/OpaqueMap.java | 6 +++-
.../trident/state/map/RemovableMapState.java | 8 +++++
.../storm/trident/testing/MemoryMapState.java | 27 +++++++++++++++-
.../test/clj/storm/trident/state_test.clj | 33 +++++++++++++++++++-
4 files changed, 71 insertions(+), 3 deletions(-)
----------------------------------------------------------------------