You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/06/02 17:00:29 UTC

[01/12] storm git commit: Improve RedisStateQuerier to convert List<Values> from Redis value

Repository: storm
Updated Branches:
  refs/heads/master ad98824fc -> a0c032358


Improve RedisStateQuerier to convert List<Values> from Redis value

* Reuse RedisStoreMapper / RedisLookupMapper
** RedisState*Querier / RedisState*Updater


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7fec9a1f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7fec9a1f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7fec9a1f

Branch: refs/heads/master
Commit: 7fec9a1f4b9887897c9c7d0f3f80433fa34f18c0
Parents: b367ab4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 09:38:54 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 09:38:54 2015 +0900

----------------------------------------------------------------------
 .../trident/state/RedisClusterStateQuerier.java | 37 +++++++--------
 .../trident/state/RedisClusterStateUpdater.java | 33 +++++++------
 .../redis/trident/state/RedisStateQuerier.java  | 50 ++++++++++++--------
 .../redis/trident/state/RedisStateUpdater.java  | 32 +++++++------
 .../redis/trident/WordCountLookupMapper.java    | 40 ++++++++++++++++
 .../redis/trident/WordCountStoreMapper.java     | 22 +++++++++
 .../redis/trident/WordCountTridentRedis.java    | 12 +++--
 .../trident/WordCountTridentRedisCluster.java   | 11 +++--
 .../WordCountTridentRedisClusterMap.java        |  1 -
 .../redis/trident/WordCountTridentRedisMap.java |  1 -
 .../redis/trident/WordCountTupleMapper.java     | 16 -------
 11 files changed, 158 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index 17614a1..4382fe3 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -19,6 +19,7 @@ package org.apache.storm.redis.trident.state;
 
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,37 +30,30 @@ import storm.trident.tuple.TridentTuple;
 
 import java.util.List;
 
-public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, String> {
+public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, List<Values>> {
     private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
 
-    private final String redisKeyPrefix;
-    private final TupleMapper tupleMapper;
+    private final RedisLookupMapper lookupMapper;
 
-    public RedisClusterStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
-        this.redisKeyPrefix = redisKeyPrefix;
-        this.tupleMapper = tupleMapper;
+    public RedisClusterStateQuerier(RedisLookupMapper lookupMapper) {
+        this.lookupMapper = lookupMapper;
     }
 
     @Override
-    public List<String> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
-        List<String> ret = Lists.newArrayList();
-
-        List<String> keys = Lists.newArrayList();
+    public List<List<Values>> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
+        List<List<Values>> ret = Lists.newArrayList();
 
         JedisCluster jedisCluster = null;
         try {
             jedisCluster = redisClusterState.getJedisCluster();
 
+            for (int i = 0 ; i < inputs.size() ; i++) {
+                TridentTuple input = inputs.get(i);
 
-            for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTuple(input);
-                if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
-                    key = redisKeyPrefix + key;
-                }
+                String key = lookupMapper.getKeyFromTuple(input);
                 String value = jedisCluster.get(key);
-                ret.add(value);
-
-                logger.debug("redis get key[" + key + "] count[" + value + "]");
+                ret.add(lookupMapper.toTuple(input, value));
+                logger.debug("redis get key[" + key + "] value [" + value + "]");
             }
         } finally {
             if (jedisCluster != null) {
@@ -71,8 +65,9 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
     }
 
     @Override
-    public void execute(TridentTuple tuple, String s, TridentCollector collector) {
-        String key = this.tupleMapper.getKeyFromTuple(tuple);
-        collector.emit(new Values(key, s));
+    public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
+        for (Values value : values) {
+            collector.emit(value);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 023b527..35fb48e 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.redis.trident.state;
 
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,13 +32,13 @@ import java.util.List;
 public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
     private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
 
-    private final String redisKeyPrefix;
-    private final TupleMapper tupleMapper;
+    private final RedisStoreMapper storeMapper;
     private final int expireIntervalSec;
 
-    public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
-        this.redisKeyPrefix = redisKeyPrefix;
-        this.tupleMapper = tupleMapper;
+    public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
+        this.storeMapper = storeMapper;
+        assertDataType(storeMapper.getDataTypeDescription());
+
         if (expireIntervalSec > 0) {
             this.expireIntervalSec = expireIntervalSec;
         } else {
@@ -52,19 +54,15 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
         try {
             jedisCluster = redisClusterState.getJedisCluster();
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTuple(input);
-                String redisKey = key;
-                if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
-                    redisKey = redisKeyPrefix + redisKey;
-                }
-                String value = this.tupleMapper.getValueFromTuple(input);
+                String key = storeMapper.getKeyFromTuple(input);
+                String value = storeMapper.getValueFromTuple(input);
 
-                logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
+                logger.debug("update key[" + key + "] redisKey[" + key + "] value[" + value + "]");
 
                 if (this.expireIntervalSec > 0) {
-                    jedisCluster.setex(redisKey, expireIntervalSec, value);
+                    jedisCluster.setex(key, expireIntervalSec, value);
                 } else {
-                    jedisCluster.set(redisKey, value);
+                    jedisCluster.set(key, value);
                 }
             }
         } finally {
@@ -73,4 +71,11 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
             }
         }
     }
+
+    private void assertDataType(RedisDataTypeDescription storeMapper) {
+        if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
+            throw new IllegalArgumentException("State should be STRING type");
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index 294e83b..a215741 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -19,42 +19,43 @@ package org.apache.storm.redis.trident.state;
 
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import redis.clients.jedis.Jedis;
 import storm.trident.operation.TridentCollector;
 import storm.trident.state.BaseQueryFunction;
 import storm.trident.tuple.TridentTuple;
 
+import java.util.ArrayList;
 import java.util.List;
 
-public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
+public class RedisStateQuerier extends BaseQueryFunction<RedisState, List<Values>> {
+    private final RedisLookupMapper lookupMapper;
 
-    private final String redisKeyPrefix;
-    private final TupleMapper tupleMapper;
-
-    public RedisStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
-        this.redisKeyPrefix = redisKeyPrefix;
-        this.tupleMapper = tupleMapper;
+    public RedisStateQuerier(RedisLookupMapper lookupMapper) {
+        this.lookupMapper = lookupMapper;
+        assertDataType(lookupMapper.getDataTypeDescription());
     }
 
     @Override
-    public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
+    public List<List<Values>> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
+        List<List<Values>> values = new ArrayList<List<Values>>();
+
         List<String> keys = Lists.newArrayList();
         for (TridentTuple input : inputs) {
-            String key = this.tupleMapper.getKeyFromTuple(input);
-            if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
-                key = redisKeyPrefix + key;
-            }
-            keys.add(key);
+            keys.add(lookupMapper.getKeyFromTuple(input));
         }
 
         Jedis jedis = null;
         try {
             jedis = redisState.getJedis();
-            return jedis.mget(keys.toArray(new String[keys.size()]));
+            List<String> redisVals = jedis.mget(keys.toArray(new String[keys.size()]));
+
+            for (int i = 0 ; i < redisVals.size() ; i++) {
+                values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
+            }
+
+            return values;
         } finally {
             if (jedis != null) {
                 redisState.returnJedis(jedis);
@@ -63,8 +64,15 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
     }
 
     @Override
-    public void execute(TridentTuple tuple, String s, TridentCollector collector) {
-        String key = this.tupleMapper.getKeyFromTuple(tuple);
-        collector.emit(new Values(key, s));
+    public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
+        for (Values value : values) {
+            collector.emit(value);
+        }
+    }
+
+    private void assertDataType(RedisDataTypeDescription lookupMapper) {
+        if (lookupMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
+            throw new IllegalArgumentException("State should be STRING type");
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 664a222..384a120 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm.redis.trident.state;
 
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,13 +32,13 @@ import java.util.List;
 public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
     private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 
-    private final String redisKeyPrefix;
-    private final TupleMapper tupleMapper;
+    private final RedisStoreMapper storeMapper;
     private final int expireIntervalSec;
 
-    public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
-        this.redisKeyPrefix = redisKeyPrefix;
-        this.tupleMapper = tupleMapper;
+    public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
+        this.storeMapper = storeMapper;
+        assertDataType(storeMapper.getDataTypeDescription());
+
         if (expireIntervalSec > 0) {
             this.expireIntervalSec = expireIntervalSec;
         } else {
@@ -51,19 +53,15 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
         try {
             jedis = redisState.getJedis();
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTuple(input);
-                String redisKey = key;
-                if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
-                    redisKey = redisKeyPrefix + redisKey;
-                }
-                String value = this.tupleMapper.getValueFromTuple(input);
+                String key = storeMapper.getKeyFromTuple(input);
+                String value = storeMapper.getValueFromTuple(input);
 
-                logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
+                logger.debug("update key[" + key + "] redisKey[" + key+ "] value[" + value + "]");
 
                 if (this.expireIntervalSec > 0) {
-                    jedis.setex(redisKey, expireIntervalSec, value);
+                    jedis.setex(key, expireIntervalSec, value);
                 } else {
-                    jedis.set(redisKey, value);
+                    jedis.set(key, value);
                 }
             }
         } finally {
@@ -72,4 +70,10 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
             }
         }
     }
+
+    private void assertDataType(RedisDataTypeDescription storeMapper) {
+        if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
+            throw new IllegalArgumentException("State should be STRING type");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
new file mode 100644
index 0000000..891a1af
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
@@ -0,0 +1,40 @@
+package org.apache.storm.redis.trident;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WordCountLookupMapper implements RedisLookupMapper {
+    @Override
+    public List<Values> toTuple(ITuple input, Object value) {
+        List<Values> values = new ArrayList<Values>();
+        values.add(new Values(getKeyFromTuple(input), value));
+        return values;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "value"));
+    }
+
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return "test_" + tuple.getString(0);
+    }
+
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        return tuple.getInteger(1).toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
new file mode 100644
index 0000000..aa03ead
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
@@ -0,0 +1,22 @@
+package org.apache.storm.redis.trident;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+
+public class WordCountStoreMapper implements RedisStoreMapper {
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return "test_" + tuple.getString(0);
+    }
+
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        return tuple.getInteger(1).toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index 8b6ebc5..79bab5a 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -23,7 +23,8 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.redis.trident.state.RedisState;
 import org.apache.storm.redis.trident.state.RedisStateQuerier;
 import org.apache.storm.redis.trident.state.RedisStateUpdater;
@@ -47,7 +48,9 @@ public class WordCountTridentRedis {
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        TupleMapper tupleMapper = new WordCountTupleMapper();
+
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
         RedisState.Factory factory = new RedisState.Factory(poolConfig);
 
         TridentTopology topology = new TridentTopology();
@@ -55,12 +58,12 @@ public class WordCountTridentRedis {
 
         stream.partitionPersist(factory,
                                 fields,
-                                new RedisStateUpdater("test_", tupleMapper, 86400000),
+                                new RedisStateUpdater(storeMapper, 86400000),
                                 new Fields());
 
         TridentState state = topology.newStaticState(factory);
         stream = stream.stateQuery(state, new Fields("word"),
-                                new RedisStateQuerier("test_", tupleMapper),
+                                new RedisStateQuerier(lookupMapper),
                                 new Fields("columnName","columnValue"));
         stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
         return topology.build();
@@ -92,5 +95,4 @@ public class WordCountTridentRedis {
             System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index ddb6939..280b273 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -23,7 +23,8 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.redis.trident.state.RedisClusterState;
 import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
 import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
@@ -55,7 +56,9 @@ public class WordCountTridentRedisCluster {
         }
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
-        TupleMapper tupleMapper = new WordCountTupleMapper();
+
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
         RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
 
         TridentTopology topology = new TridentTopology();
@@ -63,12 +66,12 @@ public class WordCountTridentRedisCluster {
 
         stream.partitionPersist(factory,
                                 fields,
-                                new RedisClusterStateUpdater("test_", tupleMapper, 86400000),
+                                new RedisClusterStateUpdater(storeMapper, 86400000),
                                 new Fields());
 
         TridentState state = topology.newStaticState(factory);
         stream = stream.stateQuery(state, new Fields("word"),
-                                new RedisClusterStateQuerier("test_", tupleMapper),
+                                new RedisClusterStateQuerier(lookupMapper),
                                 new Fields("columnName","columnValue"));
         stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
         return topology.build();

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index de1f252..027c785 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -56,7 +56,6 @@ public class WordCountTridentRedisClusterMap {
         }
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
-        TupleMapper tupleMapper = new WordCountTupleMapper();
         StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index 4d4afe8..8ecf9ad 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -48,7 +48,6 @@ public class WordCountTridentRedisMap {
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        TupleMapper tupleMapper = new WordCountTupleMapper();
         StateFactory factory = RedisMapState.transactional(poolConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
deleted file mode 100644
index 1e601c9..0000000
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.storm.redis.trident;
-
-import backtype.storm.tuple.ITuple;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-
-public class WordCountTupleMapper implements TupleMapper {
-    @Override
-    public String getKeyFromTuple(ITuple tuple) {
-        return tuple.getString(0);
-    }
-
-    @Override
-    public String getValueFromTuple(ITuple tuple) {
-        return tuple.getInteger(1).toString();
-    }
-}


[06/12] storm git commit: [storm-redis] Unify a way to specify Redis data type

Posted by bo...@apache.org.
[storm-redis] Unify a way to specify Redis data type


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/98704ec8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/98704ec8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/98704ec8

Branch: refs/heads/master
Commit: 98704ec81a7629d11a95a744ee47d3baf7c2de6a
Parents: 52fec9a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 8 06:10:44 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 8 06:10:44 2015 +0900

----------------------------------------------------------------------
 .../storm/redis/trident/state/Options.java      |  5 ++-
 .../trident/state/RedisClusterMapState.java     | 39 ++++++++++++------
 .../redis/trident/state/RedisMapState.java      | 42 ++++++++++++++------
 .../WordCountTridentRedisClusterMap.java        |  5 ++-
 .../redis/trident/WordCountTridentRedisMap.java |  7 +++-
 5 files changed, 70 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index bef7b6c..b262c42 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -17,14 +17,17 @@
  */
 package org.apache.storm.redis.trident.state;
 
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import storm.trident.state.Serializer;
 
 import java.io.Serializable;
 
 public class Options<T> implements Serializable {
+	private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+
 	public int localCacheSize = 1000;
 	public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
 	KeyFactory keyFactory = null;
 	public Serializer<T> serializer = null;
-	public String hkey = null;
+	public RedisDataTypeDescription dataTypeDescription = DEFAULT_REDIS_DATATYPE;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index ccf5b38..10bf9ea 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -22,6 +22,7 @@ import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import redis.clients.jedis.JedisCluster;
 import storm.trident.state.*;
 import storm.trident.state.map.*;
@@ -37,9 +38,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
         return opaque(jedisClusterConfig, new Options());
     }
 
-    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, String hkey) {
+    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return opaque(jedisClusterConfig, opts);
     }
 
@@ -60,9 +61,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
         return transactional(jedisClusterConfig, new Options());
     }
 
-    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return transactional(jedisClusterConfig, opts);
     }
 
@@ -83,9 +84,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
         return nonTransactional(jedisClusterConfig, new Options());
     }
 
-    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return nonTransactional(jedisClusterConfig, opts);
     }
 
@@ -180,7 +181,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
     @Override
     protected List<String> retrieveValuesFromRedis(List<String> keys) {
         String[] stringKeys = keys.toArray(new String[keys.size()]);
-        if (Strings.isNullOrEmpty(this.options.hkey)) {
+        RedisDataTypeDescription description = this.options.dataTypeDescription;
+        switch (description.getDataType()) {
+        case STRING:
             List<String> values = Lists.newArrayList();
 
             for (String stringKey : keys) {
@@ -189,19 +192,31 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
             }
 
             return values;
-        } else {
-            return jedisCluster.hmget(this.options.hkey, stringKeys);
+
+        case HASH:
+            return jedisCluster.hmget(description.getAdditionalKey(), stringKeys);
+
+        default:
+            throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
         }
     }
 
     @Override
     protected void updateStatesToRedis(Map<String, String> keyValues) {
-        if (Strings.isNullOrEmpty(this.options.hkey)) {
+        RedisDataTypeDescription description = this.options.dataTypeDescription;
+        switch (description.getDataType()) {
+        case STRING:
             for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
                 jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
             }
-        } else {
-            jedisCluster.hmset(this.options.hkey, keyValues);
+            break;
+
+        case HASH:
+            jedisCluster.hmset(description.getAdditionalKey(), keyValues);
+            break;
+
+        default:
+            throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 1461203..14b014e 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -21,6 +21,7 @@ import backtype.storm.task.IMetricsContext;
 import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 import storm.trident.state.*;
@@ -37,9 +38,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
         return opaque(jedisPoolConfig, new Options());
     }
 
-    public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, String hkey) {
+    public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return opaque(jedisPoolConfig, opts);
     }
 
@@ -60,9 +61,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
         return transactional(jedisPoolConfig, new Options());
     }
 
-    public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, String hkey) {
+    public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return transactional(jedisPoolConfig, opts);
     }
 
@@ -83,9 +84,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
         return nonTransactional(jedisPoolConfig, new Options());
     }
 
-    public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, String hkey) {
+    public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
         Options opts = new Options();
-        opts.hkey = hkey;
+        opts.dataTypeDescription = dataTypeDescription;
         return nonTransactional(jedisPoolConfig, opts);
     }
 
@@ -181,15 +182,23 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
     @Override
     protected List<String> retrieveValuesFromRedis(List<String> keys) {
         String[] stringKeys = keys.toArray(new String[keys.size()]);
+
         Jedis jedis = null;
         try {
             jedis = jedisPool.getResource();
 
-            if (Strings.isNullOrEmpty(this.options.hkey)) {
+            RedisDataTypeDescription description = this.options.dataTypeDescription;
+            switch (description.getDataType()) {
+            case STRING:
                 return jedis.mget(stringKeys);
-            } else {
-                return jedis.hmget(this.options.hkey, stringKeys);
+
+            case HASH:
+                return jedis.hmget(description.getAdditionalKey(), stringKeys);
+
+            default:
+                throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
             }
+
         } finally {
             if (jedis != null) {
                 jedis.close();
@@ -204,12 +213,21 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
         try {
             jedis = jedisPool.getResource();
 
-            if (Strings.isNullOrEmpty(this.options.hkey)) {
+            RedisDataTypeDescription description = this.options.dataTypeDescription;
+            switch (description.getDataType()) {
+            case STRING:
                 String[] keyValue = buildKeyValuesList(keyValues);
                 jedis.mset(keyValue);
-            } else {
-                jedis.hmset(this.options.hkey, keyValues);
+                break;
+
+            case HASH:
+                jedis.hmset(description.getAdditionalKey(), keyValues);
+                break;
+
+            default:
+                throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
             }
+
         } finally {
             if (jedis != null) {
                 jedis.close();

http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index 027c785..beb4b5f 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -23,6 +23,7 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisClusterMapState;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
@@ -56,7 +57,9 @@ public class WordCountTridentRedisClusterMap {
         }
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
-        StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
+        RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.HASH, "test");
+        StateFactory factory = RedisClusterMapState.transactional(clusterConfig, dataTypeDescription);
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);

http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index 8ecf9ad..cfda54e 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -23,7 +23,7 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.trident.state.RedisMapState;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import storm.trident.Stream;
@@ -48,7 +48,10 @@ public class WordCountTridentRedisMap {
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        StateFactory factory = RedisMapState.transactional(poolConfig);
+
+        RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.HASH, "test");
+        StateFactory factory = RedisMapState.transactional(poolConfig, dataTypeDescription);
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);


[05/12] storm git commit: Apply changes to README.md

Posted by bo...@apache.org.
Apply changes to README.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/52fec9ab
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/52fec9ab
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/52fec9ab

Branch: refs/heads/master
Commit: 52fec9ab2cc3e9464fab158002fd27887eb6ff85
Parents: 2b1f1ae
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 19:15:27 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 19:15:27 2015 +0900

----------------------------------------------------------------------
 external/storm-redis/README.md | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/52fec9ab/external/storm-redis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-redis/README.md b/external/storm-redis/README.md
index 32480e6..aa7a874 100644
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@ -184,7 +184,8 @@ RedisState
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
         RedisState.Factory factory = new RedisState.Factory(poolConfig);
 
         TridentTopology topology = new TridentTopology();
@@ -192,8 +193,13 @@ RedisState
 
         stream.partitionPersist(factory,
                                 fields,
-                                new RedisStateUpdater("test_", tupleMapper, 86400000),
+                                new RedisStateUpdater(storeMapper, 86400000),
                                 new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                                new RedisStateQuerier(lookupMapper),
+                                new Fields("columnName","columnValue"));
 ```
 
 RedisClusterState
@@ -205,7 +211,8 @@ RedisClusterState
         }
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        RedisLookupMapper lookupMapper = new WordCountLookupMapper();
         RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
 
         TridentTopology topology = new TridentTopology();
@@ -213,8 +220,13 @@ RedisClusterState
 
         stream.partitionPersist(factory,
                                 fields,
-                                new RedisClusterStateUpdater("test_", tupleMapper, 86400000),
+                                new RedisClusterStateUpdater(storeMapper, 86400000),
                                 new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"),
+                                new RedisClusterStateQuerier(lookupMapper),
+                                new Fields("columnName","columnValue"));
 ```
 
 ## License


[12/12] storm git commit: Added STORM-753 to Changelog

Posted by bo...@apache.org.
Added STORM-753 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a0c03235
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a0c03235
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a0c03235

Branch: refs/heads/master
Commit: a0c032358a63632b958f6f0d1d0bbf9109020e9f
Parents: bed27a7
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Jun 2 09:59:16 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Jun 2 09:59:16 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a0c03235/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1d4ebef..ad235d8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## 0.11.0
 
 ## 0.10.0
+ * STORM-753: Improve RedisStateQuerier to convert List<Values> from Redis value
  * STORM-835: Netty Client hold batch object until io operation complete
  * STORM-827: Allow AutoTGT to work with storm-hdfs too.
  * STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation.


[03/12] storm git commit: Extract abstract classes to reduce code duplication (Redis*MapState)

Posted by bo...@apache.org.
Extract abstract classes to reduce code duplication (Redis*MapState)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9c0329b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9c0329b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9c0329b

Branch: refs/heads/master
Commit: b9c0329bd182333f0af237e661e88958d7157520
Parents: 8fd1f4b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 18:55:03 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 18:55:03 2015 +0900

----------------------------------------------------------------------
 .../trident/state/AbstractRedisMapState.java    |  70 ++++++++
 .../storm/redis/trident/state/KeyFactory.java   |  18 ++
 .../storm/redis/trident/state/Options.java      |  13 ++
 .../trident/state/RedisClusterMapState.java     | 137 +++-----------
 .../redis/trident/state/RedisMapState.java      | 177 +++++--------------
 5 files changed, 173 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
new file mode 100644
index 0000000..a85a57a
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
@@ -0,0 +1,70 @@
+package org.apache.storm.redis.trident.state;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import storm.trident.state.*;
+import storm.trident.state.map.IBackingMap;
+
+import java.util.*;
+
+public abstract class AbstractRedisMapState<T> implements IBackingMap<T> {
+	public static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
+			StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
+			StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
+			StateType.OPAQUE, new JSONOpaqueSerializer()
+	));
+
+	@Override public List<T> multiGet(List<List<Object>> keys) {
+		if (keys.size() == 0) {
+			return Collections.emptyList();
+		}
+
+		List<String> stringKeys = buildKeys(keys);
+		List<String> values = retrieveValuesFromRedis(stringKeys);
+
+		return deserializeValues(keys, values);
+	}
+
+	@Override
+	public void multiPut(List<List<Object>> keys, List<T> vals) {
+		if (keys.size() == 0) {
+			return;
+		}
+
+		Map<String, String> keyValues = new HashMap<String, String>();
+		for (int i = 0; i < keys.size(); i++) {
+			String val = new String(getSerializer().serialize(vals.get(i)));
+			String redisKey = getKeyFactory().build(keys.get(i));
+			keyValues.put(redisKey, val);
+		}
+
+		updateStatesToRedis(keyValues);
+	}
+
+	private List<String> buildKeys(List<List<Object>> keys) {
+		List<String> stringKeys = new ArrayList<String>();
+
+		for (List<Object> key : keys) {
+			stringKeys.add(getKeyFactory().build(key));
+		}
+
+		return stringKeys;
+	}
+
+	private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
+		List<T> result = new ArrayList<T>(keys.size());
+		for (String value : values) {
+			if (value != null) {
+				result.add((T) getSerializer().deserialize(value.getBytes()));
+			} else {
+				result.add(null);
+			}
+		}
+		return result;
+	}
+
+	protected abstract Serializer getSerializer();
+	protected abstract KeyFactory getKeyFactory();
+	protected abstract List<String> retrieveValuesFromRedis(List<String> keys);
+	protected abstract void updateStatesToRedis(Map<String, String> keyValues);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
new file mode 100644
index 0000000..0fac726
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
@@ -0,0 +1,18 @@
+package org.apache.storm.redis.trident.state;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface KeyFactory extends Serializable {
+	String build(List<Object> key);
+
+	class DefaultKeyFactory implements KeyFactory {
+		public String build(List<Object> key) {
+			if (key.size() != 1)
+				throw new RuntimeException("Default KeyFactory does not support compound keys");
+
+			return (String) key.get(0);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
new file mode 100644
index 0000000..59e4ecb
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -0,0 +1,13 @@
+package org.apache.storm.redis.trident.state;
+
+import storm.trident.state.Serializer;
+
+import java.io.Serializable;
+
+public class Options<T> implements Serializable {
+	public int localCacheSize = 1000;
+	public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
+	KeyFactory keyFactory = null;
+	public Serializer<T> serializer = null;
+	public String hkey = null;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 1154376..ccf5b38 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -20,67 +20,16 @@ package org.apache.storm.redis.trident.state;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
-import storm.trident.state.JSONNonTransactionalSerializer;
-import storm.trident.state.JSONOpaqueSerializer;
-import storm.trident.state.JSONTransactionalSerializer;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.IBackingMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
 import java.util.List;
 import java.util.Map;
 
-public class RedisClusterMapState<T> implements IBackingMap<T> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisClusterMapState.class);
-
-    private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
-            StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
-            StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
-            StateType.OPAQUE, new JSONOpaqueSerializer()
-    ));
-
-    public static class DefaultKeyFactory implements KeyFactory {
-        public String build(List<Object> key) {
-            if (key.size() != 1)
-                throw new RuntimeException("Default KeyFactory does not support compound keys");
-            return (String) key.get(0);
-        }
-    };
-
-    public static class Options<T> implements Serializable {
-        public int localCacheSize = 1000;
-        public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
-        KeyFactory keyFactory = null;
-        public Serializer<T> serializer = null;
-        public String hkey = null;
-    }
-
-    public static interface KeyFactory extends Serializable {
-        String build(List<Object> key);
-    }
-
+public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
     /**
      * OpaqueTransactional for redis-cluster.
      * */
@@ -150,8 +99,6 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
         return new Factory(jedisClusterConfig, StateType.NON_TRANSACTIONAL, opts);
     }
 
-
-
     protected static class Factory implements StateFactory {
         public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
 
@@ -169,7 +116,7 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
 
             this.keyFactory = options.keyFactory;
             if (this.keyFactory == null) {
-                this.keyFactory = new DefaultKeyFactory();
+                this.keyFactory = new KeyFactory.DefaultKeyFactory();
             }
             this.serializer = options.serializer;
             if (this.serializer == null) {
@@ -220,74 +167,40 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
         this.keyFactory = keyFactory;
     }
 
-    public List<T> multiGet(List<List<Object>> keys) {
-        if (keys.size() == 0) {
-            return Collections.emptyList();
-        }
+    @Override
+    protected Serializer getSerializer() {
+        return serializer;
+    }
+
+    @Override
+    protected KeyFactory getKeyFactory() {
+        return keyFactory;
+    }
+
+    @Override
+    protected List<String> retrieveValuesFromRedis(List<String> keys) {
+        String[] stringKeys = keys.toArray(new String[keys.size()]);
         if (Strings.isNullOrEmpty(this.options.hkey)) {
-            String[] stringKeys = buildKeys(keys);
             List<String> values = Lists.newArrayList();
 
-            for (String stringKey : stringKeys) {
+            for (String stringKey : keys) {
                 String value = jedisCluster.get(stringKey);
                 values.add(value);
             }
 
-            return deserializeValues(keys, values);
+            return values;
         } else {
-            Map<String, String> keyValue = jedisCluster.hgetAll(this.options.hkey);
-            List<String> values = buildValuesFromMap(keys, keyValue);
-            return deserializeValues(keys, values);
-        }
-    }
-
-    private List<String> buildValuesFromMap(List<List<Object>> keys, Map<String, String> keyValue) {
-        List<String> values = new ArrayList<String>(keys.size());
-        for (List<Object> key : keys) {
-            String strKey = keyFactory.build(key);
-            String value = keyValue.get(strKey);
-            values.add(value);
-        }
-        return values;
-    }
-
-    private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
-        List<T> result = new ArrayList<T>(keys.size());
-        for (String value : values) {
-            if (value != null) {
-                result.add((T) serializer.deserialize(value.getBytes()));
-            } else {
-                result.add(null);
-            }
+            return jedisCluster.hmget(this.options.hkey, stringKeys);
         }
-        return result;
-    }
-
-    private String[] buildKeys(List<List<Object>> keys) {
-        String[] stringKeys = new String[keys.size()];
-        int index = 0;
-        for (List<Object> key : keys)
-            stringKeys[index++] = keyFactory.build(key);
-        return stringKeys;
     }
 
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        if (keys.size() == 0) {
-            return;
-        }
-
+    @Override
+    protected void updateStatesToRedis(Map<String, String> keyValues) {
         if (Strings.isNullOrEmpty(this.options.hkey)) {
-            for (int i = 0; i < keys.size(); i++) {
-                String val = new String(serializer.serialize(vals.get(i)));
-                String redisKey = keyFactory.build(keys.get(i));
-                jedisCluster.set(redisKey, val);
+            for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
+                jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
             }
         } else {
-            Map<String, String> keyValues = new HashMap<String, String>();
-            for (int i = 0; i < keys.size(); i++) {
-                String val = new String(serializer.serialize(vals.get(i)));
-                keyValues.put(keyFactory.build(keys.get(i)), val);
-            }
             jedisCluster.hmset(this.options.hkey, keyValues);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 7bc5afb..1461203 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -20,67 +20,16 @@ package org.apache.storm.redis.trident.state;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
-import storm.trident.state.JSONNonTransactionalSerializer;
-import storm.trident.state.JSONOpaqueSerializer;
-import storm.trident.state.JSONTransactionalSerializer;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.IBackingMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
 import java.util.List;
 import java.util.Map;
 
-public class RedisMapState<T> implements IBackingMap<T> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisMapState.class);
-
-    private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
-            StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
-            StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
-            StateType.OPAQUE, new JSONOpaqueSerializer()
-    ));
-
-    public static class DefaultKeyFactory implements KeyFactory {
-        public String build(List<Object> key) {
-            if (key.size() != 1)
-                throw new RuntimeException("Default KeyFactory does not support compound keys");
-            return (String) key.get(0);
-        }
-    };
-
-    public static class Options<T> implements Serializable {
-        public int localCacheSize = 1000;
-        public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
-        KeyFactory keyFactory = null;
-        public Serializer<T> serializer = null;
-        public String hkey = null;
-    }
-
-    public static interface KeyFactory extends Serializable {
-        String build(List<Object> key);
-    }
-
+public class RedisMapState<T> extends AbstractRedisMapState<T> {
     /**
      * OpaqueTransactional for redis.
      * */
@@ -167,7 +116,7 @@ public class RedisMapState<T> implements IBackingMap<T> {
 
             this.keyFactory = options.keyFactory;
             if (this.keyFactory == null) {
-                this.keyFactory = new DefaultKeyFactory();
+                this.keyFactory = new KeyFactory.DefaultKeyFactory();
             }
             this.serializer = options.serializer;
             if (this.serializer == null) {
@@ -219,96 +168,64 @@ public class RedisMapState<T> implements IBackingMap<T> {
         this.keyFactory = keyFactory;
     }
 
-    public List<T> multiGet(List<List<Object>> keys) {
-        if (keys.size() == 0) {
-            return Collections.emptyList();
-        }
+    @Override
+    protected Serializer getSerializer() {
+        return serializer;
+    }
 
-        String[] stringKeys = buildKeys(keys);
-
-        if (Strings.isNullOrEmpty(this.options.hkey)) {
-            Jedis jedis = null;
-            try {
-                jedis = jedisPool.getResource();
-                List<String> values = jedis.mget(stringKeys);
-                return deserializeValues(keys, values);
-            } finally {
-                if (jedis != null) {
-                    jedisPool.returnResource(jedis);
-                }
-            }
-        } else {
-            Jedis jedis = null;
-            try {
-                jedis = jedisPool.getResource();
-                List<String> values = jedis.hmget(this.options.hkey, stringKeys);
-                return deserializeValues(keys, values);
-            } finally {
-                if (jedis != null) {
-                    jedisPool.returnResource(jedis);
-                }
-            }
-        }
+    @Override
+    protected KeyFactory getKeyFactory() {
+        return keyFactory;
     }
 
-    private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
-        List<T> result = new ArrayList<T>(keys.size());
-        for (String value : values) {
-            if (value != null) {
-                result.add((T) serializer.deserialize(value.getBytes()));
+    @Override
+    protected List<String> retrieveValuesFromRedis(List<String> keys) {
+        String[] stringKeys = keys.toArray(new String[keys.size()]);
+        Jedis jedis = null;
+        try {
+            jedis = jedisPool.getResource();
+
+            if (Strings.isNullOrEmpty(this.options.hkey)) {
+                return jedis.mget(stringKeys);
             } else {
-                result.add(null);
+                return jedis.hmget(this.options.hkey, stringKeys);
+            }
+        } finally {
+            if (jedis != null) {
+                jedis.close();
             }
         }
-        return result;
     }
 
-    private String[] buildKeys(List<List<Object>> keys) {
-        String[] stringKeys = new String[keys.size()];
-        int index = 0;
-        for (List<Object> key : keys)
-            stringKeys[index++] = keyFactory.build(key);
-        return stringKeys;
-    }
+    @Override
+    protected void updateStatesToRedis(Map<String, String> keyValues) {
+        Jedis jedis = null;
 
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        if (keys.size() == 0) {
-            return;
-        }
+        try {
+            jedis = jedisPool.getResource();
 
-        if (Strings.isNullOrEmpty(this.options.hkey)) {
-            Jedis jedis = null;
-            try {
-                jedis = jedisPool.getResource();
-                String[] keyValue = buildKeyValuesList(keys, vals);
+            if (Strings.isNullOrEmpty(this.options.hkey)) {
+                String[] keyValue = buildKeyValuesList(keyValues);
                 jedis.mset(keyValue);
-            } finally {
-                if (jedis != null) {
-                    jedisPool.returnResource(jedis);
-                }
-            }
-        } else {
-            Jedis jedis = jedisPool.getResource();
-            try {
-                Map<String, String> keyValues = new HashMap<String, String>();
-                for (int i = 0; i < keys.size(); i++) {
-                    String val = new String(serializer.serialize(vals.get(i)));
-                    keyValues.put(keyFactory.build(keys.get(i)), val);
-                }
+            } else {
                 jedis.hmset(this.options.hkey, keyValues);
-
-            } finally {
-                jedisPool.returnResource(jedis);
+            }
+        } finally {
+            if (jedis != null) {
+                jedis.close();
             }
         }
     }
 
-    private String[] buildKeyValuesList(List<List<Object>> keys, List<T> vals) {
-        String[] keyValues = new String[keys.size() * 2];
-        for (int i = 0; i < keys.size(); i++) {
-            keyValues[i * 2] = keyFactory.build(keys.get(i));
-            keyValues[i * 2 + 1] = new String(serializer.serialize(vals.get(i)));
+    private String[] buildKeyValuesList(Map<String, String> keyValues) {
+        String[] keyValueLists = new String[keyValues.size() * 2];
+
+        int idx = 0;
+        for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
+            keyValueLists[idx++] = kvEntry.getKey();
+            keyValueLists[idx++] = kvEntry.getValue();
         }
-        return keyValues;
+
+        return keyValueLists;
     }
 }


[04/12] storm git commit: Add missing Apache comments to new files

Posted by bo...@apache.org.
Add missing Apache comments to new files


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b1f1ae5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b1f1ae5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b1f1ae5

Branch: refs/heads/master
Commit: 2b1f1ae56b3b3eb4ada7563db319066cb78ce0dc
Parents: b9c0329
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 19:02:19 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 19:02:19 2015 +0900

----------------------------------------------------------------------
 .../redis/trident/state/AbstractRedisMapState.java | 17 +++++++++++++++++
 .../storm/redis/trident/state/KeyFactory.java      | 17 +++++++++++++++++
 .../apache/storm/redis/trident/state/Options.java  | 17 +++++++++++++++++
 3 files changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2b1f1ae5/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
index a85a57a..56b37b2 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.redis.trident.state;
 
 import com.google.common.collect.ImmutableMap;

http://git-wip-us.apache.org/repos/asf/storm/blob/2b1f1ae5/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
index 0fac726..7acea10 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.redis.trident.state;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/storm/blob/2b1f1ae5/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index 59e4ecb..bef7b6c 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.storm.redis.trident.state;
 
 import storm.trident.state.Serializer;


[02/12] storm git commit: Redis*StateQuerier / Redis*StateUpdater now support HASH type

Posted by bo...@apache.org.
Redis*StateQuerier / Redis*StateUpdater now support HASH type

* use Pipeline when available to gain performance
* extract abstract classes to reduce code duplication
 ** AbstractRedisStateQuerier, AbstractRedisStateUpdater


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8fd1f4b1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8fd1f4b1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8fd1f4b1

Branch: refs/heads/master
Commit: 8fd1f4b193b5d81b3036726268da39380e2b3b61
Parents: 7fec9a1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 12:21:57 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 12:21:57 2015 +0900

----------------------------------------------------------------------
 .../state/AbstractRedisStateQuerier.java        | 69 ++++++++++++++++++++
 .../state/AbstractRedisStateUpdater.java        | 67 +++++++++++++++++++
 .../trident/state/RedisClusterStateQuerier.java | 53 ++++++---------
 .../trident/state/RedisClusterStateUpdater.java | 66 ++++++++-----------
 .../redis/trident/state/RedisStateQuerier.java  | 54 +++++----------
 .../redis/trident/state/RedisStateUpdater.java  | 67 +++++++++----------
 .../redis/trident/WordCountLookupMapper.java    |  2 +-
 .../redis/trident/WordCountStoreMapper.java     |  2 +-
 8 files changed, 231 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
new file mode 100644
index 0000000..24ecfc4
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public abstract class AbstractRedisStateQuerier<T extends State> extends BaseQueryFunction<T, List<Values>> {
+	private final RedisLookupMapper lookupMapper;
+	protected final RedisDataTypeDescription.RedisDataType dataType;
+	protected final String additionalKey;
+
+	public AbstractRedisStateQuerier(RedisLookupMapper lookupMapper) {
+		this.lookupMapper = lookupMapper;
+
+		RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+		this.dataType = dataTypeDescription.getDataType();
+		this.additionalKey = dataTypeDescription.getAdditionalKey();
+	}
+
+	@Override
+	public List<List<Values>> batchRetrieve(T state, List<TridentTuple> inputs) {
+		List<List<Values>> values = Lists.newArrayList();
+
+		List<String> keys = Lists.newArrayList();
+		for (TridentTuple input : inputs) {
+			keys.add(lookupMapper.getKeyFromTuple(input));
+		}
+
+		List<String> redisVals = retrieveValuesFromRedis(state, keys);
+		for (int i = 0 ; i < redisVals.size() ; i++) {
+			values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
+		}
+
+		return values;
+	}
+
+	@Override
+	public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
+		for (Values value : values) {
+			collector.emit(value);
+		}
+	}
+
+	protected abstract List<String> retrieveValuesFromRedis(T redisClusterState, List<String> keys);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
new file mode 100644
index 0000000..2f95341
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractRedisStateUpdater<T extends State> extends BaseStateUpdater<T> {
+	private final RedisStoreMapper storeMapper;
+
+	protected final int expireIntervalSec;
+	protected final RedisDataTypeDescription.RedisDataType dataType;
+	protected final String additionalKey;
+
+	public AbstractRedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
+		this.storeMapper = storeMapper;
+		RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+		this.dataType = dataTypeDescription.getDataType();
+		this.additionalKey = dataTypeDescription.getAdditionalKey();
+
+		if (expireIntervalSec > 0) {
+			this.expireIntervalSec = expireIntervalSec;
+		} else {
+			this.expireIntervalSec = 0;
+		}
+	}
+
+	@Override
+	public void updateState(T state, List<TridentTuple> inputs,
+			TridentCollector collector) {
+		Map<String, String> keyToValue = new HashMap<String, String>();
+
+		for (TridentTuple input : inputs) {
+			String key = storeMapper.getKeyFromTuple(input);
+			String value = storeMapper.getValueFromTuple(input);
+
+			keyToValue.put(key, value);
+		}
+
+		updateStatesToRedis(state, keyToValue);
+	}
+
+	protected abstract void updateStatesToRedis(T state, Map<String, String> keyToValue);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index 4382fe3..66ff3f6 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -17,57 +17,42 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Lists;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.tuple.TridentTuple;
 
+import java.util.ArrayList;
 import java.util.List;
 
-public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, List<Values>> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
-
-    private final RedisLookupMapper lookupMapper;
-
+public class RedisClusterStateQuerier extends AbstractRedisStateQuerier<RedisClusterState> {
     public RedisClusterStateQuerier(RedisLookupMapper lookupMapper) {
-        this.lookupMapper = lookupMapper;
+        super(lookupMapper);
     }
 
     @Override
-    public List<List<Values>> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
-        List<List<Values>> ret = Lists.newArrayList();
-
+    protected List<String> retrieveValuesFromRedis(RedisClusterState redisClusterState, List<String> keys) {
         JedisCluster jedisCluster = null;
         try {
             jedisCluster = redisClusterState.getJedisCluster();
-
-            for (int i = 0 ; i < inputs.size() ; i++) {
-                TridentTuple input = inputs.get(i);
-
-                String key = lookupMapper.getKeyFromTuple(input);
-                String value = jedisCluster.get(key);
-                ret.add(lookupMapper.toTuple(input, value));
-                logger.debug("redis get key[" + key + "] value [" + value + "]");
+            List<String> redisVals = new ArrayList<String>();
+
+            for (String key : keys) {
+                switch (dataType) {
+                case STRING:
+                    redisVals.add(jedisCluster.get(key));
+                    break;
+                case HASH:
+                    redisVals.add(jedisCluster.hget(additionalKey, key));
+                    break;
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
+                }
             }
+
+            return redisVals;
         } finally {
             if (jedisCluster != null) {
                 redisClusterState.returnJedisCluster(jedisCluster);
             }
         }
-
-        return ret;
-    }
-
-    @Override
-    public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
-        for (Values value : values) {
-            collector.emit(value);
-        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 35fb48e..924b6b9 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -19,63 +19,51 @@ package org.apache.storm.redis.trident.state;
 
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
 
-import java.util.List;
-
-public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
-
-    private final RedisStoreMapper storeMapper;
-    private final int expireIntervalSec;
+import java.util.Map;
 
+public class RedisClusterStateUpdater extends AbstractRedisStateUpdater<RedisClusterState> {
     public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
-        this.storeMapper = storeMapper;
-        assertDataType(storeMapper.getDataTypeDescription());
-
-        if (expireIntervalSec > 0) {
-            this.expireIntervalSec = expireIntervalSec;
-        } else {
-            this.expireIntervalSec = 0;
-        }
+        super(storeMapper, expireIntervalSec);
     }
 
     @Override
-    public void updateState(RedisClusterState redisClusterState, List<TridentTuple> inputs,
-                            TridentCollector collector) {
-
+    protected void updateStatesToRedis(RedisClusterState redisClusterState, Map<String, String> keyToValue) {
         JedisCluster jedisCluster = null;
         try {
             jedisCluster = redisClusterState.getJedisCluster();
-            for (TridentTuple input : inputs) {
-                String key = storeMapper.getKeyFromTuple(input);
-                String value = storeMapper.getValueFromTuple(input);
 
-                logger.debug("update key[" + key + "] redisKey[" + key + "] value[" + value + "]");
+            for (Map.Entry<String, String> kvEntry : keyToValue.entrySet()) {
+                String key = kvEntry.getKey();
+                String value = kvEntry.getValue();
 
-                if (this.expireIntervalSec > 0) {
-                    jedisCluster.setex(key, expireIntervalSec, value);
-                } else {
-                    jedisCluster.set(key, value);
+                switch (dataType) {
+                case STRING:
+                    if (this.expireIntervalSec > 0) {
+                        jedisCluster.setex(key, expireIntervalSec, value);
+                    } else {
+                        jedisCluster.set(key, value);
+                    }
+                    break;
+                case HASH:
+                    jedisCluster.hset(additionalKey, key, value);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
                 }
             }
+
+            // send expire command for hash only once
+            // it expires key itself entirely, so use it with caution
+            if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
+                    this.expireIntervalSec > 0) {
+                jedisCluster.expire(additionalKey, expireIntervalSec);
+            }
         } finally {
             if (jedisCluster != null) {
                 redisClusterState.returnJedisCluster(jedisCluster);
             }
         }
     }
-
-    private void assertDataType(RedisDataTypeDescription storeMapper) {
-        if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
-            throw new IllegalArgumentException("State should be STRING type");
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index a215741..ac102dd 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -17,62 +17,40 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Lists;
-import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import redis.clients.jedis.Jedis;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.tuple.TridentTuple;
 
-import java.util.ArrayList;
 import java.util.List;
 
-public class RedisStateQuerier extends BaseQueryFunction<RedisState, List<Values>> {
-    private final RedisLookupMapper lookupMapper;
-
+public class RedisStateQuerier extends AbstractRedisStateQuerier<RedisState> {
     public RedisStateQuerier(RedisLookupMapper lookupMapper) {
-        this.lookupMapper = lookupMapper;
-        assertDataType(lookupMapper.getDataTypeDescription());
+        super(lookupMapper);
     }
 
     @Override
-    public List<List<Values>> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
-        List<List<Values>> values = new ArrayList<List<Values>>();
-
-        List<String> keys = Lists.newArrayList();
-        for (TridentTuple input : inputs) {
-            keys.add(lookupMapper.getKeyFromTuple(input));
-        }
-
+    protected List<String> retrieveValuesFromRedis(RedisState redisState, List<String> keys) {
         Jedis jedis = null;
         try {
             jedis = redisState.getJedis();
-            List<String> redisVals = jedis.mget(keys.toArray(new String[keys.size()]));
-
-            for (int i = 0 ; i < redisVals.size() ; i++) {
-                values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
+            List<String> redisVals;
+
+            String[] keysForRedis = keys.toArray(new String[keys.size()]);
+            switch (dataType) {
+            case STRING:
+                redisVals = jedis.mget(keysForRedis);
+                break;
+            case HASH:
+                redisVals = jedis.hmget(additionalKey, keysForRedis);
+                break;
+            default:
+                throw new IllegalArgumentException("Cannot process such data type: " + dataType);
             }
 
-            return values;
+            return redisVals;
         } finally {
             if (jedis != null) {
                 redisState.returnJedis(jedis);
             }
         }
     }
-
-    @Override
-    public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
-        for (Values value : values) {
-            collector.emit(value);
-        }
-    }
-
-    private void assertDataType(RedisDataTypeDescription lookupMapper) {
-        if (lookupMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
-            throw new IllegalArgumentException("State should be STRING type");
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 384a120..583fa32 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -19,51 +19,51 @@ package org.apache.storm.redis.trident.state;
 
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
+import redis.clients.jedis.Pipeline;
 
-import java.util.List;
-
-public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
-
-    private final RedisStoreMapper storeMapper;
-    private final int expireIntervalSec;
+import java.util.Map;
 
+public class RedisStateUpdater extends AbstractRedisStateUpdater<RedisState> {
     public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
-        this.storeMapper = storeMapper;
-        assertDataType(storeMapper.getDataTypeDescription());
-
-        if (expireIntervalSec > 0) {
-            this.expireIntervalSec = expireIntervalSec;
-        } else {
-            this.expireIntervalSec = 0;
-        }
+        super(storeMapper, expireIntervalSec);
     }
 
     @Override
-    public void updateState(RedisState redisState, List<TridentTuple> inputs,
-                            TridentCollector collector) {
+    protected void updateStatesToRedis(RedisState redisState, Map<String, String> keyToValue) {
         Jedis jedis = null;
         try {
             jedis = redisState.getJedis();
-            for (TridentTuple input : inputs) {
-                String key = storeMapper.getKeyFromTuple(input);
-                String value = storeMapper.getValueFromTuple(input);
+            Pipeline pipeline = jedis.pipelined();
 
-                logger.debug("update key[" + key + "] redisKey[" + key+ "] value[" + value + "]");
+            for (Map.Entry<String, String> kvEntry : keyToValue.entrySet()) {
+                String key = kvEntry.getKey();
+                String value = kvEntry.getValue();
 
-                if (this.expireIntervalSec > 0) {
-                    jedis.setex(key, expireIntervalSec, value);
-                } else {
-                    jedis.set(key, value);
+                switch (dataType) {
+                case STRING:
+                    if (this.expireIntervalSec > 0) {
+                        pipeline.setex(key, expireIntervalSec, value);
+                    } else {
+                        pipeline.set(key, value);
+                    }
+                    break;
+                case HASH:
+                    pipeline.hset(additionalKey, key, value);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
                 }
             }
+
+            // send expire command for hash only once
+            // it expires key itself entirely, so use it with caution
+            if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
+                    this.expireIntervalSec > 0) {
+                pipeline.expire(additionalKey, expireIntervalSec);
+            }
+
+            pipeline.sync();
         } finally {
             if (jedis != null) {
                 redisState.returnJedis(jedis);
@@ -71,9 +71,4 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
         }
     }
 
-    private void assertDataType(RedisDataTypeDescription storeMapper) {
-        if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
-            throw new IllegalArgumentException("State should be STRING type");
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
index 891a1af..5c67c8c 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
@@ -25,7 +25,7 @@ public class WordCountLookupMapper implements RedisLookupMapper {
 
     @Override
     public RedisDataTypeDescription getDataTypeDescription() {
-        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "test");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
index aa03ead..6521302 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
@@ -7,7 +7,7 @@ import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 public class WordCountStoreMapper implements RedisStoreMapper {
     @Override
     public RedisDataTypeDescription getDataTypeDescription() {
-        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "test");
     }
 
     @Override


[11/12] storm git commit: Merge branch 'STORM-753' of https://github.com/HeartSaVioR/storm into STORM-753

Posted by bo...@apache.org.
Merge branch 'STORM-753' of https://github.com/HeartSaVioR/storm into STORM-753

STORM-753: Improve RedisStateQuerier to convert List<Values> from Redis value


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bed27a74
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bed27a74
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bed27a74

Branch: refs/heads/master
Commit: bed27a744ab7acd061a0ab71bcb8e78bc0e0151e
Parents: ad98824 1eb85a2
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Jun 2 09:52:11 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Jun 2 09:52:11 2015 -0500

----------------------------------------------------------------------
 external/storm-redis/README.md                  |  20 +-
 .../trident/state/AbstractRedisMapState.java    |  96 +++++++++
 .../state/AbstractRedisStateQuerier.java        |  69 +++++++
 .../state/AbstractRedisStateUpdater.java        |  69 +++++++
 .../storm/redis/trident/state/KeyFactory.java   |  35 ++++
 .../storm/redis/trident/state/Options.java      |  33 ++++
 .../trident/state/RedisClusterMapState.java     | 153 +++++----------
 .../trident/state/RedisClusterStateQuerier.java |  60 ++----
 .../trident/state/RedisClusterStateUpdater.java |  69 +++----
 .../redis/trident/state/RedisMapState.java      | 194 +++++++------------
 .../redis/trident/state/RedisStateQuerier.java  |  54 ++----
 .../redis/trident/state/RedisStateUpdater.java  |  73 ++++---
 .../redis/trident/WordCountLookupMapper.java    |  40 ++++
 .../redis/trident/WordCountStoreMapper.java     |  22 +++
 .../redis/trident/WordCountTridentRedis.java    |  12 +-
 .../trident/WordCountTridentRedisCluster.java   |  11 +-
 .../WordCountTridentRedisClusterMap.java        |   6 +-
 .../redis/trident/WordCountTridentRedisMap.java |   8 +-
 .../redis/trident/WordCountTupleMapper.java     |  16 --
 19 files changed, 623 insertions(+), 417 deletions(-)
----------------------------------------------------------------------



[07/12] storm git commit: Merge branch 'master' into STORM-753

Posted by bo...@apache.org.
Merge branch 'master' into STORM-753

Conflicts:
	external/storm-redis/README.md
	external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
	external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
	external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
	external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
	external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/037a9754
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/037a9754
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/037a9754

Branch: refs/heads/master
Commit: 037a9754fdee063d2238a9c41e6ffdb79759d33d
Parents: 98704ec 0c2b3a4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat May 30 22:34:19 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat May 30 22:34:19 2015 +0900

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .travis.yml                                     |   9 +
 CHANGELOG.md                                    |  48 +-
 LICENSE                                         |  29 +
 README.markdown                                 |   7 +-
 SECURITY.md                                     |  50 ++
 STORM-UI-REST-API.md                            |  44 +-
 bin/storm                                       |   2 +-
 bin/storm.cmd                                   |   4 +-
 bin/storm.py                                    |  24 +-
 conf/defaults.yaml                              |   3 +-
 dev-tools/test-ns.py                            |  19 +-
 .../print-errors-from-clojure-test-reports.py   |  58 ++
 dev-tools/travis/travis-build.sh                |  50 ++
 docs/documentation/Metrics.md                   |   2 +-
 docs/documentation/Multilang-protocol.md        |  63 +-
 .../documentation/Setting-up-a-Storm-cluster.md |   7 +-
 docs/documentation/Trident-API-Overview.md      |   2 +-
 examples/storm-starter/README.markdown          |   8 +-
 .../storm-starter/multilang/resources/storm.js  | 373 ---------
 .../storm-starter/multilang/resources/storm.py  | 260 -------
 .../storm-starter/multilang/resources/storm.rb  | 236 ------
 examples/storm-starter/pom.xml                  |  67 +-
 external/storm-eventhubs/README.md              |  41 +
 external/storm-eventhubs/pom.xml                | 122 +++
 .../storm/eventhubs/bolt/EventHubBolt.java      |  81 ++
 .../client/ConnectionStringBuilder.java         | 116 +++
 .../storm/eventhubs/client/Constants.java       |  32 +
 .../storm/eventhubs/client/EventHubClient.java  |  92 +++
 .../eventhubs/client/EventHubConsumerGroup.java |  72 ++
 .../eventhubs/client/EventHubException.java     |  37 +
 .../eventhubs/client/EventHubReceiver.java      | 139 ++++
 .../eventhubs/client/EventHubSendClient.java    |  70 ++
 .../storm/eventhubs/client/EventHubSender.java  |  95 +++
 .../storm/eventhubs/client/SelectorFilter.java  |  38 +
 .../eventhubs/client/SelectorFilterWriter.java  |  64 ++
 .../eventhubs/samples/AtMostOnceEventCount.java |  54 ++
 .../storm/eventhubs/samples/EventCount.java     | 155 ++++
 .../storm/eventhubs/samples/EventHubLoop.java   |  51 ++
 .../samples/OpaqueTridentEventCount.java        |  53 ++
 .../samples/TransactionalTridentEventCount.java |  81 ++
 .../eventhubs/samples/bolt/GlobalCountBolt.java |  83 ++
 .../samples/bolt/PartialCountBolt.java          |  63 ++
 .../apache/storm/eventhubs/spout/EventData.java |  48 ++
 .../storm/eventhubs/spout/EventDataScheme.java  |  55 ++
 .../eventhubs/spout/EventHubReceiverFilter.java |  56 ++
 .../eventhubs/spout/EventHubReceiverImpl.java   | 150 ++++
 .../storm/eventhubs/spout/EventHubSpout.java    | 258 +++++++
 .../eventhubs/spout/EventHubSpoutConfig.java    | 165 ++++
 .../eventhubs/spout/EventHubSpoutException.java |  37 +
 .../storm/eventhubs/spout/FieldConstants.java   |  25 +
 .../storm/eventhubs/spout/IEventDataScheme.java |  30 +
 .../eventhubs/spout/IEventHubReceiver.java      |  35 +
 .../spout/IEventHubReceiverFactory.java         |  30 +
 .../spout/IEventHubReceiverFilter.java          |  35 +
 .../eventhubs/spout/IPartitionCoordinator.java  |  27 +
 .../eventhubs/spout/IPartitionManager.java      |  37 +
 .../spout/IPartitionManagerFactory.java         |  33 +
 .../storm/eventhubs/spout/IStateStore.java      |  31 +
 .../apache/storm/eventhubs/spout/MessageId.java |  56 ++
 .../storm/eventhubs/spout/PartitionManager.java | 101 +++
 .../eventhubs/spout/SimplePartitionManager.java | 136 ++++
 .../spout/StaticPartitionCoordinator.java       |  85 +++
 .../eventhubs/spout/ZookeeperStateStore.java    |  95 +++
 .../storm/eventhubs/trident/Coordinator.java    |  60 ++
 .../trident/ITridentPartitionManager.java       |  35 +
 .../ITridentPartitionManagerFactory.java        |  26 +
 .../trident/OpaqueTridentEventHubEmitter.java   |  69 ++
 .../trident/OpaqueTridentEventHubSpout.java     |  64 ++
 .../storm/eventhubs/trident/Partition.java      |  39 +
 .../storm/eventhubs/trident/Partitions.java     |  41 +
 .../TransactionalTridentEventHubEmitter.java    | 167 ++++
 .../TransactionalTridentEventHubSpout.java      |  66 ++
 .../trident/TridentPartitionManager.java        |  91 +++
 .../src/main/resources/config.properties        |  27 +
 .../eventhubs/spout/EventHubReceiverMock.java   | 105 +++
 .../spout/EventHubSpoutCallerMock.java          |  96 +++
 .../spout/PartitionManagerCallerMock.java       | 105 +++
 .../spout/SpoutOutputCollectorMock.java         |  61 ++
 .../storm/eventhubs/spout/StateStoreMock.java   |  54 ++
 .../storm/eventhubs/spout/TestEventData.java    |  47 ++
 .../eventhubs/spout/TestEventHubSpout.java      |  70 ++
 .../eventhubs/spout/TestPartitionManager.java   | 117 +++
 .../TestTransactionalTridentEmitter.java        |  93 +++
 .../eventhubs/trident/TridentCollectorMock.java |  52 ++
 .../mapper/SimpleTridentHBaseMapMapper.java     |  50 ++
 .../trident/mapper/TridentHBaseMapMapper.java   |  40 +
 .../hbase/trident/state/HBaseMapState.java      |  45 +-
 .../hdfs/common/security/HdfsSecurityUtil.java  |   5 +-
 external/storm-hive/pom.xml                     |  21 +
 external/storm-jdbc/README.md                   |  72 +-
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  17 +-
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |   5 +-
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |   5 +-
 .../org/apache/storm/jdbc/common/Column.java    |   7 +-
 .../storm/jdbc/common/ConnectionProvider.java   |  26 +
 .../jdbc/common/HikariCPConnectionProvider.java |  46 ++
 .../apache/storm/jdbc/common/JdbcClient.java    |  19 +-
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |   6 +-
 .../storm/jdbc/trident/state/JdbcState.java     |  13 +-
 .../storm/jdbc/common/JdbcClientTest.java       |   5 +-
 .../jdbc/topology/AbstractUserTopology.java     |  17 +-
 .../jdbc/topology/UserPersistanceTopology.java  |  18 +-
 .../UserPersistanceTridentTopology.java         |   2 +-
 external/storm-kafka/README.md                  |  52 +-
 .../jvm/storm/kafka/DynamicBrokersReader.java   |  26 +
 .../src/jvm/storm/kafka/KafkaConfig.java        |   2 +-
 .../src/jvm/storm/kafka/KafkaUtils.java         |   5 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |   4 +-
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |   1 +
 .../kafka/trident/TridentKafkaEmitter.java      |   4 +-
 .../storm/kafka/DynamicBrokersReaderTest.java   |  13 +
 .../src/test/storm/kafka/KafkaUtilsTest.java    |   6 +-
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |  27 +
 external/storm-redis/README.md                  |   4 +-
 external/storm-redis/pom.xml                    |   2 +-
 .../redis/common/container/JedisContainer.java  |   7 +-
 .../state/AbstractRedisStateUpdater.java        |   6 +-
 .../trident/state/RedisClusterStateUpdater.java |   9 +-
 .../redis/trident/state/RedisStateUpdater.java  |   9 +-
 .../redis/trident/WordCountTridentRedis.java    |   2 +-
 .../trident/WordCountTridentRedisCluster.java   |   2 +-
 pom.xml                                         |   8 +-
 storm-core/pom.xml                              |  40 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |   5 +-
 storm-core/src/clj/backtype/storm/converter.clj |  10 +-
 .../src/clj/backtype/storm/daemon/common.clj    |  15 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |  21 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   9 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |   1 +
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   9 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  58 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  35 +-
 .../src/clj/backtype/storm/local_state.clj      |  99 +++
 .../src/clj/backtype/storm/messaging/loader.clj |  13 +-
 storm-core/src/clj/backtype/storm/testing.clj   |  25 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  42 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |  51 +-
 storm-core/src/clj/backtype/storm/util.clj      |  37 +-
 storm-core/src/dev/resources/storm.js           | 373 ---------
 storm-core/src/dev/resources/storm.py           | 260 -------
 storm-core/src/dev/resources/storm.rb           | 236 ------
 storm-core/src/jvm/backtype/storm/Config.java   | 104 ++-
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |   8 +-
 .../storm/generated/ClusterWorkerHeartbeat.java | 102 ++-
 .../storm/generated/LSApprovedWorkers.java      | 458 +++++++++++
 .../generated/LSSupervisorAssignments.java      | 471 ++++++++++++
 .../storm/generated/LSSupervisorId.java         | 406 ++++++++++
 .../storm/generated/LSWorkerHeartbeat.java      | 755 +++++++++++++++++++
 .../storm/generated/LocalAssignment.java        | 561 ++++++++++++++
 .../storm/generated/LocalStateData.java         | 471 ++++++++++++
 .../jvm/backtype/storm/generated/Nimbus.java    |  12 +-
 .../storm/generated/SupervisorInfo.java         | 116 ++-
 .../storm/generated/SupervisorSummary.java      | 117 ++-
 .../storm/generated/ThriftSerializedObject.java | 516 +++++++++++++
 .../storm/generated/TopologySummary.java        |   2 +-
 .../backtype/storm/messaging/netty/Client.java  |   4 +-
 .../security/auth/SaslTransportPlugin.java      |  17 +-
 .../GzipBridgeThriftSerializationDelegate.java  |  64 ++
 .../GzipThriftSerializationDelegate.java        |  57 ++
 .../jvm/backtype/storm/spout/ShellSpout.java    |   6 +
 .../src/jvm/backtype/storm/task/IBolt.java      |   4 +-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |   5 +-
 .../backtype/storm/task/TopologyContext.java    | 104 ++-
 .../storm/utils/ExtendedThreadPoolExecutor.java |  67 ++
 .../jvm/backtype/storm/utils/LocalState.java    | 163 +++-
 .../backtype/storm/utils/TransferDrainer.java   |  62 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  58 +-
 storm-core/src/multilang/js/storm.js            | 366 ---------
 storm-core/src/multilang/py/storm.py            | 260 -------
 storm-core/src/multilang/rb/storm.rb            | 236 ------
 storm-core/src/py/__init__.py                   |   2 +
 storm-core/src/py/storm/DistributedRPC.py       |   2 +
 .../src/py/storm/DistributedRPCInvocations.py   |   2 +
 storm-core/src/py/storm/Nimbus.py               |  10 +
 storm-core/src/py/storm/__init__.py             |   2 +
 storm-core/src/py/storm/constants.py            |   2 +
 storm-core/src/py/storm/ttypes.py               | 645 +++++++++++++++-
 storm-core/src/storm.thrift                     |  43 ++
 .../src/ui/public/css/jsonFormatter.min.css     |   1 +
 storm-core/src/ui/public/css/style.css          |  15 +-
 storm-core/src/ui/public/index.html             |   3 +
 .../src/ui/public/js/jsonFormatter.min.js       |   2 +
 storm-core/src/ui/public/js/script.js           |   5 +-
 .../public/templates/anti-forgery-template.html |  19 -
 .../templates/component-page-template.html      |   8 +-
 .../public/templates/index-page-template.html   |   6 +
 .../templates/topology-page-template.html       |   4 +-
 storm-core/src/ui/public/topology.html          |  11 +-
 .../test/clj/backtype/storm/cluster_test.clj    |   4 +-
 .../clj/backtype/storm/local_state_test.clj     |  40 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   2 +-
 .../clj/backtype/storm/transactional_test.clj   |   6 +-
 ...ipBridgeThriftSerializationDelegateTest.java |  71 ++
 .../storm/utils/DisruptorQueueTest.java         |  38 +-
 storm-core/test/resources/logback-test.xml      |  26 +
 storm-dist/binary/src/main/assembly/binary.xml  |  41 +-
 storm-multilang/javascript/pom.xml              |  32 +
 .../src/main/resources/resources/storm.js       | 373 +++++++++
 storm-multilang/python/pom.xml                  |  32 +
 .../src/main/resources/resources/storm.py       | 260 +++++++
 storm-multilang/ruby/pom.xml                    |  32 +
 .../ruby/src/main/resources/resources/storm.rb  | 236 ++++++
 203 files changed, 12236 insertions(+), 3173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/README.md
----------------------------------------------------------------------
diff --cc external/storm-redis/README.md
index aa7a874,f165c09..86cd937
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@@ -193,13 -192,8 +193,13 @@@ RedisStat
  
          stream.partitionPersist(factory,
                                  fields,
-                                 new RedisStateUpdater(storeMapper, 86400000),
+                                 new RedisStateUpdater("test_", tupleMapper).withExpire(86400000),
                                  new Fields());
 +
 +        TridentState state = topology.newStaticState(factory);
 +        stream = stream.stateQuery(state, new Fields("word"),
 +                                new RedisStateQuerier(lookupMapper),
 +                                new Fields("columnName","columnValue"));
  ```
  
  RedisClusterState
@@@ -220,13 -213,8 +220,13 @@@
  
          stream.partitionPersist(factory,
                                  fields,
-                                 new RedisClusterStateUpdater(storeMapper, 86400000),
+                                 new RedisClusterStateUpdater("test_", tupleMapper).withExpire(86400000),
                                  new Fields());
 +
 +        TridentState state = topology.newStaticState(factory);
 +        stream = stream.stateQuery(state, new Fields("word"),
 +                                new RedisClusterStateQuerier(lookupMapper),
 +                                new Fields("columnName","columnValue"));
  ```
  
  ## License

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
index 2f95341,0000000..87bb8fa
mode 100644,000000..100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
@@@ -1,67 -1,0 +1,69 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.redis.trident.state;
 +
 +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 +import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 +import storm.trident.operation.TridentCollector;
 +import storm.trident.state.BaseStateUpdater;
 +import storm.trident.state.State;
 +import storm.trident.tuple.TridentTuple;
 +
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
 +public abstract class AbstractRedisStateUpdater<T extends State> extends BaseStateUpdater<T> {
 +	private final RedisStoreMapper storeMapper;
 +
- 	protected final int expireIntervalSec;
++	protected int expireIntervalSec = 0;
 +	protected final RedisDataTypeDescription.RedisDataType dataType;
 +	protected final String additionalKey;
 +
- 	public AbstractRedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
++	public AbstractRedisStateUpdater(RedisStoreMapper storeMapper) {
 +		this.storeMapper = storeMapper;
 +		RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
 +		this.dataType = dataTypeDescription.getDataType();
 +		this.additionalKey = dataTypeDescription.getAdditionalKey();
++	}
 +
++	public void setExpireInterval(int expireIntervalSec) {
 +		if (expireIntervalSec > 0) {
 +			this.expireIntervalSec = expireIntervalSec;
 +		} else {
 +			this.expireIntervalSec = 0;
 +		}
 +	}
 +
 +	@Override
 +	public void updateState(T state, List<TridentTuple> inputs,
 +			TridentCollector collector) {
 +		Map<String, String> keyToValue = new HashMap<String, String>();
 +
 +		for (TridentTuple input : inputs) {
 +			String key = storeMapper.getKeyFromTuple(input);
 +			String value = storeMapper.getValueFromTuple(input);
 +
 +			keyToValue.put(key, value);
 +		}
 +
 +		updateStatesToRedis(state, keyToValue);
 +	}
 +
 +	protected abstract void updateStatesToRedis(T state, Map<String, String> keyToValue);
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 924b6b9,e00cfb6..17c5bfc
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@@ -17,15 -17,36 +17,20 @@@
   */
  package org.apache.storm.redis.trident.state;
  
 -import org.apache.storm.redis.common.mapper.TupleMapper;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 +import org.apache.storm.redis.common.mapper.RedisStoreMapper;
  import redis.clients.jedis.JedisCluster;
 -import storm.trident.operation.TridentCollector;
 -import storm.trident.state.BaseStateUpdater;
 -import storm.trident.tuple.TridentTuple;
  
 -import java.util.List;
 +import java.util.Map;
  
 -public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
 -    private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
 -
 -    private final String redisKeyPrefix;
 -    private final TupleMapper tupleMapper;
 -    private int expireIntervalSec = 0;
 -
 -    public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
 -        this.redisKeyPrefix = redisKeyPrefix;
 -        this.tupleMapper = tupleMapper;
 +public class RedisClusterStateUpdater extends AbstractRedisStateUpdater<RedisClusterState> {
-     public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
-         super(storeMapper, expireIntervalSec);
++    public RedisClusterStateUpdater(RedisStoreMapper storeMapper) {
++        super(storeMapper);
+     }
+ 
+     public RedisClusterStateUpdater withExpire(int expireIntervalSec) {
 -        if (expireIntervalSec > 0) {
 -            this.expireIntervalSec = expireIntervalSec;
 -        } else {
 -            this.expireIntervalSec = 0;
 -        }
 -
++        setExpireInterval(expireIntervalSec);
+         return this;
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 583fa32,2939d3d..babcb1d
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@@ -17,16 -17,36 +17,21 @@@
   */
  package org.apache.storm.redis.trident.state;
  
 -import org.apache.storm.redis.common.mapper.TupleMapper;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 +import org.apache.storm.redis.common.mapper.RedisStoreMapper;
  import redis.clients.jedis.Jedis;
 -import storm.trident.operation.TridentCollector;
 -import storm.trident.state.BaseStateUpdater;
 -import storm.trident.tuple.TridentTuple;
 +import redis.clients.jedis.Pipeline;
  
 -import java.util.List;
 +import java.util.Map;
  
 -public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
 -    private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 -
 -    private final String redisKeyPrefix;
 -    private final TupleMapper tupleMapper;
 -    private int expireIntervalSec = 0;
 -
 -    public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
 -        this.redisKeyPrefix = redisKeyPrefix;
 -        this.tupleMapper = tupleMapper;
 +public class RedisStateUpdater extends AbstractRedisStateUpdater<RedisState> {
-     public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
-         super(storeMapper, expireIntervalSec);
++    public RedisStateUpdater(RedisStoreMapper storeMapper) {
++        super(storeMapper);
+     }
+ 
+     public RedisStateUpdater withExpire(int expireIntervalSec) {
 -        if (expireIntervalSec > 0) {
 -            this.expireIntervalSec = expireIntervalSec;
 -        } else {
 -            this.expireIntervalSec = 0;
 -        }
 -
++        setExpireInterval(expireIntervalSec);
+         return this;
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index 79bab5a,eb13399..4a4aae0
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@@ -58,7 -55,7 +58,7 @@@ public class WordCountTridentRedis 
  
          stream.partitionPersist(factory,
                                  fields,
-                                 new RedisStateUpdater(storeMapper, 86400000),
 -                                new RedisStateUpdater("test_", tupleMapper).withExpire(86400000),
++                                new RedisStateUpdater(storeMapper).withExpire(86400000),
                                  new Fields());
  
          TridentState state = topology.newStaticState(factory);

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index 280b273,8562e77..765b339
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@@ -66,7 -63,7 +66,7 @@@ public class WordCountTridentRedisClust
  
          stream.partitionPersist(factory,
                                  fields,
-                                 new RedisClusterStateUpdater(storeMapper, 86400000),
 -                                new RedisClusterStateUpdater("test_", tupleMapper).withExpire(86400000),
++                                new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
                                  new Fields());
  
          TridentState state = topology.newStaticState(factory);


[09/12] storm git commit: STORM-753 Expose KeyFactory as public since it doesn't have a setter

Posted by bo...@apache.org.
STORM-753 Expose KeyFactory as public since it doesn't have a setter


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/84fc7645
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/84fc7645
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/84fc7645

Branch: refs/heads/master
Commit: 84fc764560b50a133c5c902af9f7be9cd0e78a4d
Parents: f8906fb
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 2 06:35:02 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jun 2 06:35:02 2015 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/storm/redis/trident/state/Options.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/84fc7645/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index b262c42..dffb713 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -27,7 +27,7 @@ public class Options<T> implements Serializable {
 
 	public int localCacheSize = 1000;
 	public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
-	KeyFactory keyFactory = null;
+	public KeyFactory keyFactory = null;
 	public Serializer<T> serializer = null;
 	public RedisDataTypeDescription dataTypeDescription = DEFAULT_REDIS_DATATYPE;
 }


[10/12] storm git commit: STORM-753 Correct README to reflect RedisStoreMapper

Posted by bo...@apache.org.
STORM-753 Correct README to reflect RedisStoreMapper


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1eb85a22
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1eb85a22
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1eb85a22

Branch: refs/heads/master
Commit: 1eb85a2285a4d291ce60734bb00e5d0747986885
Parents: 84fc764
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 2 06:55:45 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jun 2 06:55:45 2015 +0900

----------------------------------------------------------------------
 external/storm-redis/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1eb85a22/external/storm-redis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-redis/README.md b/external/storm-redis/README.md
index 86cd937..6e4063e 100644
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@ -193,7 +193,7 @@ RedisState
 
         stream.partitionPersist(factory,
                                 fields,
-                                new RedisStateUpdater("test_", tupleMapper).withExpire(86400000),
+                                new RedisStateUpdater(storeMapper).withExpire(86400000),
                                 new Fields());
 
         TridentState state = topology.newStaticState(factory);
@@ -220,7 +220,7 @@ RedisClusterState
 
         stream.partitionPersist(factory,
                                 fields,
-                                new RedisClusterStateUpdater("test_", tupleMapper).withExpire(86400000),
+                                new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
                                 new Fields());
 
         TridentState state = topology.newStaticState(factory);


[08/12] storm git commit: STORM-753 Replace wildcard imports with single-class imports

Posted by bo...@apache.org.
STORM-753 Replace wildcard imports with single-class imports


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f8906fb4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f8906fb4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f8906fb4

Branch: refs/heads/master
Commit: f8906fb4d7750d5f009fccf98943060bb25e2f0c
Parents: 037a975
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 2 06:33:58 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jun 2 06:33:58 2015 +0900

----------------------------------------------------------------------
 .../redis/trident/state/AbstractRedisMapState.java   | 13 +++++++++++--
 .../redis/trident/state/RedisClusterMapState.java    | 15 ++++++++++++---
 .../storm/redis/trident/state/RedisMapState.java     | 15 ++++++++++++---
 3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f8906fb4/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
index 56b37b2..b6fc8d7 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
@@ -19,10 +19,19 @@ package org.apache.storm.redis.trident.state;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import storm.trident.state.*;
+import storm.trident.state.JSONNonTransactionalSerializer;
+import storm.trident.state.JSONOpaqueSerializer;
+import storm.trident.state.JSONTransactionalSerializer;
+import storm.trident.state.Serializer;
+import storm.trident.state.StateType;
 import storm.trident.state.map.IBackingMap;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public abstract class AbstractRedisMapState<T> implements IBackingMap<T> {
 	public static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(

http://git-wip-us.apache.org/repos/asf/storm/blob/f8906fb4/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 10bf9ea..230f7f0 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -19,13 +19,22 @@ package org.apache.storm.redis.trident.state;
 
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.tuple.Values;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import redis.clients.jedis.JedisCluster;
-import storm.trident.state.*;
-import storm.trident.state.map.*;
+import storm.trident.state.OpaqueValue;
+import storm.trident.state.Serializer;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import storm.trident.state.StateType;
+import storm.trident.state.TransactionalValue;
+import storm.trident.state.map.CachedMap;
+import storm.trident.state.map.MapState;
+import storm.trident.state.map.NonTransactionalMap;
+import storm.trident.state.map.OpaqueMap;
+import storm.trident.state.map.SnapshottableMap;
+import storm.trident.state.map.TransactionalMap;
 
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/f8906fb4/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 14b014e..85cbff2 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -19,13 +19,22 @@ package org.apache.storm.redis.trident.state;
 
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.tuple.Values;
-import com.google.common.base.Strings;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
-import storm.trident.state.*;
-import storm.trident.state.map.*;
+import storm.trident.state.OpaqueValue;
+import storm.trident.state.Serializer;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import storm.trident.state.StateType;
+import storm.trident.state.TransactionalValue;
+import storm.trident.state.map.CachedMap;
+import storm.trident.state.map.MapState;
+import storm.trident.state.map.NonTransactionalMap;
+import storm.trident.state.map.OpaqueMap;
+import storm.trident.state.map.SnapshottableMap;
+import storm.trident.state.map.TransactionalMap;
 
 import java.util.List;
 import java.util.Map;