You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ar...@apache.org on 2016/12/30 08:44:48 UTC

[08/12] storm git commit: Use a Map with tombstones to represent deletions

Use a Map with tombstones to represent deletions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2eb308a6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2eb308a6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2eb308a6

Branch: refs/heads/1.x-branch
Commit: 2eb308a6d36463b758d1f571cfeb6c9098176d40
Parents: d55ddff
Author: Balazs Kossovics <ba...@s4m.io>
Authored: Fri Jun 24 17:51:32 2016 +0200
Committer: Balazs Kossovics <ba...@s4m.io>
Committed: Fri Dec 2 11:42:48 2016 +0100

----------------------------------------------------------------------
 .../storm/redis/state/RedisKeyValueState.java   | 95 +++++++++-----------
 .../redis/state/RedisKeyValueStateTest.java     |  9 +-
 .../storm/state/DefaultStateSerializer.java     |  2 +
 .../storm/state/InMemoryKeyValueState.java      |  4 +-
 .../org/apache/storm/state/KeyValueState.java   |  2 +-
 .../storm/state/InMemoryKeyValueStateTest.java  | 27 ++++++
 6 files changed, 80 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index c686941..db1a061 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.redis.state;
 
+import com.google.common.base.Optional;
 import org.apache.storm.state.DefaultStateSerializer;
 import org.apache.storm.state.KeyValueState;
 import org.apache.storm.state.Serializer;
@@ -31,8 +32,8 @@ import org.apache.commons.codec.binary.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -46,13 +47,12 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
     private final String namespace;
     private final String prepareNamespace;
     private final String txidNamespace;
+    private final String tombstone;
     private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
+    private final Serializer<Optional<V>> valueSerializer;
     private final JedisCommandsInstanceContainer jedisContainer;
     private Map<String, String> pendingPrepare;
-    private Set<String> pendingDeletePrepare;
     private Map<String, String> pendingCommit;
-    private Set<String> pendingDeleteCommit;
     private Map<String, String> txIds;
 
     public RedisKeyValueState(String namespace) {
@@ -60,23 +60,23 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
     }
 
     public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig) {
-        this(namespace, poolConfig, new DefaultStateSerializer<K>(), new DefaultStateSerializer<V>());
+        this(namespace, poolConfig, new DefaultStateSerializer<K>(), new DefaultStateSerializer<Optional<V>>());
     }
 
-    public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+    public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer<K> keySerializer, Serializer<Optional<V>> valueSerializer) {
         this(namespace, JedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer);
     }
 
     public RedisKeyValueState(String namespace, JedisCommandsInstanceContainer jedisContainer,
-                              Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+                              Serializer<K> keySerializer, Serializer<Optional<V>> valueSerializer) {
         this.namespace = namespace;
         this.prepareNamespace = namespace + "$prepare";
         this.txidNamespace = namespace + "$txid";
         this.keySerializer = keySerializer;
         this.valueSerializer = valueSerializer;
+        this.tombstone = encode(valueSerializer.serialize(Optional.<V>absent()));
         this.jedisContainer = jedisContainer;
-        this.pendingPrepare = new HashMap<>();
-        this.pendingDeletePrepare = new HashSet<>();
+        this.pendingPrepare = new ConcurrentHashMap<>();
         initTxids();
         initPendingCommit();
     }
@@ -103,11 +103,9 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
             if (commands.exists(prepareNamespace)) {
                 LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
                 pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace));
-                pendingDeleteCommit = Collections.emptySet();
             } else {
                 LOG.debug("No previously prepared commits.");
                 pendingCommit = Collections.emptyMap();
-                pendingDeleteCommit = Collections.emptySet();
             }
         } finally {
             jedisContainer.returnInstance(commands);
@@ -118,11 +116,8 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
     public void put(K key, V value) {
         LOG.debug("put key '{}', value '{}'", key, value);
         String redisKey = encode(keySerializer.serialize(key));
-        synchronized (this) {
-            pendingPrepare.put(redisKey,
-                    encode(valueSerializer.serialize(value)));
-            pendingDeletePrepare.remove(redisKey);
-        }
+        pendingPrepare.put(redisKey,
+                encode(valueSerializer.serialize(Optional.of(value))));
     }
 
     @Override
@@ -130,17 +125,11 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
         LOG.debug("get key '{}'", key);
         String redisKey = encode(keySerializer.serialize(key));
         String redisValue = null;
-        boolean found = false;
-        synchronized (this) {
-            if (pendingPrepare.containsKey(redisKey) || pendingDeletePrepare.contains(redisKey)) {
-                redisValue = pendingPrepare.get(redisKey);
-                found = true;
-            } else if (pendingCommit.containsKey(redisKey) || pendingDeleteCommit.contains(redisKey)) {
-                redisValue = pendingCommit.get(redisKey);
-                found = true;
-            }
-        }
-        if (!found) {
+        if (pendingPrepare.containsKey(redisKey)) {
+            redisValue = pendingPrepare.get(redisKey);
+        } else if (pendingCommit.containsKey(redisKey)) {
+            redisValue = pendingCommit.get(redisKey);
+        } else {
             JedisCommands commands = null;
             try {
                 commands = jedisContainer.getInstance();
@@ -151,7 +140,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
         }
         V value = null;
         if (redisValue != null) {
-            value = valueSerializer.deserialize(decode(redisValue));
+            value = valueSerializer.deserialize(decode(redisValue)).orNull();
         }
         LOG.debug("Value for key '{}' is '{}'", key, value);
         return value;
@@ -164,17 +153,16 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
     }
 
     @Override
-    public void delete(K key) {
+    public V delete(K key) {
         LOG.debug("delete key '{}'", key);
         String redisKey = encode(keySerializer.serialize(key));
-        synchronized (this) {
-            pendingDeletePrepare.add(redisKey);
-            pendingPrepare.remove(redisKey);
-        }
+        V curr = get(key);
+        pendingPrepare.put(redisKey, tombstone);
+        return curr;
     }
 
     @Override
-    public void prepareCommit(long txid){
+    public void prepareCommit(long txid) {
         LOG.debug("prepareCommit txid {}", txid);
         validatePrepareTxid(txid);
         JedisCommands commands = null;
@@ -188,20 +176,15 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
                     }
                 }
             }
-            if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) {
+            if (!pendingPrepare.isEmpty()) {
                 commands.hmset(prepareNamespace, pendingPrepare);
-                commands.hdel(prepareNamespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()]));
             } else {
                 LOG.debug("Nothing to save for prepareCommit, txid {}.", txid);
             }
             txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
             commands.hmset(txidNamespace, txIds);
-            synchronized (this) {
-                pendingCommit = Collections.unmodifiableMap(pendingPrepare);
-                pendingDeleteCommit = Collections.unmodifiableSet(pendingDeletePrepare);
-            }
+            pendingCommit = Collections.unmodifiableMap(pendingPrepare);
             pendingPrepare = new ConcurrentHashMap<>();
-            pendingDeletePrepare = Collections.synchronizedSet(new HashSet<String>());
         } finally {
             jedisContainer.returnInstance(commands);
         }
@@ -214,9 +197,22 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
         JedisCommands commands = null;
         try {
             commands = jedisContainer.getInstance();
-            if (!pendingCommit.isEmpty() || !pendingDeleteCommit.isEmpty()) {
-                commands.hmset(namespace, pendingCommit);
-                commands.hdel(namespace, pendingDeleteCommit.toArray(new String[pendingDeleteCommit.size()]));
+            if (!pendingCommit.isEmpty()) {
+                List<String> keysToDelete = new ArrayList<>();
+                Map<String, String> keysToAdd = new HashMap<>();
+                for(Map.Entry<String, String> entry: pendingCommit.entrySet()) {
+                    if (tombstone.equals(entry.getValue())) {
+                        keysToDelete.add(entry.getKey());
+                    } else {
+                        keysToAdd.put(entry.getKey(), entry.getValue());
+                    }
+                }
+                if (!keysToAdd.isEmpty()) {
+                    commands.hmset(namespace, keysToAdd);
+                }
+                if (!keysToDelete.isEmpty()) {
+                    commands.hdel(namespace, keysToDelete.toArray(new String[0]));
+                }
             } else {
                 LOG.debug("Nothing to save for commit, txid {}.", txid);
             }
@@ -224,7 +220,6 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
             commands.hmset(txidNamespace, txIds);
             commands.del(prepareNamespace);
             pendingCommit = Collections.emptyMap();
-            pendingDeleteCommit = Collections.emptySet();
         } finally {
             jedisContainer.returnInstance(commands);
         }
@@ -235,14 +230,12 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
         JedisCommands commands = null;
         try {
             commands = jedisContainer.getInstance();
-            if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) {
+            if (!pendingPrepare.isEmpty()) {
                 commands.hmset(namespace, pendingPrepare);
-                commands.hdel(namespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()]));
             } else {
                 LOG.debug("Nothing to save for commit");
             }
-            pendingPrepare = new HashMap<>();
-            pendingDeletePrepare = new HashSet<>();
+            pendingPrepare = new ConcurrentHashMap<>();
         } finally {
             jedisContainer.returnInstance(commands);
         }
@@ -270,9 +263,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
                 commands.hmset(txidNamespace, txIds);
             }
             pendingCommit = Collections.emptyMap();
-            pendingDeleteCommit = Collections.emptySet();
             pendingPrepare = new ConcurrentHashMap<>();
-            pendingDeletePrepare = Collections.synchronizedSet(new HashSet<String>());
         } finally {
             jedisContainer.returnInstance(commands);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
index f5525d0..1bb72a7 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.redis.state;
 
+import com.google.common.base.Optional;
 import org.apache.storm.state.DefaultStateSerializer;
 import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
 import org.junit.Before;
@@ -104,7 +105,7 @@ public class RedisKeyValueStateTest {
                 });
 
         keyValueState = new RedisKeyValueState<String, String>("test", mockContainer, new DefaultStateSerializer<String>(),
-                                                               new DefaultStateSerializer<String>());
+                                                               new DefaultStateSerializer<Optional<String>>());
     }
 
 
@@ -124,7 +125,7 @@ public class RedisKeyValueStateTest {
         assertEquals("1", keyValueState.get("a"));
         assertEquals("2", keyValueState.get("b"));
         assertEquals(null, keyValueState.get("c"));
-        keyValueState.delete("a");
+        assertEquals("1", keyValueState.delete("a"));
         assertEquals(null, keyValueState.get("a"));
         assertEquals("2", keyValueState.get("b"));
         assertEquals(null, keyValueState.get("c"));
@@ -148,8 +149,8 @@ public class RedisKeyValueStateTest {
         keyValueState.rollback();
         assertArrayEquals(new String[]{"1", "2", null}, getValues());
         keyValueState.put("c", "3");
-        keyValueState.delete("b");
-        keyValueState.delete("c");
+        assertEquals("2", keyValueState.delete("b"));
+        assertEquals("3", keyValueState.delete("c"));
         assertArrayEquals(new String[]{"1", null, null}, getValues());
         keyValueState.prepareCommit(2);
         assertArrayEquals(new String[]{"1", null, null}, getValues());

http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java b/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
index 55e934e..d37101d 100644
--- a/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
+++ b/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
@@ -20,6 +20,7 @@ package org.apache.storm.state;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import java.util.Collections;
 import java.util.List;
@@ -40,6 +41,7 @@ public class DefaultStateSerializer<T> implements Serializer<T> {
      */
     public DefaultStateSerializer(List<Class<?>> classesToRegister) {
         kryo = new Kryo();
+        kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
         output = new Output(2000, 2000000000);
         for (Class<?> klazz : classesToRegister) {
             kryo.register(klazz);

http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
index 4774d72..72c1523 100644
--- a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
+++ b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
@@ -68,8 +68,8 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
     }
 
     @Override
-    public void delete(K key) {
-        state.remove(key);
+    public V delete(K key) {
+        return state.remove(key);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
index 0e1facb..7086331 100644
--- a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
+++ b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
@@ -51,5 +51,5 @@ public interface KeyValueState<K, V> extends State {
      *
      * @param key   the key
      */
-    void delete(K key);
+    V delete(K key);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java b/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java
index 361865b..d215627 100644
--- a/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java
+++ b/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java
@@ -44,6 +44,19 @@ public class InMemoryKeyValueStateTest {
     }
 
     @Test
+    public void testPutAndDelete() throws Exception {
+        keyValueState.put("a", "1");
+        keyValueState.put("b", "2");
+        assertEquals("1", keyValueState.get("a"));
+        assertEquals("2", keyValueState.get("b"));
+        assertEquals(null, keyValueState.get("c"));
+        assertEquals("1", keyValueState.delete("a"));
+        assertEquals(null, keyValueState.get("a"));
+        assertEquals("2", keyValueState.get("b"));
+        assertEquals(null, keyValueState.get("c"));
+    }
+
+    @Test
     public void testPrepareCommitRollback() throws Exception {
         keyValueState.put("a", "1");
         keyValueState.put("b", "2");
@@ -60,6 +73,20 @@ public class InMemoryKeyValueStateTest {
         assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
         keyValueState.rollback();
         assertArrayEquals(new String[]{"1", "2", null}, getValues());
+        keyValueState.put("c", "3");
+        assertEquals("2", keyValueState.delete("b"));
+        assertEquals("3", keyValueState.delete("c"));
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        keyValueState.prepareCommit(2);
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        keyValueState.commit(2);
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        keyValueState.put("b", "2");
+        keyValueState.prepareCommit(3);
+        keyValueState.put("c", "3");
+        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        keyValueState.rollback();
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
     }
 
     private String[] getValues() {