You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ar...@apache.org on 2016/12/30 08:44:47 UTC

[07/12] storm git commit: STORM-1886 Extend KeyValueState iface with delete

STORM-1886 Extend KeyValueState iface with delete

The patch also provides implementations of delete in
RedisKeyValueState and InMemoryKeyValueState.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d55ddffb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d55ddffb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d55ddffb

Branch: refs/heads/1.x-branch
Commit: d55ddffbe6c5bc2c7f022b530e5acfc0bb50c07f
Parents: 44f0beb
Author: Balazs Kossovics <ba...@s4m.io>
Authored: Thu Jun 2 14:32:06 2016 +0200
Committer: Balazs Kossovics <ba...@s4m.io>
Committed: Fri Dec 2 11:42:42 2016 +0100

----------------------------------------------------------------------
 .../storm/redis/state/RedisKeyValueState.java   | 66 +++++++++++++++-----
 .../redis/state/RedisKeyValueStateTest.java     | 62 ++++++++++++++++--
 .../storm/state/InMemoryKeyValueState.java      |  5 ++
 .../org/apache/storm/state/KeyValueState.java   |  7 +++
 4 files changed, 121 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d55ddffb/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index 8769cb0..c686941 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -31,6 +31,8 @@ import org.apache.commons.codec.binary.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -48,7 +50,9 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
     private final Serializer<V> valueSerializer;
     private final JedisCommandsInstanceContainer jedisContainer;
     private Map<String, String> pendingPrepare;
+    private Set<String> pendingDeletePrepare;
     private Map<String, String> pendingCommit;
+    private Set<String> pendingDeleteCommit;
     private Map<String, String> txIds;
 
     public RedisKeyValueState(String namespace) {
@@ -71,7 +75,8 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
         this.keySerializer = keySerializer;
         this.valueSerializer = valueSerializer;
         this.jedisContainer = jedisContainer;
-        this.pendingPrepare = new ConcurrentHashMap<>();
+        this.pendingPrepare = new HashMap<>();
+        this.pendingDeletePrepare = new HashSet<>();
         initTxids();
         initPendingCommit();
     }
@@ -98,9 +103,11 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
             if (commands.exists(prepareNamespace)) {
                 LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
                 pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace));
+                pendingDeleteCommit = Collections.emptySet();
             } else {
                 LOG.debug("No previously prepared commits.");
                 pendingCommit = Collections.emptyMap();
+                pendingDeleteCommit = Collections.emptySet();
             }
         } finally {
             jedisContainer.returnInstance(commands);
@@ -110,8 +117,12 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
     @Override
     public void put(K key, V value) {
         LOG.debug("put key '{}', value '{}'", key, value);
-        pendingPrepare.put(encode(keySerializer.serialize(key)),
-                           encode(valueSerializer.serialize(value)));
+        String redisKey = encode(keySerializer.serialize(key));
+        synchronized (this) {
+            pendingPrepare.put(redisKey,
+                    encode(valueSerializer.serialize(value)));
+            pendingDeletePrepare.remove(redisKey);
+        }
     }
 
     @Override
@@ -119,11 +130,17 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
         LOG.debug("get key '{}'", key);
         String redisKey = encode(keySerializer.serialize(key));
         String redisValue = null;
-        if (pendingPrepare.containsKey(redisKey)) {
-            redisValue = pendingPrepare.get(redisKey);
-        } else if (pendingCommit.containsKey(redisKey)) {
-            redisValue = pendingCommit.get(redisKey);
-        } else {
+        boolean found = false;
+        synchronized (this) {
+            if (pendingPrepare.containsKey(redisKey) || pendingDeletePrepare.contains(redisKey)) {
+                redisValue = pendingPrepare.get(redisKey);
+                found = true;
+            } else if (pendingCommit.containsKey(redisKey) || pendingDeleteCommit.contains(redisKey)) {
+                redisValue = pendingCommit.get(redisKey);
+                found = true;
+            }
+        }
+        if (!found) {
             JedisCommands commands = null;
             try {
                 commands = jedisContainer.getInstance();
@@ -147,7 +164,17 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
     }
 
     @Override
-    public void prepareCommit(long txid) {
+    public void delete(K key) {
+        LOG.debug("delete key '{}'", key);
+        String redisKey = encode(keySerializer.serialize(key));
+        synchronized (this) {
+            pendingDeletePrepare.add(redisKey);
+            pendingPrepare.remove(redisKey);
+        }
+    }
+
+    @Override
+    public void prepareCommit(long txid){
         LOG.debug("prepareCommit txid {}", txid);
         validatePrepareTxid(txid);
         JedisCommands commands = null;
@@ -161,15 +188,20 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
                     }
                 }
             }
-            if (!pendingPrepare.isEmpty()) {
+            if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) {
                 commands.hmset(prepareNamespace, pendingPrepare);
+                commands.hdel(prepareNamespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()]));
             } else {
                 LOG.debug("Nothing to save for prepareCommit, txid {}.", txid);
             }
             txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
             commands.hmset(txidNamespace, txIds);
-            pendingCommit = Collections.unmodifiableMap(pendingPrepare);
+            synchronized (this) {
+                pendingCommit = Collections.unmodifiableMap(pendingPrepare);
+                pendingDeleteCommit = Collections.unmodifiableSet(pendingDeletePrepare);
+            }
             pendingPrepare = new ConcurrentHashMap<>();
+            pendingDeletePrepare = Collections.synchronizedSet(new HashSet<String>());
         } finally {
             jedisContainer.returnInstance(commands);
         }
@@ -182,8 +214,9 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
         JedisCommands commands = null;
         try {
             commands = jedisContainer.getInstance();
-            if (!pendingCommit.isEmpty()) {
+            if (!pendingCommit.isEmpty() || !pendingDeleteCommit.isEmpty()) {
                 commands.hmset(namespace, pendingCommit);
+                commands.hdel(namespace, pendingDeleteCommit.toArray(new String[pendingDeleteCommit.size()]));
             } else {
                 LOG.debug("Nothing to save for commit, txid {}.", txid);
             }
@@ -191,6 +224,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
             commands.hmset(txidNamespace, txIds);
             commands.del(prepareNamespace);
             pendingCommit = Collections.emptyMap();
+            pendingDeleteCommit = Collections.emptySet();
         } finally {
             jedisContainer.returnInstance(commands);
         }
@@ -201,12 +235,14 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
         JedisCommands commands = null;
         try {
             commands = jedisContainer.getInstance();
-            if (!pendingPrepare.isEmpty()) {
+            if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) {
                 commands.hmset(namespace, pendingPrepare);
+                commands.hdel(namespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()]));
             } else {
                 LOG.debug("Nothing to save for commit");
             }
-            pendingPrepare = new ConcurrentHashMap<>();
+            pendingPrepare = new HashMap<>();
+            pendingDeletePrepare = new HashSet<>();
         } finally {
             jedisContainer.returnInstance(commands);
         }
@@ -234,7 +270,9 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
                 commands.hmset(txidNamespace, txIds);
             }
             pendingCommit = Collections.emptyMap();
+            pendingDeleteCommit = Collections.emptySet();
             pendingPrepare = new ConcurrentHashMap<>();
+            pendingDeletePrepare = Collections.synchronizedSet(new HashSet<String>());
         } finally {
             jedisContainer.returnInstance(commands);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/d55ddffb/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
index ea8cc15..f5525d0 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
@@ -32,10 +32,9 @@ import redis.clients.jedis.SortingParams;
 import redis.clients.jedis.Tuple;
 
 import java.util.HashMap;
-import java.util.List;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
-
 import static org.junit.Assert.*;
 
 /**
@@ -93,6 +92,17 @@ public class RedisKeyValueStateTest {
                     }
                 });
 
+        Mockito.when(mockCommands.hdel(Mockito.anyString(), Mockito.<String>anyVararg()))
+                .thenAnswer(new Answer<Long>() {
+                    @Override
+                    public Long answer(InvocationOnMock invocation) throws Throwable {
+                        Object[] args = invocation.getArguments();
+                        int argsSize = args.length;
+                        String[] fields = Arrays.asList(args).subList(1, argsSize).toArray(new String[argsSize - 1]);
+                        return hdel(mockMap, (String) args[0], fields);
+                    }
+                });
+
         keyValueState = new RedisKeyValueState<String, String>("test", mockContainer, new DefaultStateSerializer<String>(),
                                                                new DefaultStateSerializer<String>());
     }
@@ -108,6 +118,19 @@ public class RedisKeyValueStateTest {
     }
 
     @Test
+    public void testPutAndDelete() throws Exception {
+        keyValueState.put("a", "1");
+        keyValueState.put("b", "2");
+        assertEquals("1", keyValueState.get("a"));
+        assertEquals("2", keyValueState.get("b"));
+        assertEquals(null, keyValueState.get("c"));
+        keyValueState.delete("a");
+        assertEquals(null, keyValueState.get("a"));
+        assertEquals("2", keyValueState.get("b"));
+        assertEquals(null, keyValueState.get("c"));
+    }
+
+    @Test
     public void testPrepareCommitRollback() throws Exception {
         keyValueState.put("a", "1");
         keyValueState.put("b", "2");
@@ -124,6 +147,20 @@ public class RedisKeyValueStateTest {
         assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
         keyValueState.rollback();
         assertArrayEquals(new String[]{"1", "2", null}, getValues());
+        keyValueState.put("c", "3");
+        keyValueState.delete("b");
+        keyValueState.delete("c");
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        keyValueState.prepareCommit(2);
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        keyValueState.commit(2);
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
+        keyValueState.put("b", "2");
+        keyValueState.prepareCommit(3);
+        keyValueState.put("c", "3");
+        assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+        keyValueState.rollback();
+        assertArrayEquals(new String[]{"1", null, null}, getValues());
     }
 
     private String[] getValues() {
@@ -135,13 +172,20 @@ public class RedisKeyValueStateTest {
     }
 
     private String hmset(Map<String, Map<String, String>> mockMap, String key, Map value) {
-        mockMap.put(key, value);
+        Map<String, String> currentValue = mockMap.get(key);
+        if (currentValue == null) {
+            currentValue = new HashMap<>();
+        }
+        currentValue.putAll(value);
+        mockMap.put(key, currentValue);
         return "";
     }
 
     private Long del(Map<String, Map<String, String>> mockMap, String key) {
-        mockMap.remove(key);
-        return 0L;
+        if (mockMap.remove(key) == null)
+            return 0L;
+        else
+            return 1L;
     }
 
     private String hget(Map<String, Map<String, String>> mockMap, String namespace, String key) {
@@ -151,4 +195,12 @@ public class RedisKeyValueStateTest {
         return null;
     }
 
+    private Long hdel(Map<String, Map<String, String>> mockMap, String namespace, String ... keys) {
+        Long count = 0L;
+        for (String key: keys) {
+            if (mockMap.get(namespace).remove(key) != null) count++;
+        }
+        return count;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d55ddffb/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
index 4b116ba..4774d72 100644
--- a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
+++ b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
@@ -68,6 +68,11 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
     }
 
     @Override
+    public void delete(K key) {
+        state.remove(key);
+    }
+
+    @Override
     public void commit() {
         commitedState = new TxIdState<>(DEFAULT_TXID, new ConcurrentHashMap<>(state));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/d55ddffb/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
index 3ab60f1..0e1facb 100644
--- a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
+++ b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
@@ -45,4 +45,11 @@ public interface KeyValueState<K, V> extends State {
      * @return the value or defaultValue if no mapping is found
      */
     V get(K key, V defaultValue);
+
+    /**
+     * Deletes the value mapped to the key, if there is one.
+     *
+     * @param key   the key
+     */
+    void delete(K key);
 }