You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/02 22:08:01 UTC

[06/16] storm git commit: [storm-redis] Unify a way to specify Redis data type

[storm-redis] Unify a way to specify Redis data type


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/98704ec8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/98704ec8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/98704ec8

Branch: refs/heads/0.10.x-branch
Commit: 98704ec81a7629d11a95a744ee47d3baf7c2de6a
Parents: 52fec9a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 8 06:10:44 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 8 06:10:44 2015 +0900

----------------------------------------------------------------------
 .../storm/redis/trident/state/Options.java      |  5 ++-
 .../trident/state/RedisClusterMapState.java     | 39 ++++++++++++------
 .../redis/trident/state/RedisMapState.java      | 42 ++++++++++++++------
 .../WordCountTridentRedisClusterMap.java        |  5 ++-
 .../redis/trident/WordCountTridentRedisMap.java |  7 +++-
 5 files changed, 70 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index bef7b6c..b262c42 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -17,14 +17,17 @@
  */
 package org.apache.storm.redis.trident.state;
 
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import storm.trident.state.Serializer;
 
 import java.io.Serializable;
 
 public class Options<T> implements Serializable {
+	private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+
 	public int localCacheSize = 1000;
 	public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
 	KeyFactory keyFactory = null;
 	public Serializer<T> serializer = null;
-	public String hkey = null;
+	public RedisDataTypeDescription dataTypeDescription = DEFAULT_REDIS_DATATYPE;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index ccf5b38..10bf9ea 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -22,6 +22,7 @@ import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import redis.clients.jedis.JedisCluster;
 import storm.trident.state.*;
 import storm.trident.state.map.*;
@@ -37,9 +38,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
         return opaque(jedisClusterConfig, new Options());
     }
 
-    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, String hkey) {
+    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return opaque(jedisClusterConfig, opts);
     }
 
@@ -60,9 +61,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
         return transactional(jedisClusterConfig, new Options());
     }
 
-    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return transactional(jedisClusterConfig, opts);
     }
 
@@ -83,9 +84,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
         return nonTransactional(jedisClusterConfig, new Options());
     }
 
-    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return nonTransactional(jedisClusterConfig, opts);
     }
 
@@ -180,7 +181,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
     @Override
     protected List<String> retrieveValuesFromRedis(List<String> keys) {
         String[] stringKeys = keys.toArray(new String[keys.size()]);
-        if (Strings.isNullOrEmpty(this.options.hkey)) {
+        RedisDataTypeDescription description = this.options.dataTypeDescription;
+        switch (description.getDataType()) {
+        case STRING:
             List<String> values = Lists.newArrayList();
 
             for (String stringKey : keys) {
@@ -189,19 +192,31 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
             }
 
             return values;
-        } else {
-            return jedisCluster.hmget(this.options.hkey, stringKeys);
+
+        case HASH:
+            return jedisCluster.hmget(description.getAdditionalKey(), stringKeys);
+
+        default:
+            throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
         }
     }
 
     @Override
     protected void updateStatesToRedis(Map<String, String> keyValues) {
-        if (Strings.isNullOrEmpty(this.options.hkey)) {
+        RedisDataTypeDescription description = this.options.dataTypeDescription;
+        switch (description.getDataType()) {
+        case STRING:
             for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
                 jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
             }
-        } else {
-            jedisCluster.hmset(this.options.hkey, keyValues);
+            break;
+
+        case HASH:
+            jedisCluster.hmset(description.getAdditionalKey(), keyValues);
+            break;
+
+        default:
+            throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 1461203..14b014e 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -21,6 +21,7 @@ import backtype.storm.task.IMetricsContext;
 import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 import storm.trident.state.*;
@@ -37,9 +38,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
         return opaque(jedisPoolConfig, new Options());
     }
 
-    public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, String hkey) {
+    public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return opaque(jedisPoolConfig, opts);
     }
 
@@ -60,9 +61,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
         return transactional(jedisPoolConfig, new Options());
     }
 
-    public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, String hkey) {
+    public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return transactional(jedisPoolConfig, opts);
     }
 
@@ -83,9 +84,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
         return nonTransactional(jedisPoolConfig, new Options());
     }
 
-    public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, String hkey) {
+    public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return nonTransactional(jedisPoolConfig, opts);
     }
 
@@ -181,15 +182,23 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
     @Override
     protected List<String> retrieveValuesFromRedis(List<String> keys) {
         String[] stringKeys = keys.toArray(new String[keys.size()]);
+
         Jedis jedis = null;
         try {
             jedis = jedisPool.getResource();
 
-            if (Strings.isNullOrEmpty(this.options.hkey)) {
+            RedisDataTypeDescription description = this.options.dataTypeDescription;
+            switch (description.getDataType()) {
+            case STRING:
                 return jedis.mget(stringKeys);
-            } else {
-                return jedis.hmget(this.options.hkey, stringKeys);
+
+            case HASH:
+                return jedis.hmget(description.getAdditionalKey(), stringKeys);
+
+            default:
+                throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
             }
+
         } finally {
             if (jedis != null) {
                 jedis.close();
@@ -204,12 +213,21 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
         try {
             jedis = jedisPool.getResource();
 
-            if (Strings.isNullOrEmpty(this.options.hkey)) {
+            RedisDataTypeDescription description = this.options.dataTypeDescription;
+            switch (description.getDataType()) {
+            case STRING:
                 String[] keyValue = buildKeyValuesList(keyValues);
                 jedis.mset(keyValue);
-            } else {
-                jedis.hmset(this.options.hkey, keyValues);
+                break;
+
+            case HASH:
+                jedis.hmset(description.getAdditionalKey(), keyValues);
+                break;
+
+            default:
+                throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
             }
+
         } finally {
             if (jedis != null) {
                 jedis.close();

http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index 027c785..beb4b5f 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -23,6 +23,7 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisClusterMapState;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
@@ -56,7 +57,9 @@ public class WordCountTridentRedisClusterMap {
         }
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
-        StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
+        RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.HASH, "test");
+        StateFactory factory = RedisClusterMapState.transactional(clusterConfig, dataTypeDescription);
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);

http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index 8ecf9ad..cfda54e 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -23,7 +23,7 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.trident.state.RedisMapState;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import storm.trident.Stream;
@@ -48,7 +48,10 @@ public class WordCountTridentRedisMap {
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        StateFactory factory = RedisMapState.transactional(poolConfig);
+
+        RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.HASH, "test");
+        StateFactory factory = RedisMapState.transactional(poolConfig, dataTypeDescription);
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);