You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2015/04/01 19:49:33 UTC

[02/45] storm git commit: STORM-691 Add basic lookup / persist bolts

STORM-691 Add basic lookup / persist bolts

* Add Basic lookup / persist Bolts
** support data types : string, list, hash, set, sorted set, hyperloglog
* rename util package to common


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f7c0bf8a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f7c0bf8a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f7c0bf8a

Branch: refs/heads/nimbus-ha-branch
Commit: f7c0bf8a7c843c6e555ee982a85e3952d1c28b33
Parents: 64d7ac6
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Feb 28 22:29:32 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Feb 28 22:29:32 2015 +0900

----------------------------------------------------------------------
 .../storm/redis/bolt/AbstractRedisBolt.java     |   8 +-
 .../storm/redis/bolt/RedisLookupBolt.java       | 109 ++++++++++++++++++
 .../apache/storm/redis/bolt/RedisStoreBolt.java |  97 ++++++++++++++++
 .../redis/common/config/JedisClusterConfig.java |  82 +++++++++++++
 .../redis/common/config/JedisPoolConfig.java    |  97 ++++++++++++++++
 .../common/container/JedisClusterContainer.java |  47 ++++++++
 .../JedisCommandsContainerBuilder.java          |  38 ++++++
 .../JedisCommandsInstanceContainer.java         |  25 ++++
 .../redis/common/container/JedisContainer.java  |  65 +++++++++++
 .../common/mapper/RedisDataTypeDescription.java |  33 ++++++
 .../redis/common/mapper/RedisLookupMapper.java  |  40 +++++++
 .../storm/redis/common/mapper/RedisMapper.java  |   5 +
 .../redis/common/mapper/RedisStoreMapper.java   |  21 ++++
 .../storm/redis/common/mapper/TupleMapper.java  |  27 +++++
 .../trident/mapper/TridentTupleMapper.java      |  27 -----
 .../trident/state/RedisClusterMapState.java     |   2 +-
 .../redis/trident/state/RedisClusterState.java  |   2 +-
 .../trident/state/RedisClusterStateQuerier.java |  10 +-
 .../trident/state/RedisClusterStateUpdater.java |  10 +-
 .../redis/trident/state/RedisMapState.java      |   2 +-
 .../storm/redis/trident/state/RedisState.java   |   2 +-
 .../redis/trident/state/RedisStateQuerier.java  |  10 +-
 .../state/RedisStateSetCountQuerier.java        |  10 +-
 .../trident/state/RedisStateSetUpdater.java     |  10 +-
 .../redis/trident/state/RedisStateUpdater.java  |  10 +-
 .../redis/util/config/JedisClusterConfig.java   |  82 -------------
 .../redis/util/config/JedisPoolConfig.java      |  97 ----------------
 .../util/container/JedisClusterContainer.java   |  47 --------
 .../JedisCommandsContainerBuilder.java          |  38 ------
 .../JedisCommandsInstanceContainer.java         |  25 ----
 .../redis/util/container/JedisContainer.java    |  65 -----------
 .../storm/redis/topology/LookupWordCount.java   | 115 +++++++++++++------
 .../redis/topology/PersistentWordCount.java     |  46 +++++++-
 .../storm/redis/topology/WordCounter.java       |  19 ++-
 .../redis/trident/WordCountTridentRedis.java    |   7 +-
 .../trident/WordCountTridentRedisCluster.java   |   6 +-
 .../WordCountTridentRedisClusterMap.java        |   8 +-
 .../redis/trident/WordCountTridentRedisMap.java |   9 +-
 .../redis/trident/WordCountTupleMapper.java     |  10 +-
 39 files changed, 872 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index 0b2a7f3..158fcaa 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -20,10 +20,10 @@ package org.apache.storm.redis.bolt;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
-import org.apache.storm.redis.util.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.util.container.JedisCommandsInstanceContainer;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
+import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
 import redis.clients.jedis.JedisCommands;
 
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
new file mode 100644
index 0000000..c40e983
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.JedisCommands;
+
+import java.util.List;
+
+/**
+ * Bolt that queries Redis once per input tuple and emits whatever the
+ * {@link RedisLookupMapper} builds from the response.
+ *
+ * The Redis command is chosen from the data type declared by the mapper's
+ * {@link RedisDataTypeDescription}; for HASH and SORTED_SET the description's
+ * additional key names the Redis key and the tuple key is the field / member.
+ */
+public class RedisLookupBolt extends AbstractRedisBolt {
+    private final RedisLookupMapper lookupMapper;
+    private final RedisDataTypeDescription.RedisDataType dataType;
+    private final String additionalKey;
+
+    /**
+     * Creates a lookup bolt backed by a single Redis server (pooled connections).
+     *
+     * @param config       connection settings for the Redis server
+     * @param lookupMapper supplies the key, the data type and the output mapping
+     */
+    public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
+        super(config);
+
+        this.lookupMapper = lookupMapper;
+
+        RedisDataTypeDescription description = lookupMapper.getDataTypeDescription();
+        this.dataType = description.getDataType();
+        this.additionalKey = description.getAdditionalKey();
+    }
+
+    /**
+     * Creates a lookup bolt backed by a Redis cluster.
+     *
+     * @param config       connection settings for the Redis cluster
+     * @param lookupMapper supplies the key, the data type and the output mapping
+     */
+    public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) {
+        super(config);
+
+        this.lookupMapper = lookupMapper;
+
+        RedisDataTypeDescription description = lookupMapper.getDataTypeDescription();
+        this.dataType = description.getDataType();
+        this.additionalKey = description.getAdditionalKey();
+    }
+
+    /**
+     * Looks up the tuple's key in Redis, emits the mapped values anchored to the
+     * input tuple and acks it. Any exception raised inside the try block is
+     * reported and the tuple is failed; the Redis instance is always returned.
+     *
+     * @param input the tuple to process
+     */
+    @Override
+    public void execute(Tuple input) {
+        // Key extraction happens before the try block, so an exception thrown by
+        // the mapper here is not routed through reportError / fail.
+        String key = lookupMapper.getKeyFromTuple(input);
+        Object result = null;
+
+        JedisCommands commands = null;
+        try {
+            commands = getInstance();
+
+            switch (dataType) {
+                case STRING:
+                    result = commands.get(key);
+                    break;
+
+                case HASH:
+                    // additionalKey is the hash, the tuple key is the field
+                    result = commands.hget(additionalKey, key);
+                    break;
+
+                case LIST:
+                    // LPOP removes the element it returns from the list
+                    result = commands.lpop(key);
+                    break;
+
+                case SET:
+                    // the set's cardinality, not its members
+                    result = commands.scard(key);
+                    break;
+
+                case SORTED_SET:
+                    // additionalKey is the sorted set, the tuple key is the member
+                    result = commands.zscore(additionalKey, key);
+                    break;
+
+                case HYPER_LOG_LOG:
+                    result = commands.pfcount(key);
+                    break;
+            }
+
+            List<Values> outputs = lookupMapper.toTuple(input, result);
+            for (Values output : outputs) {
+                this.collector.emit(input, output);
+            }
+
+            this.collector.ack(input);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(input);
+        } finally {
+            returnInstance(commands);
+        }
+    }
+
+    /**
+     * Delegates the output field declaration to the lookup mapper.
+     *
+     * @param declarer the declarer handed in by Storm
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        lookupMapper.declareOutputFields(declarer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
new file mode 100644
index 0000000..5602c44
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import redis.clients.jedis.JedisCommands;
+
+/**
+ * Bolt that writes each input tuple to Redis, using the command that matches
+ * the data type declared by the {@link RedisStoreMapper}.
+ *
+ * For HASH and SORTED_SET the description's additional key names the Redis key;
+ * the tuple key is then the hash field or the sorted-set member.
+ */
+public class RedisStoreBolt extends AbstractRedisBolt {
+    private final RedisStoreMapper storeMapper;
+    private final RedisDataTypeDescription.RedisDataType dataType;
+    private final String additionalKey;
+
+    /**
+     * Creates a store bolt backed by a single Redis server (pooled connections).
+     *
+     * @param config      connection settings for the Redis server
+     * @param storeMapper supplies the key, the value and the data type to write
+     */
+    public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
+        super(config);
+        this.storeMapper = storeMapper;
+
+        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+    }
+
+    /**
+     * Creates a store bolt backed by a Redis cluster.
+     *
+     * @param config      connection settings for the Redis cluster
+     * @param storeMapper supplies the key, the value and the data type to write
+     */
+    public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
+        super(config);
+        this.storeMapper = storeMapper;
+
+        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+    }
+
+    /**
+     * Stores the tuple's key/value in Redis and acks the tuple. Any exception
+     * raised inside the try block (including a non-numeric score for a sorted
+     * set) is reported and the tuple is failed; the Redis instance is always
+     * returned.
+     *
+     * @param input the tuple to persist
+     */
+    @Override
+    public void execute(Tuple input) {
+        String key = storeMapper.getKeyFromTuple(input);
+        String value = storeMapper.getValueFromTuple(input);
+
+        JedisCommands jedisCommand = null;
+        try {
+            jedisCommand = getInstance();
+
+            switch (dataType) {
+                case STRING:
+                    jedisCommand.set(key, value);
+                    break;
+
+                case LIST:
+                    jedisCommand.rpush(key, value);
+                    break;
+
+                case HASH:
+                    // additionalKey is the hash, the tuple key is the field
+                    jedisCommand.hset(additionalKey, key, value);
+                    break;
+
+                case SET:
+                    jedisCommand.sadd(key, value);
+                    break;
+
+                case SORTED_SET:
+                    // additionalKey is the sorted set, the tuple key is the member,
+                    // and the tuple value is parsed as the score
+                    jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
+                    // break is required: falling through would also run PFADD on the tuple key
+                    break;
+
+                case HYPER_LOG_LOG:
+                    jedisCommand.pfadd(key, value);
+                    break;
+            }
+
+            collector.ack(input);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(input);
+        } finally {
+            returnInstance(jedisCommand);
+        }
+    }
+
+    /**
+     * Declares no output fields; this bolt does not emit tuples.
+     *
+     * @param declarer the declarer handed in by Storm (unused)
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
new file mode 100644
index 0000000..a13eced
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisClusterConfig implements Serializable {
+    private Set<InetSocketAddress> nodes;
+    private int timeout;
+    private int maxRedirections;
+
+    public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
+        this.nodes = nodes;
+        this.timeout = timeout;
+        this.maxRedirections = maxRedirections;
+    }
+
+    public Set<HostAndPort> getNodes() {
+        Set<HostAndPort> ret = new HashSet<HostAndPort>();
+        for (InetSocketAddress node : nodes) {
+            ret.add(new HostAndPort(node.getHostName(), node.getPort()));
+        }
+        return ret;
+    }
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+    public int getMaxRedirections() {
+        return maxRedirections;
+    }
+
+    public static class Builder {
+        private Set<InetSocketAddress> nodes;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int maxRedirections = 5;
+
+        public Builder setNodes(Set<InetSocketAddress> nodes) {
+            this.nodes = nodes;
+            return this;
+        }
+
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        public Builder setMaxRedirections(int maxRedirections) {
+            this.maxRedirections = maxRedirections;
+            return this;
+        }
+
+        public JedisClusterConfig build() {
+            Preconditions.checkNotNull(this.nodes, "Node information should be presented");
+
+            return new JedisClusterConfig(nodes, timeout, maxRedirections);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
new file mode 100644
index 0000000..cc5f6e4
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.config;
+
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Serializable connection settings for a single Redis server: host, port,
+ * timeout, database index and password.
+ */
+public class JedisPoolConfig implements Serializable {
+    public static final String DEFAULT_HOST = "127.0.0.1";
+
+    private String host;
+    private int port;
+    private int timeout;
+    private int database;
+    private String password;
+
+    /**
+     * Note the argument order: password comes before database.
+     *
+     * @param host     Redis host
+     * @param port     Redis port
+     * @param timeout  timeout value passed on to the Jedis client
+     * @param password Redis password
+     * @param database Redis database index
+     */
+    public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
+        this.host = host;
+        this.port = port;
+        this.timeout = timeout;
+        this.database = database;
+        this.password = password;
+    }
+
+    /** @return the configured host */
+    public String getHost() {
+        return this.host;
+    }
+
+    /** @return the configured port */
+    public int getPort() {
+        return this.port;
+    }
+
+    /** @return the configured timeout */
+    public int getTimeout() {
+        return this.timeout;
+    }
+
+    /** @return the configured database index */
+    public int getDatabase() {
+        return this.database;
+    }
+
+    /** @return the configured password */
+    public String getPassword() {
+        return this.password;
+    }
+
+    /**
+     * Fluent builder for {@link JedisPoolConfig}. Host defaults to
+     * {@link #DEFAULT_HOST}; port, timeout and database default to the
+     * corresponding {@link Protocol} constants; password has no default.
+     */
+    public static class Builder {
+        private String host = DEFAULT_HOST;
+        private int port = Protocol.DEFAULT_PORT;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int database = Protocol.DEFAULT_DATABASE;
+        private String password;
+
+        /** Overrides the default host. */
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        /** Overrides the default port. */
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /** Overrides the default timeout. */
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /** Overrides the default database index. */
+        public Builder setDatabase(int database) {
+            this.database = database;
+            return this;
+        }
+
+        /** Sets the password. */
+        public Builder setPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /** @return a config carrying the values collected so far */
+        public JedisPoolConfig build() {
+            return new JedisPoolConfig(this.host, this.port, this.timeout, this.password, this.database);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
new file mode 100644
index 0000000..a1ff19f
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisCommands;
+
+import java.io.Closeable;
+
+/**
+ * {@link JedisCommandsInstanceContainer} backed by one {@link JedisCluster},
+ * which is handed out directly as the command instance.
+ */
+public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable {
+
+    private JedisCluster jedisCluster;
+
+    /**
+     * @param jedisCluster the cluster client this container wraps
+     */
+    public JedisClusterContainer(JedisCluster jedisCluster) {
+        this.jedisCluster = jedisCluster;
+    }
+
+    /** @return the wrapped cluster client itself */
+    @Override
+    public JedisCommands getInstance() {
+        return jedisCluster;
+    }
+
+    /**
+     * No-op: the same cluster client is handed out on every call, so there is
+     * nothing to give back.
+     */
+    @Override
+    public void returnInstance(JedisCommands jedisCommands) {
+        // do nothing
+    }
+
+    /** Closes the wrapped cluster client. */
+    @Override
+    public void close() {
+        jedisCluster.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..a2f8c2e
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+
+public class JedisCommandsContainerBuilder {
+
+    public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+    public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
+        JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
+        return new JedisContainer(jedisPool);
+    }
+
+    public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
+        JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG);
+        return new JedisClusterContainer(jedisCluster);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
new file mode 100644
index 0000000..9ec32b9
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import redis.clients.jedis.JedisCommands;
+
+/**
+ * Hands out {@link JedisCommands} instances and takes them back after use.
+ */
+public interface JedisCommandsInstanceContainer {
+    /**
+     * @return a {@link JedisCommands} instance to issue Redis commands with
+     */
+    JedisCommands getInstance();
+
+    /**
+     * Gives an instance obtained from {@link #getInstance()} back to the container.
+     *
+     * @param jedisCommands the instance to return
+     */
+    void returnInstance(JedisCommands jedisCommands);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
new file mode 100644
index 0000000..621c05b
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.JedisPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class JedisContainer implements JedisCommandsInstanceContainer, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class);
+
+    private JedisPool jedisPool;
+
+    public JedisContainer(JedisPool jedisPool) {
+        this.jedisPool = jedisPool;
+    }
+
+    @Override
+    public JedisCommands getInstance() {
+        return jedisPool.getResource();
+    }
+
+    @Override
+    public void returnInstance(JedisCommands jedisCommands) {
+        if (jedisCommands == null) {
+            return;
+        }
+
+        try {
+            ((Closeable) jedisCommands).close();
+        } catch (IOException e) {
+            LOG.warn("Failed to close (return) instance to pool");
+            try {
+                jedisPool.returnBrokenResource((Jedis) jedisCommands);
+            } catch (Exception e2) {
+                LOG.error("Failed to discard instance from pool");
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        jedisPool.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
new file mode 100644
index 0000000..d2a4af2
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
@@ -0,0 +1,33 @@
+package org.apache.storm.redis.common.mapper;
+
+import java.io.Serializable;
+
+public class RedisDataTypeDescription implements Serializable {
+    public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG }
+
+    private RedisDataType dataType;
+    private String additionalKey;
+
+    public RedisDataTypeDescription(RedisDataType dataType) {
+        this(dataType, null);
+    }
+
+    public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) {
+        this.dataType = dataType;
+        this.additionalKey = additionalKey;
+
+        if (dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET) {
+            if (additionalKey == null) {
+                throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
+            }
+        }
+    }
+
+    public RedisDataType getDataType() {
+        return dataType;
+    }
+
+    public String getAdditionalKey() {
+        return additionalKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
new file mode 100644
index 0000000..880aea1
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+
+import java.util.List;
+
+public interface RedisLookupMapper extends TupleMapper, RedisMapper {
+    /**
+     * Converts the value returned from Redis into a list of Storm values that can be emitted.
+     * @param input the input tuple.
+     * @param value Redis query response value. Can be String, Boolean or Long, depending on the data type.
+     * @return a List of Storm values that can be emitted. Each item in the list is emitted as an output tuple.
+     */
+    public List<Values> toTuple(ITuple input, Object value);
+
+    /**
+     * Declares the fields that this code will output.
+     * @param declarer the declarer used to declare the output fields.
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..d19acaa
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,5 @@
+package org.apache.storm.redis.common.mapper;
+
+public interface RedisMapper { // contract for mappers that expose a Redis data type description
+    public RedisDataTypeDescription getDataTypeDescription(); // the data type description supplied by this mapper
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
new file mode 100644
index 0000000..b3d7adf
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+public interface RedisStoreMapper extends TupleMapper, RedisMapper { // combines TupleMapper and RedisMapper; declares no members of its own
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
new file mode 100644
index 0000000..86664b8
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+public interface TupleMapper extends Serializable { // serializable strategy for deriving a String key and value from a tuple
+    public String getKeyFromTuple(ITuple tuple); // key derived from the given tuple
+    public String getValueFromTuple(ITuple tuple); // value derived from the given tuple
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
deleted file mode 100644
index 4c10143..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.trident.mapper;
-
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-
-public interface TridentTupleMapper extends Serializable {
-    public String getKeyFromTridentTuple(TridentTuple tuple);
-    public String getValueFromTridentTuple(TridentTuple tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 24c1df1..1154376 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -23,7 +23,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
index 493ffdd..d74e838 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -18,7 +18,7 @@
 package org.apache.storm.redis.trident.state;
 
 import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index e0207e2..17614a1 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state;
 
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
@@ -33,9 +33,9 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
     private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
 
-    public RedisClusterStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+    public RedisClusterStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
     }
@@ -52,7 +52,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
 
 
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String key = this.tupleMapper.getKeyFromTuple(input);
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                     key = redisKeyPrefix + key;
                 }
@@ -72,7 +72,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
 
     @Override
     public void execute(TridentTuple tuple, String s, TridentCollector collector) {
-        String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+        String key = this.tupleMapper.getKeyFromTuple(tuple);
         collector.emit(new Values(key, s));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index e72735a..023b527 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
@@ -31,10 +31,10 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
     private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
     private final int expireIntervalSec;
 
-    public RedisClusterStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, int expireIntervalSec) {
+    public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
         if (expireIntervalSec > 0) {
@@ -52,12 +52,12 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
         try {
             jedisCluster = redisClusterState.getJedisCluster();
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String key = this.tupleMapper.getKeyFromTuple(input);
                 String redisKey = key;
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                     redisKey = redisKeyPrefix + redisKey;
                 }
-                String value = this.tupleMapper.getValueFromTridentTuple(input);
+                String value = this.tupleMapper.getValueFromTuple(input);
 
                 logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index f934cea..7f3edd1 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -22,7 +22,7 @@ import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
index f2fd624..2c7fd13 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
@@ -18,7 +18,7 @@
 package org.apache.storm.redis.trident.state;
 
 import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index 051088e..294e83b 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state;
 
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
@@ -33,9 +33,9 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
     private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
 
-    public RedisStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+    public RedisStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
     }
@@ -44,7 +44,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
     public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
         List<String> keys = Lists.newArrayList();
         for (TridentTuple input : inputs) {
-            String key = this.tupleMapper.getKeyFromTridentTuple(input);
+            String key = this.tupleMapper.getKeyFromTuple(input);
             if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                 key = redisKeyPrefix + key;
             }
@@ -64,7 +64,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
 
     @Override
     public void execute(TridentTuple tuple, String s, TridentCollector collector) {
-        String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+        String key = this.tupleMapper.getKeyFromTuple(tuple);
         collector.emit(new Values(key, s));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
index 5b04d59..6b75f31 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
@@ -18,7 +18,7 @@
 package org.apache.storm.redis.trident.state;
 
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
@@ -33,9 +33,9 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
     private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
 
-    public RedisStateSetCountQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+    public RedisStateSetCountQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
     }
@@ -48,7 +48,7 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
         try {
             jedis = redisState.getJedis();
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String key = this.tupleMapper.getKeyFromTuple(input);
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                     key = redisKeyPrefix + key;
                 }
@@ -68,7 +68,7 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
 
     @Override
     public void execute(TridentTuple tuple, Long s, TridentCollector collector) {
-        String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+        String key = this.tupleMapper.getKeyFromTuple(tuple);
         collector.emit(new Values(key, s));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
index c36d1f0..d7c43da 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
@@ -18,7 +18,7 @@
 package org.apache.storm.redis.trident.state;
 
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
@@ -32,10 +32,10 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
     private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
     private final int expireIntervalSec;
 
-    public RedisStateSetUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, int expireIntervalSec) {
+    public RedisStateSetUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
         if (expireIntervalSec > 0) {
@@ -53,12 +53,12 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
         try {
             jedis = redisState.getJedis();
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String key = this.tupleMapper.getKeyFromTuple(input);
                 String redisKey = key;
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                     redisKey = redisKeyPrefix + redisKey;
                 }
-                String value = this.tupleMapper.getValueFromTridentTuple(input);
+                String value = this.tupleMapper.getValueFromTuple(input);
 
                 logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 67f7c51..664a222 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
@@ -31,10 +31,10 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
     private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 
     private final String redisKeyPrefix;
-    private final TridentTupleMapper tupleMapper;
+    private final TupleMapper tupleMapper;
     private final int expireIntervalSec;
 
-    public RedisStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, int expireIntervalSec) {
+    public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
         this.redisKeyPrefix = redisKeyPrefix;
         this.tupleMapper = tupleMapper;
         if (expireIntervalSec > 0) {
@@ -51,12 +51,12 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
         try {
             jedis = redisState.getJedis();
             for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String key = this.tupleMapper.getKeyFromTuple(input);
                 String redisKey = key;
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                     redisKey = redisKeyPrefix + redisKey;
                 }
-                String value = this.tupleMapper.getValueFromTridentTuple(input);
+                String value = this.tupleMapper.getValueFromTuple(input);
 
                 logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
deleted file mode 100644
index 355119a..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.config;
-
-import com.google.common.base.Preconditions;
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Protocol;
-
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-public class JedisClusterConfig implements Serializable {
-    private Set<InetSocketAddress> nodes;
-    private int timeout;
-    private int maxRedirections;
-
-    public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
-        this.nodes = nodes;
-        this.timeout = timeout;
-        this.maxRedirections = maxRedirections;
-    }
-
-    public Set<HostAndPort> getNodes() {
-        Set<HostAndPort> ret = new HashSet<HostAndPort>();
-        for (InetSocketAddress node : nodes) {
-            ret.add(new HostAndPort(node.getHostName(), node.getPort()));
-        }
-        return ret;
-    }
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public int getMaxRedirections() {
-        return maxRedirections;
-    }
-
-    public static class Builder {
-        private Set<InetSocketAddress> nodes;
-        private int timeout = Protocol.DEFAULT_TIMEOUT;
-        private int maxRedirections = 5;
-
-        public Builder setNodes(Set<InetSocketAddress> nodes) {
-            this.nodes = nodes;
-            return this;
-        }
-
-        public Builder setTimeout(int timeout) {
-            this.timeout = timeout;
-            return this;
-        }
-
-        public Builder setMaxRedirections(int maxRedirections) {
-            this.maxRedirections = maxRedirections;
-            return this;
-        }
-
-        public JedisClusterConfig build() {
-            Preconditions.checkNotNull(this.nodes, "Node information should be presented");
-
-            return new JedisClusterConfig(nodes, timeout, maxRedirections);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
deleted file mode 100644
index 9a42cf7..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.config;
-
-import redis.clients.jedis.Protocol;
-
-import java.io.Serializable;
-
-public class JedisPoolConfig implements Serializable {
-    public static final String DEFAULT_HOST = "127.0.0.1";
-
-    private String host;
-    private int port;
-    private int timeout;
-    private int database;
-    private String password;
-
-    public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
-        this.host = host;
-        this.port = port;
-        this.timeout = timeout;
-        this.database = database;
-        this.password = password;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public int getTimeout() {
-        return timeout;
-    }
-
-    public int getDatabase() {
-        return database;
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public static class Builder {
-        private String host = DEFAULT_HOST;
-        private int port = Protocol.DEFAULT_PORT;
-        private int timeout = Protocol.DEFAULT_TIMEOUT;
-        private int database = Protocol.DEFAULT_DATABASE;
-        private String password;
-
-        public Builder setHost(String host) {
-            this.host = host;
-            return this;
-        }
-
-        public Builder setPort(int port) {
-            this.port = port;
-            return this;
-        }
-
-        public Builder setTimeout(int timeout) {
-            this.timeout = timeout;
-            return this;
-        }
-
-        public Builder setDatabase(int database) {
-            this.database = database;
-            return this;
-        }
-
-        public Builder setPassword(String password) {
-            this.password = password;
-            return this;
-        }
-
-        public JedisPoolConfig build() {
-            return new JedisPoolConfig(host, port, timeout, password, database);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
deleted file mode 100644
index 5fd4115..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisCommands;
-
-import java.io.Closeable;
-
-public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable {
-
-    private JedisCluster jedisCluster;
-
-    public JedisClusterContainer(JedisCluster jedisCluster) {
-        this.jedisCluster = jedisCluster;
-    }
-
-    @Override
-    public JedisCommands getInstance() {
-        return this.jedisCluster;
-    }
-
-    @Override
-    public void returnInstance(JedisCommands jedisCommands) {
-        // do nothing
-    }
-
-    @Override
-    public void close() {
-        this.jedisCluster.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
deleted file mode 100644
index 8d2dd38..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-
-public class JedisCommandsContainerBuilder {
-
-    public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
-    public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
-        JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
-        return new JedisContainer(jedisPool);
-    }
-
-    public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
-        JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG);
-        return new JedisClusterContainer(jedisCluster);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
deleted file mode 100644
index 847d6a5..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import redis.clients.jedis.JedisCommands;
-
-public interface JedisCommandsInstanceContainer {
-    JedisCommands getInstance();
-    void returnInstance(JedisCommands jedisCommands);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
deleted file mode 100644
index e75cccc..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.JedisPool;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class JedisContainer implements JedisCommandsInstanceContainer, Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class);
-
-    private JedisPool jedisPool;
-
-    public JedisContainer(JedisPool jedisPool) {
-        this.jedisPool = jedisPool;
-    }
-
-    @Override
-    public JedisCommands getInstance() {
-        return jedisPool.getResource();
-    }
-
-    @Override
-    public void returnInstance(JedisCommands jedisCommands) {
-        if (jedisCommands == null) {
-            return;
-        }
-
-        try {
-            ((Closeable) jedisCommands).close();
-        } catch (IOException e) {
-            LOG.warn("Failed to close (return) instance to pool");
-            try {
-                jedisPool.returnBrokenResource((Jedis) jedisCommands);
-            } catch (Exception e2) {
-                LOG.error("Failed to discard instance from pool");
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        jedisPool.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
index a62fdff..ae053de 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -20,72 +20,65 @@ package org.apache.storm.redis.topology;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.bolt.AbstractRedisBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.bolt.RedisLookupBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-import redis.clients.jedis.exceptions.JedisException;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 public class LookupWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
     private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String PRINT_BOLT = "PRINT_BOLT";
 
     private static final String TEST_REDIS_HOST = "127.0.0.1";
     private static final int TEST_REDIS_PORT = 6379;
 
-    public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
-        private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
+    public static class PrintWordTotalCountBolt extends BaseRichBolt {
+        private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class);
         private static final Random RANDOM = new Random();
+        private OutputCollector collector;
 
-        public LookupWordTotalCountBolt(JedisPoolConfig config) {
-            super(config);
-        }
-
-        public LookupWordTotalCountBolt(JedisClusterConfig config) {
-            super(config);
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
         }
 
         @Override
         public void execute(Tuple input) {
-            JedisCommands jedisCommands = null;
-            try {
-                jedisCommands = getInstance();
-                String wordName = input.getStringByField("word");
-                String countStr = jedisCommands.get(wordName);
+            String wordName = input.getStringByField("wordName");
+            String countStr = input.getStringByField("count");
+
+            // print lookup result with low probability
+            if(RANDOM.nextInt(1000) > 995) {
+                int count = 0;
                 if (countStr != null) {
-                    int count = Integer.parseInt(countStr);
-                    this.collector.emit(new Values(wordName, count));
-
-                    // print lookup result with low probability
-                    if(RANDOM.nextInt(1000) > 995) {
-                        LOG.info("Lookup result - word : " + wordName + " / count : " + count);
-                    }
-                } else {
-                    // skip
-                    LOG.warn("Word not found in Redis - word : " + wordName);
+                    count = Integer.parseInt(countStr);
                 }
-            } finally {
-                if (jedisCommands != null) {
-                    returnInstance(jedisCommands);
-                }
-                this.collector.ack(input);
+                LOG.info("Lookup result - word : " + wordName + " / count : " + count);
             }
+
+            collector.ack(input);
         }
 
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            // wordName, count
-            declarer.declare(new Fields("wordName", "count"));
         }
     }
 
@@ -104,12 +97,16 @@ public class LookupWordCount {
                 .setHost(host).setPort(port).build();
 
         WordSpout spout = new WordSpout();
-        LookupWordTotalCountBolt redisLookupBolt = new LookupWordTotalCountBolt(poolConfig);
+        RedisLookupMapper lookupMapper = setupLookupMapper();
+        RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
+
+        PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
 
         //wordspout -> lookupbolt
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(LOOKUP_BOLT, redisLookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(LOOKUP_BOLT);
 
         if (args.length == 2) {
             LocalCluster cluster = new LocalCluster();
@@ -124,4 +121,46 @@ public class LookupWordCount {
             System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)");
         }
     }
+
+    private static RedisLookupMapper setupLookupMapper() {
+        return new WordCountRedisLookupMapper();
+    }
+
+    private static class WordCountRedisLookupMapper implements RedisLookupMapper {
+        private RedisDataTypeDescription description;
+        private final String hashKey = "wordCount";
+
+        public WordCountRedisLookupMapper() {
+            description = new RedisDataTypeDescription(
+                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+        }
+
+        @Override
+        public List<Values> toTuple(ITuple input, Object value) {
+            String member = getKeyFromTuple(input);
+            List<Values> values = Lists.newArrayList();
+            values.add(new Values(member, value));
+            return values;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("wordName", "count"));
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return tuple.getStringByField("word");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
index 535d7b9..14a969d 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -23,10 +23,14 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.redis.bolt.AbstractRedisBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.bolt.RedisStoreBolt;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCommands;
@@ -36,7 +40,7 @@ import redis.clients.jedis.exceptions.JedisException;
 public class PersistentWordCount {
     private static final String WORD_SPOUT = "WORD_SPOUT";
     private static final String COUNT_BOLT = "COUNT_BOLT";
-    private static final String REDIS_BOLT = "REDIS_BOLT";
+    private static final String STORE_BOLT = "STORE_BOLT";
 
     private static final String TEST_REDIS_HOST = "127.0.0.1";
     private static final int TEST_REDIS_PORT = 6379;
@@ -92,14 +96,15 @@ public class PersistentWordCount {
 
         WordSpout spout = new WordSpout();
         WordCounter bolt = new WordCounter();
-        StoreCountRedisBolt redisBolt = new StoreCountRedisBolt(poolConfig);
+        RedisStoreMapper storeMapper = setupStoreMapper();
+        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
 
         // wordSpout ==> countBolt ==> RedisBolt
         TopologyBuilder builder = new TopologyBuilder();
 
         builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
-        builder.setBolt(REDIS_BOLT, redisBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+        builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
+        builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
 
         if (args.length == 2) {
             LocalCluster cluster = new LocalCluster();
@@ -114,4 +119,33 @@ public class PersistentWordCount {
             System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
         }
     }
+
+    private static RedisStoreMapper setupStoreMapper() {
+        return new WordCountStoreMapper();
+    }
+
+    private static class WordCountStoreMapper implements RedisStoreMapper {
+        private RedisDataTypeDescription description;
+        private final String hashKey = "wordCount";
+
+        public WordCountStoreMapper() {
+            description = new RedisDataTypeDescription(
+                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            return tuple.getStringByField("word");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            return tuple.getStringByField("count");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
index 6a0548d..6f25038 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
@@ -23,23 +23,32 @@ import backtype.storm.topology.IBasicBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Maps;
 
 import java.util.Map;
 
 import static backtype.storm.utils.Utils.tuple;
 
 public class WordCounter implements IBasicBolt {
-
+    private Map<String, Integer> wordCounter = Maps.newHashMap();
 
     @SuppressWarnings("rawtypes")
     public void prepare(Map stormConf, TopologyContext context) {
     }
 
-    /*
-     * Just output the word value with a count of 1.
-     */
     public void execute(Tuple input, BasicOutputCollector collector) {
-        collector.emit(tuple(input.getValues().get(0), 1));
+        String word = input.getStringByField("word");
+        int count;
+        if (wordCounter.containsKey(word)) {
+            count = wordCounter.get(word) + 1;
+            wordCounter.put(word, wordCounter.get(word) + 1);
+        } else {
+            count = 1;
+        }
+
+        wordCounter.put(word, count);
+        collector.emit(new Values(word, String.valueOf(count)));
     }
 
     public void cleanup() {

http://git-wip-us.apache.org/repos/asf/storm/blob/f7c0bf8a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index 9a28cb7..8b6ebc5 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -23,15 +23,14 @@ import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisState;
 import org.apache.storm.redis.trident.state.RedisStateQuerier;
 import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
-import storm.trident.state.StateFactory;
 import storm.trident.testing.FixedBatchSpout;
 
 public class WordCountTridentRedis {
@@ -48,7 +47,7 @@ public class WordCountTridentRedis {
         JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                                         .setHost(redisHost).setPort(redisPort)
                                         .build();
-        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        TupleMapper tupleMapper = new WordCountTupleMapper();
         RedisState.Factory factory = new RedisState.Factory(poolConfig);
 
         TridentTopology topology = new TridentTopology();