You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/29 22:41:00 UTC

[1/3] storm git commit: sync storm-redis with master

Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch 511869c6f -> 81505f9c7


http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
index 535d7b9..77c6ee8 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -23,10 +23,14 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.redis.bolt.AbstractRedisBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.bolt.RedisStoreBolt;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCommands;
@@ -36,46 +40,11 @@ import redis.clients.jedis.exceptions.JedisException;
 public class PersistentWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
     private static final String COUNT_BOLT = "COUNT_BOLT";
-    private static final String REDIS_BOLT = "REDIS_BOLT";
+    private static final String STORE_BOLT = "STORE_BOLT";
 
     private static final String TEST_REDIS_HOST = "127.0.0.1";
     private static final int TEST_REDIS_PORT = 6379;
 
-    public static class StoreCountRedisBolt extends AbstractRedisBolt {
-        private static final Logger LOG = LoggerFactory.getLogger(StoreCountRedisBolt.class);
-
-        public StoreCountRedisBolt(JedisPoolConfig config) {
-            super(config);
-        }
-
-        public StoreCountRedisBolt(JedisClusterConfig config) {
-            super(config);
-        }
-
-        @Override
-        public void execute(Tuple input) {
-            String word = input.getStringByField("word");
-            int count = input.getIntegerByField("count");
-
-            JedisCommands commands = null;
-            try {
-                commands = getInstance();
-                commands.incrBy(word, count);
-            } catch (JedisConnectionException e) {
-                throw new RuntimeException("Unfortunately, this test requires redis-server running", e);
-            } catch (JedisException e) {
-                LOG.error("Exception occurred from Jedis/Redis", e);
-            } finally {
-                returnInstance(commands);
-                this.collector.ack(input);
-            }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         Config config = new Config();
 
@@ -92,14 +61,15 @@ public class PersistentWordCount {
 
         WordSpout spout = new WordSpout();
         WordCounter bolt = new WordCounter();
-        StoreCountRedisBolt redisBolt = new StoreCountRedisBolt(poolConfig);
+        RedisStoreMapper storeMapper = setupStoreMapper();
+        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
 
         // wordSpout ==> countBolt ==> RedisBolt
         TopologyBuilder builder = new TopologyBuilder();
 
         builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
-        builder.setBolt(REDIS_BOLT, redisBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+        builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
+        builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
 
         if (args.length == 2) {
             LocalCluster cluster = new LocalCluster();
@@ -114,4 +84,33 @@ public class PersistentWordCount {
             System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
         }
     }
+
+    private static RedisStoreMapper setupStoreMapper() {
+        return new WordCountStoreMapper();
+    }
+
+    private static class WordCountStoreMapper implements RedisStoreMapper {
+        private RedisDataTypeDescription description;
+        private final String hashKey = "wordCount";
+
+        public WordCountStoreMapper() {
+            description = new RedisDataTypeDescription(
+                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return tuple.getStringByField("word");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return tuple.getStringByField("count");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
index 6a0548d..6f25038 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
@@ -23,23 +23,32 @@ import backtype.storm.topology.IBasicBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Maps;
 
 import java.util.Map;
 
 import static backtype.storm.utils.Utils.tuple;
 
 public class WordCounter implements IBasicBolt {
-
+    private Map<String, Integer> wordCounter = Maps.newHashMap();
 
     @SuppressWarnings("rawtypes")
     public void prepare(Map stormConf, TopologyContext context) {
     }
 
-    /*
-     * Just output the word value with a count of 1.
-     */
     public void execute(Tuple input, BasicOutputCollector collector) {
-        collector.emit(tuple(input.getValues().get(0), 1));
+        String word = input.getStringByField("word");
+        int count;
+        if (wordCounter.containsKey(word)) {
+            count = wordCounter.get(word) + 1;
+            wordCounter.put(word, wordCounter.get(word) + 1);
+        } else {
+            count = 1;
+        }
+
+        wordCounter.put(word, count);
+        collector.emit(new Values(word, String.valueOf(count)));
     }
 
     public void cleanup() {

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index a610f54..eb13399 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -23,15 +23,14 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisState;
 import org.apache.storm.redis.trident.state.RedisStateQuerier;
 import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
-import storm.trident.state.StateFactory;
 import storm.trident.testing.FixedBatchSpout;
 
 public class WordCountTridentRedis {
@@ -48,7 +47,7 @@ public class WordCountTridentRedis {
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        TupleMapper tupleMapper = new WordCountTupleMapper();
         RedisState.Factory factory = new RedisState.Factory(poolConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index 8bea3ce..8562e77 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -23,11 +23,11 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisClusterState;
 import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
 import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
@@ -55,7 +55,7 @@ public class WordCountTridentRedisCluster {
         }
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        TupleMapper tupleMapper = new WordCountTupleMapper();
         RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index e9ae54d..de1f252 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -23,11 +23,9 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisClusterMapState;
-import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.trident.state.RedisStateQuerier;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
@@ -58,7 +56,7 @@ public class WordCountTridentRedisClusterMap {
         }
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        TupleMapper tupleMapper = new WordCountTupleMapper();
         StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index b096e55..4d4afe8 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -23,12 +23,9 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisMapState;
-import org.apache.storm.redis.trident.state.RedisState;
-import org.apache.storm.redis.trident.state.RedisStateQuerier;
-import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
@@ -51,7 +48,7 @@ public class WordCountTridentRedisMap {
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        TupleMapper tupleMapper = new WordCountTupleMapper();
         StateFactory factory = RedisMapState.transactional(poolConfig);
 
         TridentTopology topology = new TridentTopology();

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
index 6454c9e..1e601c9 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
@@ -1,16 +1,16 @@
 package org.apache.storm.redis.trident;
 
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
-import storm.trident.tuple.TridentTuple;
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 
-public class WordCountTupleMapper implements TridentTupleMapper {
+public class WordCountTupleMapper implements TupleMapper {
     @Override
-    public String getKeyFromTridentTuple(TridentTuple tuple) {
+    public String getKeyFromTuple(ITuple tuple) {
         return tuple.getString(0);
     }
 
     @Override
-    public String getValueFromTridentTuple(TridentTuple tuple) {
+    public String getValueFromTuple(ITuple tuple) {
         return tuple.getInteger(1).toString();
     }
 }


[3/3] storm git commit: update changelog for storm-redis changes merged from master

Posted by pt...@apache.org.
update changelog for storm-redis changes merged from master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81505f9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81505f9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81505f9c

Branch: refs/heads/0.10.x-branch
Commit: 81505f9c71a63b1d8af9825fc88bb8b59389f8a2
Parents: 0edb8ab
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 16:40:44 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 16:40:44 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/81505f9c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc49984..9109f01 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
 ## 0.10.0
+ * STORM-735: [storm-redis] Upgrade Jedis to 2.7.0
+ * STORM-724: Document RedisStoreBolt and RedisLookupBolt which is missed.
+ * STORM-703: With hash key option for RedisMapState, only get values for keys in batch
  * STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation.
  * STORM-737] Check task->node+port with read lock to prevent sending to closed connection
  * STORM-835 Netty Client hold batch object until io operation complete


[2/3] storm git commit: sync storm-redis with master

Posted by pt...@apache.org.
sync storm-redis with master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0edb8ab9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0edb8ab9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0edb8ab9

Branch: refs/heads/0.10.x-branch
Commit: 0edb8ab90c6edbfaf46f05be5dc51c1a3c5b3d03
Parents: 511869c
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 16:33:11 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 16:33:11 2015 -0400

----------------------------------------------------------------------
 external/storm-redis/README.md                  | 108 ++++++++++++++++-
 external/storm-redis/pom.xml                    |   4 +-
 .../storm/redis/bolt/AbstractRedisBolt.java     |   8 +-
 .../storm/redis/bolt/RedisLookupBolt.java       | 112 ++++++++++++++++++
 .../apache/storm/redis/bolt/RedisStoreBolt.java | 100 ++++++++++++++++
 .../redis/common/config/JedisClusterConfig.java |  82 +++++++++++++
 .../redis/common/config/JedisPoolConfig.java    |  97 ++++++++++++++++
 .../common/container/JedisClusterContainer.java |  47 ++++++++
 .../JedisCommandsContainerBuilder.java          |  38 ++++++
 .../JedisCommandsInstanceContainer.java         |  25 ++++
 .../redis/common/container/JedisContainer.java  |  60 ++++++++++
 .../common/mapper/RedisDataTypeDescription.java |  50 ++++++++
 .../redis/common/mapper/RedisLookupMapper.java  |  40 +++++++
 .../storm/redis/common/mapper/RedisMapper.java  |  22 ++++
 .../redis/common/mapper/RedisStoreMapper.java   |  21 ++++
 .../storm/redis/common/mapper/TupleMapper.java  |  27 +++++
 .../trident/mapper/TridentTupleMapper.java      |  27 -----
 .../trident/state/RedisClusterMapState.java     |   2 +-
 .../redis/trident/state/RedisClusterState.java  |   2 +-
 .../trident/state/RedisClusterStateQuerier.java |  10 +-
 .../trident/state/RedisClusterStateUpdater.java |  10 +-
 .../redis/trident/state/RedisMapState.java      |  31 ++---
 .../storm/redis/trident/state/RedisState.java   |   2 +-
 .../redis/trident/state/RedisStateQuerier.java  |  10 +-
 .../redis/trident/state/RedisStateUpdater.java  |  10 +-
 .../redis/util/config/JedisClusterConfig.java   |  82 -------------
 .../redis/util/config/JedisPoolConfig.java      |  97 ----------------
 .../util/container/JedisClusterContainer.java   |  47 --------
 .../JedisCommandsContainerBuilder.java          |  38 ------
 .../JedisCommandsInstanceContainer.java         |  25 ----
 .../redis/util/container/JedisContainer.java    |  65 -----------
 .../storm/redis/topology/LookupWordCount.java   | 115 +++++++++++++------
 .../redis/topology/PersistentWordCount.java     |  81 +++++++------
 .../storm/redis/topology/WordCounter.java       |  19 ++-
 .../redis/trident/WordCountTridentRedis.java    |   7 +-
 .../trident/WordCountTridentRedisCluster.java   |   6 +-
 .../WordCountTridentRedisClusterMap.java        |   8 +-
 .../redis/trident/WordCountTridentRedisMap.java |   9 +-
 .../redis/trident/WordCountTupleMapper.java     |  10 +-
 39 files changed, 1016 insertions(+), 538 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-redis/README.md b/external/storm-redis/README.md
index 17b5904..f165c09 100644
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@ -2,6 +2,8 @@
 
 Storm/Trident integration for [Redis](http://redis.io/)
 
+Storm-redis uses Jedis as its Redis client.
+
 ## Usage
 
 ### How do I use it?
@@ -12,12 +14,114 @@ use it as a maven dependency:
 <dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-redis</artifactId>
-    <version>0.10.0</version>
+    <version>{storm.version}</version>
     <type>jar</type>
 </dependency>
 ```
 
-### AbstractRedisBolt usage:
+### For normal Bolt
+
+Storm-redis provides basic Bolt implementations, ```RedisLookupBolt``` and ```RedisStoreBolt```.
+
+As their names suggest, ```RedisLookupBolt``` retrieves a value from Redis by key, and ```RedisStoreBolt``` stores a key / value pair to Redis. Each tuple is mapped to one key / value pair, and you define that mapping by implementing ```TupleMapper```.
+
+You can also choose which Redis data type to use via ```RedisDataTypeDescription```. Please refer to ```RedisDataTypeDescription.RedisDataType``` to see which data types are supported. Some data types (hash and sorted set) require an additional key; in that case the key extracted from the tuple becomes the element (hash field or sorted-set member) stored under that additional key.
+
+These interfaces are combined into ```RedisLookupMapper``` and ```RedisStoreMapper```, which are used by ```RedisLookupBolt``` and ```RedisStoreBolt``` respectively.
+
+#### RedisLookupBolt example
+
+```java
+
+class WordCountRedisLookupMapper implements RedisLookupMapper {
+    private RedisDataTypeDescription description;
+    private final String hashKey = "wordCount";
+
+    public WordCountRedisLookupMapper() {
+        description = new RedisDataTypeDescription(
+                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+    }
+
+    @Override
+    public List<Values> toTuple(ITuple input, Object value) {
+        String member = getKeyFromTuple(input);
+        List<Values> values = Lists.newArrayList();
+        values.add(new Values(member, value));
+        return values;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("wordName", "count"));
+    }
+
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        return description;
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return tuple.getStringByField("word");
+    }
+
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        return null;
+    }
+}
+
+```
+
+```java
+
+JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+        .setHost(host).setPort(port).build();
+RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
+RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
+```
+
+#### RedisStoreBolt example
+
+```java
+
+class WordCountStoreMapper implements RedisStoreMapper {
+    private RedisDataTypeDescription description;
+    private final String hashKey = "wordCount";
+
+    public WordCountStoreMapper() {
+        description = new RedisDataTypeDescription(
+            RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+    }
+
+    @Override
+    public RedisDataTypeDescription getDataTypeDescription() {
+        return description;
+    }
+
+    @Override
+    public String getKeyFromTuple(ITuple tuple) {
+        return tuple.getStringByField("word");
+    }
+
+    @Override
+    public String getValueFromTuple(ITuple tuple) {
+        return tuple.getStringByField("count");
+    }
+}
+```
+
+```java
+
+JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+                .setHost(host).setPort(port).build();
+RedisStoreMapper storeMapper = new WordCountStoreMapper();
+RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
+```
+
+### For non-simple Bolt
+
+If your scenario doesn't fit either ```RedisStoreBolt``` or ```RedisLookupBolt```, storm-redis also provides ```AbstractRedisBolt```, which you can extend to implement your own business logic.
 
 ```java
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-redis/pom.xml b/external/storm-redis/pom.xml
index e4d7daa..9033998 100644
--- a/external/storm-redis/pom.xml
+++ b/external/storm-redis/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.10.0-SNAPSHOT</version>
+        <version>0.11.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
@@ -41,7 +41,7 @@
     </developers>
 
     <properties>
-        <jedis.version>2.6.2</jedis.version>
+        <jedis.version>2.7.0</jedis.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index 0b2a7f3..158fcaa 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -20,10 +20,10 @@ package org.apache.storm.redis.bolt;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
-import org.apache.storm.redis.util.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.util.container.JedisCommandsInstanceContainer;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
+import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
 import redis.clients.jedis.JedisCommands;
 
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
new file mode 100644
index 0000000..bbd7e6a
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.JedisCommands;
+
+import java.util.List;
+
+public class RedisLookupBolt extends AbstractRedisBolt {
+    private final RedisLookupMapper lookupMapper;
+    private final RedisDataTypeDescription.RedisDataType dataType;
+    private final String additionalKey;
+
+    public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
+        super(config);
+
+        this.lookupMapper = lookupMapper;
+
+        RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+    }
+
+    public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) {
+        super(config);
+
+        this.lookupMapper = lookupMapper;
+
+        RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        String key = lookupMapper.getKeyFromTuple(input);
+        Object lookupValue = null;
+
+        JedisCommands jedisCommand = null;
+        try {
+            jedisCommand = getInstance();
+
+            switch (dataType) {
+                case STRING:
+                    lookupValue = jedisCommand.get(key);
+                    break;
+
+                case LIST:
+                    lookupValue = jedisCommand.lpop(key);
+                    break;
+
+                case HASH:
+                    lookupValue = jedisCommand.hget(additionalKey, key);
+                    break;
+
+                case SET:
+                    lookupValue = jedisCommand.scard(key);
+                    break;
+
+                case SORTED_SET:
+                    lookupValue = jedisCommand.zscore(additionalKey, key);
+                    break;
+
+                case HYPER_LOG_LOG:
+                    lookupValue = jedisCommand.pfcount(key);
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
+            }
+
+            List<Values> values = lookupMapper.toTuple(input, lookupValue);
+            for (Values value : values) {
+                collector.emit(input, value);
+            }
+
+            collector.ack(input);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(input);
+        } finally {
+            returnInstance(jedisCommand);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        lookupMapper.declareOutputFields(declarer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
new file mode 100644
index 0000000..761c5ed
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import redis.clients.jedis.JedisCommands;
+
+/**
+ * Bolt that stores one key/value pair per input tuple in Redis.
+ * <p>
+ * The {@link RedisStoreMapper} supplies the Redis data type (and, for hashes and
+ * sorted sets, the additional key) and extracts the key and value from each tuple.
+ * This bolt emits nothing.
+ */
+public class RedisStoreBolt extends AbstractRedisBolt {
+    private final RedisStoreMapper storeMapper;
+    private final RedisDataTypeDescription.RedisDataType dataType;
+    private final String additionalKey;
+
+    /**
+     * Creates a store bolt configured from a {@link JedisPoolConfig}.
+     *
+     * @param config Redis connection settings, handed to the parent bolt
+     * @param storeMapper describes the target data type and maps tuples to key/value
+     */
+    public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
+        super(config);
+        this.storeMapper = storeMapper;
+
+        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+    }
+
+    /**
+     * Creates a store bolt configured from a {@link JedisClusterConfig}.
+     *
+     * @param config Redis cluster connection settings, handed to the parent bolt
+     * @param storeMapper describes the target data type and maps tuples to key/value
+     */
+    public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
+        super(config);
+        this.storeMapper = storeMapper;
+
+        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+    }
+
+    /**
+     * Writes the key/value extracted from {@code input} with the Redis command that
+     * matches the configured data type, then acks the tuple.
+     * <p>
+     * Any exception thrown inside the try block is reported to the collector and the
+     * tuple is failed; the Jedis instance is handed back in all cases.
+     *
+     * @param input the tuple to store
+     */
+    @Override
+    public void execute(Tuple input) {
+        String key = storeMapper.getKeyFromTuple(input);
+        String value = storeMapper.getValueFromTuple(input);
+
+        JedisCommands jedisCommand = null;
+        try {
+            jedisCommand = getInstance();
+
+            switch (dataType) {
+                case STRING:
+                    jedisCommand.set(key, value);
+                    break;
+
+                case LIST:
+                    jedisCommand.rpush(key, value);
+                    break;
+
+                case HASH:
+                    // additionalKey names the hash; the tuple key is the field within it
+                    jedisCommand.hset(additionalKey, key, value);
+                    break;
+
+                case SET:
+                    jedisCommand.sadd(key, value);
+                    break;
+
+                case SORTED_SET:
+                    // additionalKey names the sorted set; the tuple value is parsed as the score
+                    jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
+                    // break is required: falling through would also issue PFADD for this tuple
+                    break;
+
+                case HYPER_LOG_LOG:
+                    jedisCommand.pfadd(key, value);
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
+            }
+
+            collector.ack(input);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(input);
+        } finally {
+            returnInstance(jedisCommand);
+        }
+    }
+
+    /**
+     * Declares no output fields.
+     *
+     * @param declarer unused
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
new file mode 100644
index 0000000..a13eced
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Serializable settings for connecting to a Redis cluster: the node addresses,
+ * a timeout and the maximum number of redirections.
+ */
+public class JedisClusterConfig implements Serializable {
+    private Set<InetSocketAddress> nodes;
+    private int timeout;
+    private int maxRedirections;
+
+    /**
+     * Creates a cluster configuration from explicit values.
+     *
+     * @param nodes addresses of the cluster nodes
+     * @param timeout timeout value to use for the cluster client
+     * @param maxRedirections redirection limit to use for the cluster client
+     */
+    public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
+        this.nodes = nodes;
+        this.timeout = timeout;
+        this.maxRedirections = maxRedirections;
+    }
+
+    /**
+     * Converts the configured socket addresses to Jedis host/port pairs.
+     *
+     * @return a newly built set of {@link HostAndPort}, one created from each configured node
+     */
+    public Set<HostAndPort> getNodes() {
+        final Set<HostAndPort> hostAndPorts = new HashSet<HostAndPort>();
+        for (InetSocketAddress address : this.nodes) {
+            final String host = address.getHostName();
+            final int port = address.getPort();
+            hostAndPorts.add(new HostAndPort(host, port));
+        }
+        return hostAndPorts;
+    }
+
+    /** @return the configured timeout */
+    public int getTimeout() {
+        return this.timeout;
+    }
+
+    /** @return the configured redirection limit */
+    public int getMaxRedirections() {
+        return this.maxRedirections;
+    }
+
+    /**
+     * Fluent builder for {@link JedisClusterConfig}. The timeout defaults to
+     * {@link Protocol#DEFAULT_TIMEOUT} and the redirection limit to 5; nodes must be set.
+     */
+    public static class Builder {
+        private Set<InetSocketAddress> nodes;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int maxRedirections = 5;
+
+        /** Sets the cluster node addresses (required before {@link #build()}). */
+        public Builder setNodes(Set<InetSocketAddress> nodes) {
+            this.nodes = nodes;
+            return this;
+        }
+
+        /** Overrides the default timeout. */
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /** Overrides the default redirection limit. */
+        public Builder setMaxRedirections(int maxRedirections) {
+            this.maxRedirections = maxRedirections;
+            return this;
+        }
+
+        /**
+         * Builds the configuration from the values collected so far.
+         *
+         * @return a new {@link JedisClusterConfig}
+         * @throws NullPointerException if no nodes were set
+         */
+        public JedisClusterConfig build() {
+            Preconditions.checkNotNull(this.nodes, "Node information should be presented");
+            return new JedisClusterConfig(this.nodes, this.timeout, this.maxRedirections);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
new file mode 100644
index 0000000..cc5f6e4
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.config;
+
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Serializable settings for connecting to a single Redis server: host, port,
+ * timeout, database index and password.
+ */
+public class JedisPoolConfig implements Serializable {
+    public static final String DEFAULT_HOST = "127.0.0.1";
+
+    private String host;
+    private int port;
+    private int timeout;
+    private int database;
+    private String password;
+
+    /**
+     * Creates a configuration from explicit values. Note that {@code password}
+     * precedes {@code database} in the parameter list.
+     *
+     * @param host Redis host
+     * @param port Redis port
+     * @param timeout timeout value to use for the connection pool
+     * @param password Redis password; {@code null} when built without one
+     * @param database Redis database index
+     */
+    public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
+        this.host = host;
+        this.port = port;
+        this.timeout = timeout;
+        this.password = password;
+        this.database = database;
+    }
+
+    /** @return the configured host */
+    public String getHost() {
+        return this.host;
+    }
+
+    /** @return the configured port */
+    public int getPort() {
+        return this.port;
+    }
+
+    /** @return the configured timeout */
+    public int getTimeout() {
+        return this.timeout;
+    }
+
+    /** @return the configured database index */
+    public int getDatabase() {
+        return this.database;
+    }
+
+    /** @return the configured password, possibly {@code null} */
+    public String getPassword() {
+        return this.password;
+    }
+
+    /**
+     * Fluent builder for {@link JedisPoolConfig}. Defaults: host {@link JedisPoolConfig#DEFAULT_HOST};
+     * port, timeout and database from the matching {@link Protocol} constants; password unset.
+     */
+    public static class Builder {
+        private String host = DEFAULT_HOST;
+        private int port = Protocol.DEFAULT_PORT;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int database = Protocol.DEFAULT_DATABASE;
+        private String password;
+
+        /** Overrides the default host. */
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        /** Overrides the default port. */
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /** Overrides the default timeout. */
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /** Overrides the default database index. */
+        public Builder setDatabase(int database) {
+            this.database = database;
+            return this;
+        }
+
+        /** Sets the password. */
+        public Builder setPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /** @return a new {@link JedisPoolConfig} holding the values collected so far */
+        public JedisPoolConfig build() {
+            return new JedisPoolConfig(this.host, this.port, this.timeout, this.password, this.database);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
new file mode 100644
index 0000000..a1ff19f
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisCommands;
+
+import java.io.Closeable;
+
+/**
+ * {@link JedisCommandsInstanceContainer} backed by a single {@link JedisCluster}. The same
+ * cluster object is handed out on every request, so returning an instance does nothing.
+ */
+public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable {
+
+    private JedisCluster jedisCluster;
+
+    /**
+     * Wraps the given cluster client.
+     *
+     * @param jedisCluster the cluster client to hand out
+     */
+    public JedisClusterContainer(JedisCluster jedisCluster) {
+        this.jedisCluster = jedisCluster;
+    }
+
+    /**
+     * Returns the wrapped cluster client.
+     *
+     * @return the {@link JedisCluster} passed to the constructor
+     */
+    @Override
+    public JedisCommands getInstance() {
+        return jedisCluster;
+    }
+
+    /**
+     * Does nothing; the wrapped cluster client is shared rather than borrowed.
+     *
+     * @param jedisCommands ignored
+     */
+    @Override
+    public void returnInstance(JedisCommands jedisCommands) {
+        // intentionally empty
+    }
+
+    /**
+     * Closes the wrapped cluster client.
+     */
+    @Override
+    public void close() {
+        jedisCluster.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..a2f8c2e
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+
+/**
+ * Static factory that creates the {@link JedisCommandsInstanceContainer} matching a
+ * given Redis configuration.
+ */
+public class JedisCommandsContainerBuilder {
+
+    public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+    /**
+     * Builds a pool-backed container for a single Redis server.
+     *
+     * @param config supplies host, port, timeout, password and database
+     * @return a {@link JedisContainer} wrapping a newly created {@link JedisPool}
+     */
+    public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
+        final String host = config.getHost();
+        final int port = config.getPort();
+        final int timeout = config.getTimeout();
+        final String password = config.getPassword();
+        final int database = config.getDatabase();
+        final JedisPool pool = new JedisPool(DEFAULT_POOL_CONFIG, host, port, timeout, password, database);
+        return new JedisContainer(pool);
+    }
+
+    /**
+     * Builds a container for a Redis cluster.
+     *
+     * @param config supplies the nodes, timeout and redirection limit
+     * @return a {@link JedisClusterContainer} wrapping a newly created {@link JedisCluster}
+     */
+    public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
+        final JedisCluster cluster = new JedisCluster(
+                config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG);
+        return new JedisClusterContainer(cluster);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
new file mode 100644
index 0000000..9ec32b9
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import redis.clients.jedis.JedisCommands;
+
+/**
+ * Hands out {@link JedisCommands} instances and takes them back after use.
+ */
+public interface JedisCommandsInstanceContainer {
+    /**
+     * Obtains an instance to run Redis commands with.
+     *
+     * @return a {@link JedisCommands} instance
+     */
+    JedisCommands getInstance();
+
+    /**
+     * Gives back an instance previously obtained from {@link #getInstance()}.
+     *
+     * @param jedisCommands the instance to return
+     */
+    void returnInstance(JedisCommands jedisCommands);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
new file mode 100644
index 0000000..8e1fdf5
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.JedisPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * {@link JedisCommandsInstanceContainer} backed by a {@link JedisPool}: instances are
+ * taken from the pool and handed back by closing them.
+ */
+public class JedisContainer implements JedisCommandsInstanceContainer, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class);
+
+    private JedisPool jedisPool;
+
+    /**
+     * Wraps the given pool.
+     *
+     * @param jedisPool pool that instances are taken from
+     */
+    public JedisContainer(JedisPool jedisPool) {
+        this.jedisPool = jedisPool;
+    }
+
+    /**
+     * Takes an instance from the pool.
+     *
+     * @return a pooled instance; hand it back with {@link #returnInstance(JedisCommands)}
+     */
+    @Override
+    public JedisCommands getInstance() {
+        return jedisPool.getResource();
+    }
+
+    /**
+     * Hands an instance back by closing it. A {@code null} argument is ignored, so this can
+     * be called from a {@code finally} block even when no instance was obtained. A failure
+     * to close is logged together with its cause and is not rethrown.
+     *
+     * @param jedisCommands the instance to return, may be {@code null}
+     */
+    @Override
+    public void returnInstance(JedisCommands jedisCommands) {
+        if (jedisCommands == null) {
+            return;
+        }
+
+        try {
+            ((Closeable) jedisCommands).close();
+        } catch (IOException e) {
+            // pass the exception to the logger so the log shows why the return failed
+            LOG.error("Failed to close (return) instance to pool", e);
+        }
+    }
+
+    /**
+     * Closes the underlying pool.
+     */
+    @Override
+    public void close() {
+        jedisPool.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
new file mode 100644
index 0000000..ba981d0
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import java.io.Serializable;
+
+/**
+ * Describes which Redis data type a mapper targets, together with the additional
+ * key that the hash and sorted-set types require.
+ */
+public class RedisDataTypeDescription implements Serializable {
+    public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG }
+
+    private RedisDataType dataType;
+    private String additionalKey;
+
+    /**
+     * Creates a description without an additional key.
+     *
+     * @param dataType the Redis data type
+     * @throws IllegalArgumentException if {@code dataType} is HASH or SORTED_SET
+     */
+    public RedisDataTypeDescription(RedisDataType dataType) {
+        this(dataType, null);
+    }
+
+    /**
+     * Creates a description with an additional key.
+     *
+     * @param dataType the Redis data type
+     * @param additionalKey additional key; mandatory for HASH and SORTED_SET
+     * @throws IllegalArgumentException if the additional key is mandatory but {@code null}
+     */
+    public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) {
+        boolean keyRequired = RedisDataType.HASH == dataType || RedisDataType.SORTED_SET == dataType;
+        if (keyRequired && additionalKey == null) {
+            throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
+        }
+
+        this.dataType = dataType;
+        this.additionalKey = additionalKey;
+    }
+
+    /** @return the Redis data type */
+    public RedisDataType getDataType() {
+        return this.dataType;
+    }
+
+    /** @return the additional key, or {@code null} if none was given */
+    public String getAdditionalKey() {
+        return this.additionalKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
new file mode 100644
index 0000000..880aea1
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+
+import java.util.List;
+
+/**
+ * Mapper used for Redis lookups: in addition to the key/value extraction and the data
+ * type description it inherits, it turns a lookup result into output tuples and
+ * declares their fields.
+ */
+public interface RedisLookupMapper extends TupleMapper, RedisMapper {
+    /**
+     * Converts a value returned by Redis into the storm values to emit.
+     *
+     * @param input the input tuple
+     * @param value the Redis response; its Java type depends on the data type queried
+     * @return the values to emit; each list item is emitted as one output tuple
+     */
+    List<Values> toTuple(ITuple input, Object value);
+
+    /**
+     * Declares the fields of the tuples this mapper produces.
+     *
+     * @param declarer the declarer to register the output fields with
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..416ce5f
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+/**
+ * Implemented by mappers that target a specific Redis data type.
+ */
+public interface RedisMapper {
+    /**
+     * Returns the description of the Redis data type this mapper works with.
+     *
+     * @return the data type description
+     */
+    RedisDataTypeDescription getDataTypeDescription();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
new file mode 100644
index 0000000..b3d7adf
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+/**
+ * Mapper for storing tuples in Redis. It declares no methods of its own; it combines
+ * the key/value extraction of {@link TupleMapper} with the data type description of
+ * {@link RedisMapper}.
+ */
+public interface RedisStoreMapper extends TupleMapper, RedisMapper {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
new file mode 100644
index 0000000..86664b8
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Serializable strategy for extracting a key and a value from a tuple.
+ */
+public interface TupleMapper extends Serializable {
+    /**
+     * Extracts the key from a tuple.
+     *
+     * @param tuple the tuple to read
+     * @return the key as a string
+     */
+    String getKeyFromTuple(ITuple tuple);
+
+    /**
+     * Extracts the value from a tuple.
+     *
+     * @param tuple the tuple to read
+     * @return the value as a string
+     */
+    String getValueFromTuple(ITuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
deleted file mode 100644
index 4c10143..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.trident.mapper;
-
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-
-public interface TridentTupleMapper extends Serializable {
-    public String getKeyFromTridentTuple(TridentTuple tuple);
-    public String getValueFromTridentTuple(TridentTuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 24c1df1..1154376 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -23,7 +23,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
index 493ffdd..d74e838 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -18,7 +18,7 @@
 package org.apache.storm.redis.trident.state;
 
 import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index e0207e2..17614a1 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state;
 
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
@@ -33,9 +33,9 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
     private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
 
-    public RedisClusterStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+    public RedisClusterStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
     }
@@ -52,7 +52,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
 
 
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String key = this.tupleMapper.getKeyFromTuple(input);
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                     key = redisKeyPrefix + key;
                 }
@@ -72,7 +72,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
 
     @Override
     public void execute(TridentTuple tuple, String s, TridentCollector collector) {
-        String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+        String key = this.tupleMapper.getKeyFromTuple(tuple);
         collector.emit(new Values(key, s));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 8512888..e00cfb6 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
@@ -31,10 +31,10 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
     private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
     private int expireIntervalSec = 0;
 
-    public RedisClusterStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+    public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
     }
@@ -57,12 +57,12 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
         try {
             jedisCluster = redisClusterState.getJedisCluster();
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String key = this.tupleMapper.getKeyFromTuple(input);
                 String redisKey = key;
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                     redisKey = redisKeyPrefix + redisKey;
                 }
-                String value = this.tupleMapper.getValueFromTridentTuple(input);
+                String value = this.tupleMapper.getValueFromTuple(input);
 
                 logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index f934cea..82ca8bb 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -22,7 +22,7 @@ import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
@@ -223,8 +223,10 @@ public class RedisMapState<T> implements IBackingMap<T> {
         if (keys.size() == 0) {
             return Collections.emptyList();
         }
+
+        String[] stringKeys = buildKeys(keys);
+
         if (Strings.isNullOrEmpty(this.options.hkey)) {
-            String[] stringKeys = buildKeys(keys);
             Jedis jedis = null;
             try {
                 jedis = jedisPool.getResource();
@@ -232,34 +234,23 @@ public class RedisMapState<T> implements IBackingMap<T> {
                 return deserializeValues(keys, values);
             } finally {
                 if (jedis != null) {
-                    jedisPool.returnResource(jedis);
+                    jedis.close();
                 }
             }
         } else {
             Jedis jedis = null;
             try {
                 jedis = jedisPool.getResource();
-                Map<String, String> keyValue = jedis.hgetAll(this.options.hkey);
-                List<String> values = buildValuesFromMap(keys, keyValue);
+                List<String> values = jedis.hmget(this.options.hkey, stringKeys);
                 return deserializeValues(keys, values);
             } finally {
                 if (jedis != null) {
-                    jedisPool.returnResource(jedis);
+                    jedis.close();
                 }
             }
         }
     }
 
-    private List<String> buildValuesFromMap(List<List<Object>> keys, Map<String, String> keyValue) {
-        List<String> values = new ArrayList<String>(keys.size());
-        for (List<Object> key : keys) {
-            String strKey = keyFactory.build(key);
-            String value = keyValue.get(strKey);
-            values.add(value);
-        }
-        return values;
-    }
-
     private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
         List<T> result = new ArrayList<T>(keys.size());
         for (String value : values) {
@@ -293,7 +284,7 @@ public class RedisMapState<T> implements IBackingMap<T> {
                 jedis.mset(keyValue);
             } finally {
                 if (jedis != null) {
-                    jedisPool.returnResource(jedis);
+                    jedis.close();
                 }
             }
         } else {
@@ -303,11 +294,13 @@ public class RedisMapState<T> implements IBackingMap<T> {
                 for (int i = 0; i < keys.size(); i++) {
                     String val = new String(serializer.serialize(vals.get(i)));
                     keyValues.put(keyFactory.build(keys.get(i)), val);
-                }               
+                }
                 jedis.hmset(this.options.hkey, keyValues);
 
             } finally {
-                jedisPool.returnResource(jedis);
+                if (jedis != null) {
+                    jedis.close();
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
index 72b1267..3441936 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
@@ -18,7 +18,7 @@
 package org.apache.storm.redis.trident.state;
 
 import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index 051088e..294e83b 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state;
 
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
@@ -33,9 +33,9 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
     private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
 
-    public RedisStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+    public RedisStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
     }
@@ -44,7 +44,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
     public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
         List<String> keys = Lists.newArrayList();
         for (TridentTuple input : inputs) {
-            String key = this.tupleMapper.getKeyFromTridentTuple(input);
+            String key = this.tupleMapper.getKeyFromTuple(input);
             if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                 key = redisKeyPrefix + key;
             }
@@ -64,7 +64,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
 
     @Override
     public void execute(TridentTuple tuple, String s, TridentCollector collector) {
-        String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+        String key = this.tupleMapper.getKeyFromTuple(tuple);
         collector.emit(new Values(key, s));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index c53476f..2939d3d 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
@@ -31,10 +31,10 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
     private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
     private int expireIntervalSec = 0;
 
-    public RedisStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+    public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
     }
@@ -56,12 +56,12 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
         try {
             jedis = redisState.getJedis();
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String key = this.tupleMapper.getKeyFromTuple(input);
                 String redisKey = key;
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                     redisKey = redisKeyPrefix + redisKey;
                 }
-                String value = this.tupleMapper.getValueFromTridentTuple(input);
+                String value = this.tupleMapper.getValueFromTuple(input);
 
                 logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
deleted file mode 100644
index 355119a..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.config;
-
-import com.google.common.base.Preconditions;
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Protocol;
-
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-public class JedisClusterConfig implements Serializable {
-    private Set<InetSocketAddress> nodes;
-    private int timeout;
-    private int maxRedirections;
-
-    public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
-        this.nodes = nodes;
-        this.timeout = timeout;
-        this.maxRedirections = maxRedirections;
-    }
-
-    public Set<HostAndPort> getNodes() {
-        Set<HostAndPort> ret = new HashSet<HostAndPort>();
-        for (InetSocketAddress node : nodes) {
-            ret.add(new HostAndPort(node.getHostName(), node.getPort()));
-        }
-        return ret;
-    }
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public int getMaxRedirections() {
-        return maxRedirections;
-    }
-
-    public static class Builder {
-        private Set<InetSocketAddress> nodes;
-        private int timeout = Protocol.DEFAULT_TIMEOUT;
-        private int maxRedirections = 5;
-
-        public Builder setNodes(Set<InetSocketAddress> nodes) {
-            this.nodes = nodes;
-            return this;
-        }
-
-        public Builder setTimeout(int timeout) {
-            this.timeout = timeout;
-            return this;
-        }
-
-        public Builder setMaxRedirections(int maxRedirections) {
-            this.maxRedirections = maxRedirections;
-            return this;
-        }
-
-        public JedisClusterConfig build() {
-            Preconditions.checkNotNull(this.nodes, "Node information should be presented");
-
-            return new JedisClusterConfig(nodes, timeout, maxRedirections);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
deleted file mode 100644
index 9a42cf7..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.config;
-
-import redis.clients.jedis.Protocol;
-
-import java.io.Serializable;
-
-public class JedisPoolConfig implements Serializable {
-    public static final String DEFAULT_HOST = "127.0.0.1";
-
-    private String host;
-    private int port;
-    private int timeout;
-    private int database;
-    private String password;
-
-    public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
-        this.host = host;
-        this.port = port;
-        this.timeout = timeout;
-        this.database = database;
-        this.password = password;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public int getDatabase() {
-        return database;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public static class Builder {
-        private String host = DEFAULT_HOST;
-        private int port = Protocol.DEFAULT_PORT;
-        private int timeout = Protocol.DEFAULT_TIMEOUT;
-        private int database = Protocol.DEFAULT_DATABASE;
-        private String password;
-
-        public Builder setHost(String host) {
-            this.host = host;
-            return this;
-        }
-
-        public Builder setPort(int port) {
-            this.port = port;
-            return this;
-        }
-
-        public Builder setTimeout(int timeout) {
-            this.timeout = timeout;
-            return this;
-        }
-
-        public Builder setDatabase(int database) {
-            this.database = database;
-            return this;
-        }
-
-        public Builder setPassword(String password) {
-            this.password = password;
-            return this;
-        }
-
-        public JedisPoolConfig build() {
-            return new JedisPoolConfig(host, port, timeout, password, database);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
deleted file mode 100644
index 5fd4115..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisCommands;
-
-import java.io.Closeable;
-
-public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable {
-
-    private JedisCluster jedisCluster;
-
-    public JedisClusterContainer(JedisCluster jedisCluster) {
-        this.jedisCluster = jedisCluster;
-    }
-
-    @Override
-    public JedisCommands getInstance() {
-        return this.jedisCluster;
-    }
-
-    @Override
-    public void returnInstance(JedisCommands jedisCommands) {
-        // do nothing
-    }
-
-    @Override
-    public void close() {
-        this.jedisCluster.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
deleted file mode 100644
index 8d2dd38..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-
-public class JedisCommandsContainerBuilder {
-
-    public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
-    public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
-        JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
-        return new JedisContainer(jedisPool);
-    }
-
-    public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
-        JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG);
-        return new JedisClusterContainer(jedisCluster);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
deleted file mode 100644
index 847d6a5..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import redis.clients.jedis.JedisCommands;
-
-public interface JedisCommandsInstanceContainer {
-    JedisCommands getInstance();
-    void returnInstance(JedisCommands jedisCommands);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
deleted file mode 100644
index e75cccc..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.JedisPool;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class JedisContainer implements JedisCommandsInstanceContainer, Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class);
-
-    private JedisPool jedisPool;
-
-    public JedisContainer(JedisPool jedisPool) {
-        this.jedisPool = jedisPool;
-    }
-
-    @Override
-    public JedisCommands getInstance() {
-        return jedisPool.getResource();
-    }
-
-    @Override
-    public void returnInstance(JedisCommands jedisCommands) {
-        if (jedisCommands == null) {
-            return;
-        }
-
-        try {
-            ((Closeable) jedisCommands).close();
-        } catch (IOException e) {
-            LOG.warn("Failed to close (return) instance to pool");
-            try {
-                jedisPool.returnBrokenResource((Jedis) jedisCommands);
-            } catch (Exception e2) {
-                LOG.error("Failed to discard instance from pool");
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        jedisPool.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
index a62fdff..ae053de 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -20,72 +20,65 @@ package org.apache.storm.redis.topology;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.bolt.AbstractRedisBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.bolt.RedisLookupBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-import redis.clients.jedis.exceptions.JedisException;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 public class LookupWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
     private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String PRINT_BOLT = "PRINT_BOLT";
 
     private static final String TEST_REDIS_HOST = "127.0.0.1";
     private static final int TEST_REDIS_PORT = 6379;
 
-    public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
-        private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
+    public static class PrintWordTotalCountBolt extends BaseRichBolt {
+        private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class);
         private static final Random RANDOM = new Random();
+        private OutputCollector collector;
 
-        public LookupWordTotalCountBolt(JedisPoolConfig config) {
-            super(config);
-        }
-
-        public LookupWordTotalCountBolt(JedisClusterConfig config) {
-            super(config);
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
         }
 
         @Override
         public void execute(Tuple input) {
-            JedisCommands jedisCommands = null;
-            try {
-                jedisCommands = getInstance();
-                String wordName = input.getStringByField("word");
-                String countStr = jedisCommands.get(wordName);
+            String wordName = input.getStringByField("wordName");
+            String countStr = input.getStringByField("count");
+
+            // print lookup result with low probability
+            if(RANDOM.nextInt(1000) > 995) {
+                int count = 0;
                 if (countStr != null) {
-                    int count = Integer.parseInt(countStr);
-                    this.collector.emit(new Values(wordName, count));
-
-                    // print lookup result with low probability
-                    if(RANDOM.nextInt(1000) > 995) {
-                        LOG.info("Lookup result - word : " + wordName + " / count : " + count);
-                    }
-                } else {
-                    // skip
-                    LOG.warn("Word not found in Redis - word : " + wordName);
+                    count = Integer.parseInt(countStr);
                 }
-            } finally {
-                if (jedisCommands != null) {
-                    returnInstance(jedisCommands);
-                }
-                this.collector.ack(input);
+                LOG.info("Lookup result - word : " + wordName + " / count : " + count);
             }
+
+            collector.ack(input);
         }
 
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            // wordName, count
-            declarer.declare(new Fields("wordName", "count"));
         }
     }
 
@@ -104,12 +97,16 @@ public class LookupWordCount {
                 .setHost(host).setPort(port).build();
 
         WordSpout spout = new WordSpout();
-        LookupWordTotalCountBolt redisLookupBolt = new LookupWordTotalCountBolt(poolConfig);
+        RedisLookupMapper lookupMapper = setupLookupMapper();
+        RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
+
+        PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
 
         //wordspout -> lookupbolt
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(LOOKUP_BOLT, redisLookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(LOOKUP_BOLT);
 
         if (args.length == 2) {
             LocalCluster cluster = new LocalCluster();
@@ -124,4 +121,46 @@ public class LookupWordCount {
             System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)");
         }
     }
+
+    private static RedisLookupMapper setupLookupMapper() {
+        return new WordCountRedisLookupMapper();
+    }
+
+    private static class WordCountRedisLookupMapper implements RedisLookupMapper {
+        private RedisDataTypeDescription description;
+        private final String hashKey = "wordCount";
+
+        public WordCountRedisLookupMapper() {
+            description = new RedisDataTypeDescription(
+                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+        }
+
+        @Override
+        public List<Values> toTuple(ITuple input, Object value) {
+            String member = getKeyFromTuple(input);
+            List<Values> values = Lists.newArrayList();
+            values.add(new Values(member, value));
+            return values;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("wordName", "count"));
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return tuple.getStringByField("word");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return null;
+        }
+    }
 }