You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ar...@apache.org on 2016/12/30 08:44:48 UTC
[08/12] storm git commit: Use a Map with tombstones to represent
deletions
Use a Map with tombstones to represent deletions
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2eb308a6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2eb308a6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2eb308a6
Branch: refs/heads/1.x-branch
Commit: 2eb308a6d36463b758d1f571cfeb6c9098176d40
Parents: d55ddff
Author: Balazs Kossovics <ba...@s4m.io>
Authored: Fri Jun 24 17:51:32 2016 +0200
Committer: Balazs Kossovics <ba...@s4m.io>
Committed: Fri Dec 2 11:42:48 2016 +0100
----------------------------------------------------------------------
.../storm/redis/state/RedisKeyValueState.java | 95 +++++++++-----------
.../redis/state/RedisKeyValueStateTest.java | 9 +-
.../storm/state/DefaultStateSerializer.java | 2 +
.../storm/state/InMemoryKeyValueState.java | 4 +-
.../org/apache/storm/state/KeyValueState.java | 2 +-
.../storm/state/InMemoryKeyValueStateTest.java | 27 ++++++
6 files changed, 80 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
index c686941..db1a061 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.redis.state;
+import com.google.common.base.Optional;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.Serializer;
@@ -31,8 +32,8 @@ import org.apache.commons.codec.binary.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -46,13 +47,12 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
private final String namespace;
private final String prepareNamespace;
private final String txidNamespace;
+ private final String tombstone;
private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
+ private final Serializer<Optional<V>> valueSerializer;
private final JedisCommandsInstanceContainer jedisContainer;
private Map<String, String> pendingPrepare;
- private Set<String> pendingDeletePrepare;
private Map<String, String> pendingCommit;
- private Set<String> pendingDeleteCommit;
private Map<String, String> txIds;
public RedisKeyValueState(String namespace) {
@@ -60,23 +60,23 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
}
public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig) {
- this(namespace, poolConfig, new DefaultStateSerializer<K>(), new DefaultStateSerializer<V>());
+ this(namespace, poolConfig, new DefaultStateSerializer<K>(), new DefaultStateSerializer<Optional<V>>());
}
- public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer<K> keySerializer, Serializer<Optional<V>> valueSerializer) {
this(namespace, JedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer);
}
public RedisKeyValueState(String namespace, JedisCommandsInstanceContainer jedisContainer,
- Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ Serializer<K> keySerializer, Serializer<Optional<V>> valueSerializer) {
this.namespace = namespace;
this.prepareNamespace = namespace + "$prepare";
this.txidNamespace = namespace + "$txid";
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
+ this.tombstone = encode(valueSerializer.serialize(Optional.<V>absent()));
this.jedisContainer = jedisContainer;
- this.pendingPrepare = new HashMap<>();
- this.pendingDeletePrepare = new HashSet<>();
+ this.pendingPrepare = new ConcurrentHashMap<>();
initTxids();
initPendingCommit();
}
@@ -103,11 +103,9 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
if (commands.exists(prepareNamespace)) {
LOG.debug("Loading previously prepared commit from {}", prepareNamespace);
pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace));
- pendingDeleteCommit = Collections.emptySet();
} else {
LOG.debug("No previously prepared commits.");
pendingCommit = Collections.emptyMap();
- pendingDeleteCommit = Collections.emptySet();
}
} finally {
jedisContainer.returnInstance(commands);
@@ -118,11 +116,8 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
public void put(K key, V value) {
LOG.debug("put key '{}', value '{}'", key, value);
String redisKey = encode(keySerializer.serialize(key));
- synchronized (this) {
- pendingPrepare.put(redisKey,
- encode(valueSerializer.serialize(value)));
- pendingDeletePrepare.remove(redisKey);
- }
+ pendingPrepare.put(redisKey,
+ encode(valueSerializer.serialize(Optional.of(value))));
}
@Override
@@ -130,17 +125,11 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
LOG.debug("get key '{}'", key);
String redisKey = encode(keySerializer.serialize(key));
String redisValue = null;
- boolean found = false;
- synchronized (this) {
- if (pendingPrepare.containsKey(redisKey) || pendingDeletePrepare.contains(redisKey)) {
- redisValue = pendingPrepare.get(redisKey);
- found = true;
- } else if (pendingCommit.containsKey(redisKey) || pendingDeleteCommit.contains(redisKey)) {
- redisValue = pendingCommit.get(redisKey);
- found = true;
- }
- }
- if (!found) {
+ if (pendingPrepare.containsKey(redisKey)) {
+ redisValue = pendingPrepare.get(redisKey);
+ } else if (pendingCommit.containsKey(redisKey)) {
+ redisValue = pendingCommit.get(redisKey);
+ } else {
JedisCommands commands = null;
try {
commands = jedisContainer.getInstance();
@@ -151,7 +140,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
}
V value = null;
if (redisValue != null) {
- value = valueSerializer.deserialize(decode(redisValue));
+ value = valueSerializer.deserialize(decode(redisValue)).orNull();
}
LOG.debug("Value for key '{}' is '{}'", key, value);
return value;
@@ -164,17 +153,16 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
}
@Override
- public void delete(K key) {
+ public V delete(K key) {
LOG.debug("delete key '{}'", key);
String redisKey = encode(keySerializer.serialize(key));
- synchronized (this) {
- pendingDeletePrepare.add(redisKey);
- pendingPrepare.remove(redisKey);
- }
+ V curr = get(key);
+ pendingPrepare.put(redisKey, tombstone);
+ return curr;
}
@Override
- public void prepareCommit(long txid){
+ public void prepareCommit(long txid) {
LOG.debug("prepareCommit txid {}", txid);
validatePrepareTxid(txid);
JedisCommands commands = null;
@@ -188,20 +176,15 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
}
}
}
- if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) {
+ if (!pendingPrepare.isEmpty()) {
commands.hmset(prepareNamespace, pendingPrepare);
- commands.hdel(prepareNamespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()]));
} else {
LOG.debug("Nothing to save for prepareCommit, txid {}.", txid);
}
txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
commands.hmset(txidNamespace, txIds);
- synchronized (this) {
- pendingCommit = Collections.unmodifiableMap(pendingPrepare);
- pendingDeleteCommit = Collections.unmodifiableSet(pendingDeletePrepare);
- }
+ pendingCommit = Collections.unmodifiableMap(pendingPrepare);
pendingPrepare = new ConcurrentHashMap<>();
- pendingDeletePrepare = Collections.synchronizedSet(new HashSet<String>());
} finally {
jedisContainer.returnInstance(commands);
}
@@ -214,9 +197,22 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
JedisCommands commands = null;
try {
commands = jedisContainer.getInstance();
- if (!pendingCommit.isEmpty() || !pendingDeleteCommit.isEmpty()) {
- commands.hmset(namespace, pendingCommit);
- commands.hdel(namespace, pendingDeleteCommit.toArray(new String[pendingDeleteCommit.size()]));
+ if (!pendingCommit.isEmpty()) {
+ List<String> keysToDelete = new ArrayList<>();
+ Map<String, String> keysToAdd = new HashMap<>();
+ for(Map.Entry<String, String> entry: pendingCommit.entrySet()) {
+ if (tombstone.equals(entry.getValue())) {
+ keysToDelete.add(entry.getKey());
+ } else {
+ keysToAdd.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (!keysToAdd.isEmpty()) {
+ commands.hmset(namespace, keysToAdd);
+ }
+ if (!keysToDelete.isEmpty()) {
+ commands.hdel(namespace, keysToDelete.toArray(new String[0]));
+ }
} else {
LOG.debug("Nothing to save for commit, txid {}.", txid);
}
@@ -224,7 +220,6 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
commands.hmset(txidNamespace, txIds);
commands.del(prepareNamespace);
pendingCommit = Collections.emptyMap();
- pendingDeleteCommit = Collections.emptySet();
} finally {
jedisContainer.returnInstance(commands);
}
@@ -235,14 +230,12 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
JedisCommands commands = null;
try {
commands = jedisContainer.getInstance();
- if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) {
+ if (!pendingPrepare.isEmpty()) {
commands.hmset(namespace, pendingPrepare);
- commands.hdel(namespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()]));
} else {
LOG.debug("Nothing to save for commit");
}
- pendingPrepare = new HashMap<>();
- pendingDeletePrepare = new HashSet<>();
+ pendingPrepare = new ConcurrentHashMap<>();
} finally {
jedisContainer.returnInstance(commands);
}
@@ -270,9 +263,7 @@ public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
commands.hmset(txidNamespace, txIds);
}
pendingCommit = Collections.emptyMap();
- pendingDeleteCommit = Collections.emptySet();
pendingPrepare = new ConcurrentHashMap<>();
- pendingDeletePrepare = Collections.synchronizedSet(new HashSet<String>());
} finally {
jedisContainer.returnInstance(commands);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
index f5525d0..1bb72a7 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.redis.state;
+import com.google.common.base.Optional;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
import org.junit.Before;
@@ -104,7 +105,7 @@ public class RedisKeyValueStateTest {
});
keyValueState = new RedisKeyValueState<String, String>("test", mockContainer, new DefaultStateSerializer<String>(),
- new DefaultStateSerializer<String>());
+ new DefaultStateSerializer<Optional<String>>());
}
@@ -124,7 +125,7 @@ public class RedisKeyValueStateTest {
assertEquals("1", keyValueState.get("a"));
assertEquals("2", keyValueState.get("b"));
assertEquals(null, keyValueState.get("c"));
- keyValueState.delete("a");
+ assertEquals("1", keyValueState.delete("a"));
assertEquals(null, keyValueState.get("a"));
assertEquals("2", keyValueState.get("b"));
assertEquals(null, keyValueState.get("c"));
@@ -148,8 +149,8 @@ public class RedisKeyValueStateTest {
keyValueState.rollback();
assertArrayEquals(new String[]{"1", "2", null}, getValues());
keyValueState.put("c", "3");
- keyValueState.delete("b");
- keyValueState.delete("c");
+ assertEquals("2", keyValueState.delete("b"));
+ assertEquals("3", keyValueState.delete("c"));
assertArrayEquals(new String[]{"1", null, null}, getValues());
keyValueState.prepareCommit(2);
assertArrayEquals(new String[]{"1", null, null}, getValues());
http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java b/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
index 55e934e..d37101d 100644
--- a/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
+++ b/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java
@@ -20,6 +20,7 @@ package org.apache.storm.state;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.objenesis.strategy.StdInstantiatorStrategy;
import java.util.Collections;
import java.util.List;
@@ -40,6 +41,7 @@ public class DefaultStateSerializer<T> implements Serializer<T> {
*/
public DefaultStateSerializer(List<Class<?>> classesToRegister) {
kryo = new Kryo();
+ kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
output = new Output(2000, 2000000000);
for (Class<?> klazz : classesToRegister) {
kryo.register(klazz);
http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
index 4774d72..72c1523 100644
--- a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
+++ b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java
@@ -68,8 +68,8 @@ public class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
}
@Override
- public void delete(K key) {
- state.remove(key);
+ public V delete(K key) {
+ return state.remove(key);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
index 0e1facb..7086331 100644
--- a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
+++ b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
@@ -51,5 +51,5 @@ public interface KeyValueState<K, V> extends State {
*
* @param key the key
*/
- void delete(K key);
+ V delete(K key);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2eb308a6/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java b/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java
index 361865b..d215627 100644
--- a/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java
+++ b/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java
@@ -44,6 +44,19 @@ public class InMemoryKeyValueStateTest {
}
@Test
+ public void testPutAndDelete() throws Exception {
+ keyValueState.put("a", "1");
+ keyValueState.put("b", "2");
+ assertEquals("1", keyValueState.get("a"));
+ assertEquals("2", keyValueState.get("b"));
+ assertEquals(null, keyValueState.get("c"));
+ assertEquals("1", keyValueState.delete("a"));
+ assertEquals(null, keyValueState.get("a"));
+ assertEquals("2", keyValueState.get("b"));
+ assertEquals(null, keyValueState.get("c"));
+ }
+
+ @Test
public void testPrepareCommitRollback() throws Exception {
keyValueState.put("a", "1");
keyValueState.put("b", "2");
@@ -60,6 +73,20 @@ public class InMemoryKeyValueStateTest {
assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
keyValueState.rollback();
assertArrayEquals(new String[]{"1", "2", null}, getValues());
+ keyValueState.put("c", "3");
+ assertEquals("2", keyValueState.delete("b"));
+ assertEquals("3", keyValueState.delete("c"));
+ assertArrayEquals(new String[]{"1", null, null}, getValues());
+ keyValueState.prepareCommit(2);
+ assertArrayEquals(new String[]{"1", null, null}, getValues());
+ keyValueState.commit(2);
+ assertArrayEquals(new String[]{"1", null, null}, getValues());
+ keyValueState.put("b", "2");
+ keyValueState.prepareCommit(3);
+ keyValueState.put("c", "3");
+ assertArrayEquals(new String[]{"1", "2", "3"}, getValues());
+ keyValueState.rollback();
+ assertArrayEquals(new String[]{"1", null, null}, getValues());
}
private String[] getValues() {