You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/06/02 17:00:34 UTC
[06/12] storm git commit: [storm-redis] Unify a way to specify Redis data type
[storm-redis] Unify a way to specify Redis data type
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/98704ec8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/98704ec8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/98704ec8
Branch: refs/heads/master
Commit: 98704ec81a7629d11a95a744ee47d3baf7c2de6a
Parents: 52fec9a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 8 06:10:44 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 8 06:10:44 2015 +0900
----------------------------------------------------------------------
.../storm/redis/trident/state/Options.java | 5 ++-
.../trident/state/RedisClusterMapState.java | 39 ++++++++++++------
.../redis/trident/state/RedisMapState.java | 42 ++++++++++++++------
.../WordCountTridentRedisClusterMap.java | 5 ++-
.../redis/trident/WordCountTridentRedisMap.java | 7 +++-
5 files changed, 70 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index bef7b6c..b262c42 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -17,14 +17,17 @@
*/
package org.apache.storm.redis.trident.state;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import storm.trident.state.Serializer;
import java.io.Serializable;
public class Options<T> implements Serializable {
+ private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+
public int localCacheSize = 1000;
public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
KeyFactory keyFactory = null;
public Serializer<T> serializer = null;
- public String hkey = null;
+ public RedisDataTypeDescription dataTypeDescription = DEFAULT_REDIS_DATATYPE;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index ccf5b38..10bf9ea 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -22,6 +22,7 @@ import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import redis.clients.jedis.JedisCluster;
import storm.trident.state.*;
import storm.trident.state.map.*;
@@ -37,9 +38,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
return opaque(jedisClusterConfig, new Options());
}
- public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, String hkey) {
+ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return opaque(jedisClusterConfig, opts);
}
@@ -60,9 +61,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
return transactional(jedisClusterConfig, new Options());
}
- public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return transactional(jedisClusterConfig, opts);
}
@@ -83,9 +84,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
return nonTransactional(jedisClusterConfig, new Options());
}
- public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return nonTransactional(jedisClusterConfig, opts);
}
@@ -180,7 +181,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
@Override
protected List<String> retrieveValuesFromRedis(List<String> keys) {
String[] stringKeys = keys.toArray(new String[keys.size()]);
- if (Strings.isNullOrEmpty(this.options.hkey)) {
+ RedisDataTypeDescription description = this.options.dataTypeDescription;
+ switch (description.getDataType()) {
+ case STRING:
List<String> values = Lists.newArrayList();
for (String stringKey : keys) {
@@ -189,19 +192,31 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
}
return values;
- } else {
- return jedisCluster.hmget(this.options.hkey, stringKeys);
+
+ case HASH:
+ return jedisCluster.hmget(description.getAdditionalKey(), stringKeys);
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
}
}
@Override
protected void updateStatesToRedis(Map<String, String> keyValues) {
- if (Strings.isNullOrEmpty(this.options.hkey)) {
+ RedisDataTypeDescription description = this.options.dataTypeDescription;
+ switch (description.getDataType()) {
+ case STRING:
for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
}
- } else {
- jedisCluster.hmset(this.options.hkey, keyValues);
+ break;
+
+ case HASH:
+ jedisCluster.hmset(description.getAdditionalKey(), keyValues);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 1461203..14b014e 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -21,6 +21,7 @@ import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import storm.trident.state.*;
@@ -37,9 +38,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
return opaque(jedisPoolConfig, new Options());
}
- public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, String hkey) {
+ public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return opaque(jedisPoolConfig, opts);
}
@@ -60,9 +61,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
return transactional(jedisPoolConfig, new Options());
}
- public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, String hkey) {
+ public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return transactional(jedisPoolConfig, opts);
}
@@ -83,9 +84,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
return nonTransactional(jedisPoolConfig, new Options());
}
- public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, String hkey) {
+ public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return nonTransactional(jedisPoolConfig, opts);
}
@@ -181,15 +182,23 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
@Override
protected List<String> retrieveValuesFromRedis(List<String> keys) {
String[] stringKeys = keys.toArray(new String[keys.size()]);
+
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
- if (Strings.isNullOrEmpty(this.options.hkey)) {
+ RedisDataTypeDescription description = this.options.dataTypeDescription;
+ switch (description.getDataType()) {
+ case STRING:
return jedis.mget(stringKeys);
- } else {
- return jedis.hmget(this.options.hkey, stringKeys);
+
+ case HASH:
+ return jedis.hmget(description.getAdditionalKey(), stringKeys);
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
}
+
} finally {
if (jedis != null) {
jedis.close();
@@ -204,12 +213,21 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
try {
jedis = jedisPool.getResource();
- if (Strings.isNullOrEmpty(this.options.hkey)) {
+ RedisDataTypeDescription description = this.options.dataTypeDescription;
+ switch (description.getDataType()) {
+ case STRING:
String[] keyValue = buildKeyValuesList(keyValues);
jedis.mset(keyValue);
- } else {
- jedis.hmset(this.options.hkey, keyValues);
+ break;
+
+ case HASH:
+ jedis.hmset(description.getAdditionalKey(), keyValues);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
}
+
} finally {
if (jedis != null) {
jedis.close();
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index 027c785..beb4b5f 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -23,6 +23,7 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.apache.storm.redis.trident.state.RedisClusterMapState;
import org.apache.storm.redis.common.config.JedisClusterConfig;
@@ -56,7 +57,9 @@ public class WordCountTridentRedisClusterMap {
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
- StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
+ RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, "test");
+ StateFactory factory = RedisClusterMapState.transactional(clusterConfig, dataTypeDescription);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index 8ecf9ad..cfda54e 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -23,7 +23,7 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.trident.state.RedisMapState;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import storm.trident.Stream;
@@ -48,7 +48,10 @@ public class WordCountTridentRedisMap {
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(redisHost).setPort(redisPort)
.build();
- StateFactory factory = RedisMapState.transactional(poolConfig);
+
+ RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, "test");
+ StateFactory factory = RedisMapState.transactional(poolConfig, dataTypeDescription);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);