You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ar...@apache.org on 2016/12/30 08:44:47 UTC
[07/12] storm git commit: STORM-1886 Extend KeyValueState iface with
delete
STORM-1886 Extend KeyValueState iface with delete
The patch also provides implementation for delete in
RedisKeyValueState and InMemoryKeyValueState.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d55ddffb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d55ddffb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d55ddffb
Branch: refs/heads/1.x-branch
Commit: d55ddffbe6c5bc2c7f022b530e5acfc0bb50c07f
Parents: 44f0beb
Author: Balazs Kossovics <ba...@s4m.io>
Authored: Thu Jun 2 14:32:06 2016 +0200
Committer: Balazs Kossovics <ba...@s4m.io>
Committed: Fri Dec 2 11:42:42 2016 +0100
----------------------------------------------------------------------
.../storm/redis/state/RedisKeyValueState.java | 66 +++++++++++++++-----
.../redis/state/RedisKeyValueStateTest.java | 62 ++++++++++++++++--
.../storm/state/InMemoryKeyValueState.java | 5 ++
.../org/apache/storm/state/KeyValueState.java | 7 +++
4 files changed, 121 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d55ddffb/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index 8769cb0..c686941 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -31,6 +31,8 @@ import org.apache.commons.codec.binary.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -48,7 +50,9 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
private final Serializer<V> valueSerializer;
private final JedisCommandsInstanceContainer jedisContainer;
private Map<String, String> pendingPrepare;
+ private Set<String> pendingDeletePrepare;
private Map<String, String> pendingCommit;
+ private Set<String> pendingDeleteCommit;
private Map<String, String> txIds;
public RedisKeyValueState(String namespace) {
@@ -71,7 +75,8 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.jedisContainer = jedisContainer;
- this.pendingPrepare = new ConcurrentHashMap<>();
+ this.pendingPrepare = new HashMap<>();
+ this.pendingDeletePrepare = new HashSet<>();
initTxids();
initPendingCommit();
}
@@ -98,9 +103,11 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
if (commands.exists(prepareNamespace)) {
LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace));
+ pendingDeleteCommit = Collections.emptySet();
} else {
LOG.debug("No previously prepared commits.");
pendingCommit = Collections.emptyMap();
+ pendingDeleteCommit = Collections.emptySet();
}
} finally {
jedisContainer.returnInstance(commands);
@@ -110,8 +117,12 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
@Override
public void put(K key, V value) {
LOG.debug("put key '{}', value '{}'", key, value);
- pendingPrepare.put(encode(keySerializer.serialize(key)),
- encode(valueSerializer.serialize(value)));
+ String redisKey = encode(keySerializer.serialize(key));
+ synchronized (this) {
+ pendingPrepare.put(redisKey,
+ encode(valueSerializer.serialize(value)));
+ pendingDeletePrepare.remove(redisKey);
+ }
}
@Override
@@ -119,11 +130,17 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
LOG.debug("get key '{}'", key);
String redisKey = encode(keySerializer.serialize(key));
String redisValue = null;
- if (pendingPrepare.containsKey(redisKey)) {
- redisValue = pendingPrepare.get(redisKey);
- } else if (pendingCommit.containsKey(redisKey)) {
- redisValue = pendingCommit.get(redisKey);
- } else {
+ boolean found = false;
+ synchronized (this) {
+ if (pendingPrepare.containsKey(redisKey) || pendingDeletePrepare.contains(redisKey)) {
+ redisValue = pendingPrepare.get(redisKey);
+ found = true;
+ } else if (pendingCommit.containsKey(redisKey) || pendingDeleteCommit.contains(redisKey)) {
+ redisValue = pendingCommit.get(redisKey);
+ found = true;
+ }
+ }
+ if (!found) {
JedisCommands commands = null;
try {
commands = jedisContainer.getInstance();
@@ -147,7 +164,17 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
}
@Override
- public void prepareCommit(long txid) {
+ public void delete(K key) {
+ LOG.debug("delete key '{}'", key);
+ String redisKey = encode(keySerializer.serialize(key));
+ synchronized (this) {
+ pendingDeletePrepare.add(redisKey);
+ pendingPrepare.remove(redisKey);
+ }
+ }
+
+ @Override
+ public void prepareCommit(long txid){
LOG.debug("prepareCommit txid {}", txid);
validatePrepareTxid(txid);
JedisCommands commands = null;
@@ -161,15 +188,20 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
}
}
}
- if (!pendingPrepare.isEmpty()) {
+ if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) {
commands.hmset(prepareNamespace, pendingPrepare);
+ commands.hdel(prepareNamespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()]));
} else {
LOG.debug("Nothing to save for prepareCommit, txid {}.", txid);
}
txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
commands.hmset(txidNamespace, txIds);
- pendingCommit = Collections.unmodifiableMap(pendingPrepare);
+ synchronized (this) {
+ pendingCommit = Collections.unmodifiableMap(pendingPrepare);
+ pendingDeleteCommit = Collections.unmodifiableSet(pendingDeletePrepare);
+ }
pendingPrepare = new ConcurrentHashMap<>();
+ pendingDeletePrepare = Collections.synchronizedSet(new HashSet<String>());
} finally {
jedisContainer.returnInstance(commands);
}
@@ -182,8 +214,9 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
JedisCommands commands = null;
try {
commands = jedisContainer.getInstance();
- if (!pendingCommit.isEmpty()) {
+ if (!pendingCommit.isEmpty() || !pendingDeleteCommit.isEmpty()) {
commands.hmset(namespace, pendingCommit);
+ commands.hdel(namespace, pendingDeleteCommit.toArray(new String[pendingDeleteCommit.size()]));
} else {
LOG.debug("Nothing to save for commit, txid {}.", txid);
}
@@ -191,6 +224,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
commands.hmset(txidNamespace, txIds);
commands.del(prepareNamespace);
pendingCommit = Collections.emptyMap();
+ pendingDeleteCommit = Collections.emptySet();
} finally {
jedisContainer.returnInstance(commands);
}
@@ -201,12 +235,14 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
JedisCommands commands = null;
try {
commands = jedisContainer.getInstance();
- if (!pendingPrepare.isEmpty()) {
+ if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) {
commands.hmset(namespace, pendingPrepare);
+ commands.hdel(namespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()]));
} else {
LOG.debug("Nothing to save for commit");
}
- pendingPrepare = new ConcurrentHashMap<>();
+ pendingPrepare = new HashMap<>();
+ pendingDeletePrepare = new HashSet<>();
} finally {
jedisContainer.returnInstance(commands);
}
@@ -234,7 +270,9 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
commands.hmset(txidNamespace, txIds);
}
pendingCommit = Collections.emptyMap();
+ pendingDeleteCommit = Collections.emptySet();
pendingPrepare = new ConcurrentHashMap<>();
+ pendingDeletePrepare = Collections.synchronizedSet(new HashSet<String>());
} finally {
jedisContainer.returnInstance(commands);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d55ddffb/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
index ea8cc15..f5525d0 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
@@ -32,10 +32,9 @@ import redis.clients.jedis.SortingParams;
import redis.clients.jedis.Tuple;
import java.util.HashMap;
-import java.util.List;
+import java.util.Arrays;
import java.util.Map;
import java.util.Set;
-
import static org.junit.Assert.*;
/**
@@ -93,6 +92,17 @@ public class RedisKeyValueStateTest {
}
});
+ Mockito.when(mockCommands.hdel(Mockito.anyString(), Mockito.<String>anyVararg()))
+ .thenAnswer(new Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ int argsSize = args.length;
+ String[] fields = Arrays.asList(args).subList(1, argsSize).toArray(new String[argsSize - 1]);
+ return hdel(mockMap, (String) args[0], fields);
+ }
+ });
+
keyValueState = new RedisKeyValueState<String, String>("test", mockContainer, new DefaultStateSerializer<String>(),
new DefaultStateSerializer<String>());
}
@@ -108,6 +118,19 @@ public class RedisKeyValueStateTest {
}
@Test
+ public void testPutAndDelete() throws Exception {
+ keyValueState.put("a", "1");
+ keyValueState.put("b", "2");
+ assertEquals("1", keyValueState.get("a"));
+ assertEquals("2", keyValueState.get("b"));
+ assertEquals(null, keyValueState.get("c"));
+ keyValueState.delete("a");
+ assertEquals(null, keyValueState.get("a"));
+ assertEquals("2", keyValueState.get("b"));
+ assertEquals(null, keyValueState.get("c"));
+ }
+
+ @Test
public void testPrepareCommitRollback() throws Exception {
keyValueState.put("a", "1");
keyValueState.put("b", "2");
@@ -124,6 +147,20 @@ public class RedisKeyValueStateTest {
assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
keyValueState.rollback();
assertArrayEquals(new String[]{"1", "2", null}, getValues());
+ keyValueState.put("c", "3");
+ keyValueState.delete("b");
+ keyValueState.delete("c");
+ assertArrayEquals(new String[]{"1", null, null}, getValues());
+ keyValueState.prepareCommit(2);
+ assertArrayEquals(new String[]{"1", null, null}, getValues());
+ keyValueState.commit(2);
+ assertArrayEquals(new String[]{"1", null, null}, getValues());
+ keyValueState.put("b", "2");
+ keyValueState.prepareCommit(3);
+ keyValueState.put("c", "3");
+ assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+ keyValueState.rollback();
+ assertArrayEquals(new String[]{"1", null, null}, getValues());
}
private String[] getValues() {
@@ -135,13 +172,20 @@ public class RedisKeyValueStateTest {
}
private String hmset(Map<String, Map<String, String>> mockMap, String key, Map value) {
- mockMap.put(key, value);
+ Map<String, String> currentValue = mockMap.get(key);
+ if (currentValue == null) {
+ currentValue = new HashMap<>();
+ }
+ currentValue.putAll(value);
+ mockMap.put(key, currentValue);
return "";
}
private Long del(Map<String, Map<String, String>> mockMap, String key) {
- mockMap.remove(key);
- return 0L;
+ if (mockMap.remove(key) == null)
+ return 0L;
+ else
+ return 1L;
}
private String hget(Map<String, Map<String, String>> mockMap, String namespace, String key) {
@@ -151,4 +195,12 @@ public class RedisKeyValueStateTest {
return null;
}
+ private Long hdel(Map<String, Map<String, String>> mockMap, String namespace, String ... keys) {
+ Long count = 0L;
+ for (String key: keys) {
+ if (mockMap.get(namespace).remove(key) != null) count++;
+ }
+ return count;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d55ddffb/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
index 4b116ba..4774d72 100644
--- a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
+++ b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
@@ -68,6 +68,11 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
}
@Override
+ public void delete(K key) {
+ state.remove(key);
+ }
+
+ @Override
public void commit() {
commitedState = new TxIdState<>(DEFAULT_TXID, new ConcurrentHashMap<>(state));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d55ddffb/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
index 3ab60f1..0e1facb 100644
--- a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
+++ b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
@@ -45,4 +45,11 @@ public interface KeyValueState<K, V> extends State {
* @return the value or defaultValue if no mapping is found
*/
V get(K key, V defaultValue);
+
+ /**
+ * Deletes the value mapped to the key, if there is any
+ *
+ * @param key the key
+ */
+ void delete(K key);
}