You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/06/02 17:00:31 UTC
[03/12] storm git commit: Extract abstract classes to reduce code
duplication (Redis*MapState)
Extract abstract classes to reduce code duplication (Redis*MapState)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9c0329b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9c0329b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9c0329b
Branch: refs/heads/master
Commit: b9c0329bd182333f0af237e661e88958d7157520
Parents: 8fd1f4b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 18:55:03 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 18:55:03 2015 +0900
----------------------------------------------------------------------
.../trident/state/AbstractRedisMapState.java | 70 ++++++++
.../storm/redis/trident/state/KeyFactory.java | 18 ++
.../storm/redis/trident/state/Options.java | 13 ++
.../trident/state/RedisClusterMapState.java | 137 +++-----------
.../redis/trident/state/RedisMapState.java | 177 +++++--------------
5 files changed, 173 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
new file mode 100644
index 0000000..a85a57a
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
@@ -0,0 +1,70 @@
+package org.apache.storm.redis.trident.state;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import storm.trident.state.*;
+import storm.trident.state.map.IBackingMap;
+
+import java.util.*;
+
+public abstract class AbstractRedisMapState<T> implements IBackingMap<T> {
+ public static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
+ StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
+ StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
+ StateType.OPAQUE, new JSONOpaqueSerializer()
+ ));
+
+ @Override public List<T> multiGet(List<List<Object>> keys) {
+ if (keys.size() == 0) {
+ return Collections.emptyList();
+ }
+
+ List<String> stringKeys = buildKeys(keys);
+ List<String> values = retrieveValuesFromRedis(stringKeys);
+
+ return deserializeValues(keys, values);
+ }
+
+ @Override
+ public void multiPut(List<List<Object>> keys, List<T> vals) {
+ if (keys.size() == 0) {
+ return;
+ }
+
+ Map<String, String> keyValues = new HashMap<String, String>();
+ for (int i = 0; i < keys.size(); i++) {
+ String val = new String(getSerializer().serialize(vals.get(i)));
+ String redisKey = getKeyFactory().build(keys.get(i));
+ keyValues.put(redisKey, val);
+ }
+
+ updateStatesToRedis(keyValues);
+ }
+
+ private List<String> buildKeys(List<List<Object>> keys) {
+ List<String> stringKeys = new ArrayList<String>();
+
+ for (List<Object> key : keys) {
+ stringKeys.add(getKeyFactory().build(key));
+ }
+
+ return stringKeys;
+ }
+
+ private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
+ List<T> result = new ArrayList<T>(keys.size());
+ for (String value : values) {
+ if (value != null) {
+ result.add((T) getSerializer().deserialize(value.getBytes()));
+ } else {
+ result.add(null);
+ }
+ }
+ return result;
+ }
+
+ protected abstract Serializer getSerializer();
+ protected abstract KeyFactory getKeyFactory();
+ protected abstract List<String> retrieveValuesFromRedis(List<String> keys);
+ protected abstract void updateStatesToRedis(Map<String, String> keyValues);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
new file mode 100644
index 0000000..0fac726
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
@@ -0,0 +1,18 @@
+package org.apache.storm.redis.trident.state;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface KeyFactory extends Serializable {
+ String build(List<Object> key);
+
+ class DefaultKeyFactory implements KeyFactory {
+ public String build(List<Object> key) {
+ if (key.size() != 1)
+ throw new RuntimeException("Default KeyFactory does not support compound keys");
+
+ return (String) key.get(0);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
new file mode 100644
index 0000000..59e4ecb
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -0,0 +1,13 @@
+package org.apache.storm.redis.trident.state;
+
+import storm.trident.state.Serializer;
+
+import java.io.Serializable;
+
+public class Options<T> implements Serializable {
+ public int localCacheSize = 1000;
+ public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
+ KeyFactory keyFactory = null;
+ public Serializer<T> serializer = null;
+ public String hkey = null;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 1154376..ccf5b38 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -20,67 +20,16 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.storm.redis.common.config.JedisClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
-import storm.trident.state.JSONNonTransactionalSerializer;
-import storm.trident.state.JSONOpaqueSerializer;
-import storm.trident.state.JSONTransactionalSerializer;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.IBackingMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
import java.util.List;
import java.util.Map;
-public class RedisClusterMapState<T> implements IBackingMap<T> {
- private static final Logger logger = LoggerFactory.getLogger(RedisClusterMapState.class);
-
- private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
- StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
- StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
- StateType.OPAQUE, new JSONOpaqueSerializer()
- ));
-
- public static class DefaultKeyFactory implements KeyFactory {
- public String build(List<Object> key) {
- if (key.size() != 1)
- throw new RuntimeException("Default KeyFactory does not support compound keys");
- return (String) key.get(0);
- }
- };
-
- public static class Options<T> implements Serializable {
- public int localCacheSize = 1000;
- public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
- KeyFactory keyFactory = null;
- public Serializer<T> serializer = null;
- public String hkey = null;
- }
-
- public static interface KeyFactory extends Serializable {
- String build(List<Object> key);
- }
-
+public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
/**
* OpaqueTransactional for redis-cluster.
* */
@@ -150,8 +99,6 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
return new Factory(jedisClusterConfig, StateType.NON_TRANSACTIONAL, opts);
}
-
-
protected static class Factory implements StateFactory {
public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
@@ -169,7 +116,7 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
this.keyFactory = options.keyFactory;
if (this.keyFactory == null) {
- this.keyFactory = new DefaultKeyFactory();
+ this.keyFactory = new KeyFactory.DefaultKeyFactory();
}
this.serializer = options.serializer;
if (this.serializer == null) {
@@ -220,74 +167,40 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
this.keyFactory = keyFactory;
}
- public List<T> multiGet(List<List<Object>> keys) {
- if (keys.size() == 0) {
- return Collections.emptyList();
- }
+ @Override
+ protected Serializer getSerializer() {
+ return serializer;
+ }
+
+ @Override
+ protected KeyFactory getKeyFactory() {
+ return keyFactory;
+ }
+
+ @Override
+ protected List<String> retrieveValuesFromRedis(List<String> keys) {
+ String[] stringKeys = keys.toArray(new String[keys.size()]);
if (Strings.isNullOrEmpty(this.options.hkey)) {
- String[] stringKeys = buildKeys(keys);
List<String> values = Lists.newArrayList();
- for (String stringKey : stringKeys) {
+ for (String stringKey : keys) {
String value = jedisCluster.get(stringKey);
values.add(value);
}
- return deserializeValues(keys, values);
+ return values;
} else {
- Map<String, String> keyValue = jedisCluster.hgetAll(this.options.hkey);
- List<String> values = buildValuesFromMap(keys, keyValue);
- return deserializeValues(keys, values);
- }
- }
-
- private List<String> buildValuesFromMap(List<List<Object>> keys, Map<String, String> keyValue) {
- List<String> values = new ArrayList<String>(keys.size());
- for (List<Object> key : keys) {
- String strKey = keyFactory.build(key);
- String value = keyValue.get(strKey);
- values.add(value);
- }
- return values;
- }
-
- private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
- List<T> result = new ArrayList<T>(keys.size());
- for (String value : values) {
- if (value != null) {
- result.add((T) serializer.deserialize(value.getBytes()));
- } else {
- result.add(null);
- }
+ return jedisCluster.hmget(this.options.hkey, stringKeys);
}
- return result;
- }
-
- private String[] buildKeys(List<List<Object>> keys) {
- String[] stringKeys = new String[keys.size()];
- int index = 0;
- for (List<Object> key : keys)
- stringKeys[index++] = keyFactory.build(key);
- return stringKeys;
}
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- if (keys.size() == 0) {
- return;
- }
-
+ @Override
+ protected void updateStatesToRedis(Map<String, String> keyValues) {
if (Strings.isNullOrEmpty(this.options.hkey)) {
- for (int i = 0; i < keys.size(); i++) {
- String val = new String(serializer.serialize(vals.get(i)));
- String redisKey = keyFactory.build(keys.get(i));
- jedisCluster.set(redisKey, val);
+ for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
+ jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
}
} else {
- Map<String, String> keyValues = new HashMap<String, String>();
- for (int i = 0; i < keys.size(); i++) {
- String val = new String(serializer.serialize(vals.get(i)));
- keyValues.put(keyFactory.build(keys.get(i)), val);
- }
jedisCluster.hmset(this.options.hkey, keyValues);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 7bc5afb..1461203 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -20,67 +20,16 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import org.apache.storm.redis.common.config.JedisPoolConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
-import storm.trident.state.JSONNonTransactionalSerializer;
-import storm.trident.state.JSONOpaqueSerializer;
-import storm.trident.state.JSONTransactionalSerializer;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.IBackingMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
import java.util.List;
import java.util.Map;
-public class RedisMapState<T> implements IBackingMap<T> {
- private static final Logger logger = LoggerFactory.getLogger(RedisMapState.class);
-
- private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
- StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
- StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
- StateType.OPAQUE, new JSONOpaqueSerializer()
- ));
-
- public static class DefaultKeyFactory implements KeyFactory {
- public String build(List<Object> key) {
- if (key.size() != 1)
- throw new RuntimeException("Default KeyFactory does not support compound keys");
- return (String) key.get(0);
- }
- };
-
- public static class Options<T> implements Serializable {
- public int localCacheSize = 1000;
- public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
- KeyFactory keyFactory = null;
- public Serializer<T> serializer = null;
- public String hkey = null;
- }
-
- public static interface KeyFactory extends Serializable {
- String build(List<Object> key);
- }
-
+public class RedisMapState<T> extends AbstractRedisMapState<T> {
/**
* OpaqueTransactional for redis.
* */
@@ -167,7 +116,7 @@ public class RedisMapState<T> implements IBackingMap<T> {
this.keyFactory = options.keyFactory;
if (this.keyFactory == null) {
- this.keyFactory = new DefaultKeyFactory();
+ this.keyFactory = new KeyFactory.DefaultKeyFactory();
}
this.serializer = options.serializer;
if (this.serializer == null) {
@@ -219,96 +168,64 @@ public class RedisMapState<T> implements IBackingMap<T> {
this.keyFactory = keyFactory;
}
- public List<T> multiGet(List<List<Object>> keys) {
- if (keys.size() == 0) {
- return Collections.emptyList();
- }
+ @Override
+ protected Serializer getSerializer() {
+ return serializer;
+ }
- String[] stringKeys = buildKeys(keys);
-
- if (Strings.isNullOrEmpty(this.options.hkey)) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- List<String> values = jedis.mget(stringKeys);
- return deserializeValues(keys, values);
- } finally {
- if (jedis != null) {
- jedisPool.returnResource(jedis);
- }
- }
- } else {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- List<String> values = jedis.hmget(this.options.hkey, stringKeys);
- return deserializeValues(keys, values);
- } finally {
- if (jedis != null) {
- jedisPool.returnResource(jedis);
- }
- }
- }
+ @Override
+ protected KeyFactory getKeyFactory() {
+ return keyFactory;
}
- private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
- List<T> result = new ArrayList<T>(keys.size());
- for (String value : values) {
- if (value != null) {
- result.add((T) serializer.deserialize(value.getBytes()));
+ @Override
+ protected List<String> retrieveValuesFromRedis(List<String> keys) {
+ String[] stringKeys = keys.toArray(new String[keys.size()]);
+ Jedis jedis = null;
+ try {
+ jedis = jedisPool.getResource();
+
+ if (Strings.isNullOrEmpty(this.options.hkey)) {
+ return jedis.mget(stringKeys);
} else {
- result.add(null);
+ return jedis.hmget(this.options.hkey, stringKeys);
+ }
+ } finally {
+ if (jedis != null) {
+ jedis.close();
}
}
- return result;
}
- private String[] buildKeys(List<List<Object>> keys) {
- String[] stringKeys = new String[keys.size()];
- int index = 0;
- for (List<Object> key : keys)
- stringKeys[index++] = keyFactory.build(key);
- return stringKeys;
- }
+ @Override
+ protected void updateStatesToRedis(Map<String, String> keyValues) {
+ Jedis jedis = null;
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- if (keys.size() == 0) {
- return;
- }
+ try {
+ jedis = jedisPool.getResource();
- if (Strings.isNullOrEmpty(this.options.hkey)) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- String[] keyValue = buildKeyValuesList(keys, vals);
+ if (Strings.isNullOrEmpty(this.options.hkey)) {
+ String[] keyValue = buildKeyValuesList(keyValues);
jedis.mset(keyValue);
- } finally {
- if (jedis != null) {
- jedisPool.returnResource(jedis);
- }
- }
- } else {
- Jedis jedis = jedisPool.getResource();
- try {
- Map<String, String> keyValues = new HashMap<String, String>();
- for (int i = 0; i < keys.size(); i++) {
- String val = new String(serializer.serialize(vals.get(i)));
- keyValues.put(keyFactory.build(keys.get(i)), val);
- }
+ } else {
jedis.hmset(this.options.hkey, keyValues);
-
- } finally {
- jedisPool.returnResource(jedis);
+ }
+ } finally {
+ if (jedis != null) {
+ jedis.close();
}
}
}
- private String[] buildKeyValuesList(List<List<Object>> keys, List<T> vals) {
- String[] keyValues = new String[keys.size() * 2];
- for (int i = 0; i < keys.size(); i++) {
- keyValues[i * 2] = keyFactory.build(keys.get(i));
- keyValues[i * 2 + 1] = new String(serializer.serialize(vals.get(i)));
+ private String[] buildKeyValuesList(Map<String, String> keyValues) {
+ String[] keyValueLists = new String[keyValues.size() * 2];
+
+ int idx = 0;
+ for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
+ keyValueLists[idx++] = kvEntry.getKey();
+ keyValueLists[idx++] = kvEntry.getValue();
}
- return keyValues;
+
+ return keyValueLists;
}
}