You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/02/23 17:26:22 UTC

[09/23] storm git commit: add test case to storm-redis

add test case to storm-redis


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ff36ac87
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ff36ac87
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ff36ac87

Branch: refs/heads/master
Commit: ff36ac874b03fefefe7d25298238d5c2cc4493d5
Parents: b2c0731
Author: dashengju <da...@qq.com>
Authored: Wed Dec 31 17:11:22 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Wed Dec 31 17:11:22 2014 +0800

----------------------------------------------------------------------
 external/storm-redis/pom.xml                    |  48 +++++++++
 .../redis/trident/state/RedisMapState.java      |  28 +++--
 .../redis/trident/state/RedisStateQuerier.java  |   7 +-
 .../state/RedisStateSetCountQuerier.java        |   7 +-
 .../trident/state/RedisStateSetUpdater.java     |   7 +-
 .../redis/trident/state/RedisStateUpdater.java  |   7 +-
 .../redis/util/config/JedisClusterConfig.java   |  18 ++--
 .../redis/util/container/JedisContainer.java    |   4 +
 .../storm/redis/topology/LookupWordCount.java   |  12 +--
 .../redis/topology/PersistentWordCount.java     |   2 +-
 .../storm/redis/trident/PrintFunction.java      |  40 +++++++
 .../redis/trident/WordCountTridentRedis.java    |  97 +++++++++++++++++
 .../trident/WordCountTridentRedisCluster.java   | 104 +++++++++++++++++++
 .../redis/trident/WordCountTridentRedisMap.java |  95 +++++++++++++++++
 .../redis/trident/WordCountTupleMapper.java     |  16 +++
 15 files changed, 461 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-redis/pom.xml b/external/storm-redis/pom.xml
index 46c20a5..6213ae1 100644
--- a/external/storm-redis/pom.xml
+++ b/external/storm-redis/pom.xml
@@ -56,5 +56,53 @@
             <artifactId>jedis</artifactId>
             <version>${jedis.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>18.0</version>
+        </dependency>
     </dependencies>
+
+    <!--
+    <build>
+        <sourceDirectory>src</sourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/multilang</directory>
+            </resource>
+            <resource>
+                <directory>${basedir}/resources</directory>
+            </resource>
+        </resources>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.2</version>
+                <configuration>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>junit:junit</exclude>
+                            <exclude>org.slf4j:slf4j-simple</exclude>
+                            <exclude>org.slf4j:slf4j-log4j12</exclude>
+                            <exclude>org.apache.zookeeper</exclude>
+                        </excludes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.artifactId}-${project.version}-with-dependencies</finalName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    -->
 </project>

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 647167e..7aa9dc9 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -320,8 +320,9 @@ public class RedisMapState<T> implements IBackingMap<T> {
             String[] stringKeys = buildKeys(keys);
             List<String> values = Lists.newArrayList();
 
-            JedisCommands jedisCommands = container.getInstance();
+            JedisCommands jedisCommands = null;
             try {
+                jedisCommands = container.getInstance();
                 if (jedisCommands instanceof Jedis) {
                     //Todo because jedisCommands not support mget, we use Jedis for mget if it is Jedis
                     values = ((Jedis)jedisCommands).mget(stringKeys);
@@ -332,18 +333,23 @@ public class RedisMapState<T> implements IBackingMap<T> {
                     }
                 }
             } finally {
-                container.returnInstance(jedisCommands);
+                if (jedisCommands != null) {
+                    container.returnInstance(jedisCommands);
+                }
             }
 
             return deserializeValues(keys, values);
         } else {
-            JedisCommands jedisCommands = container.getInstance();
+            JedisCommands jedisCommands = null;
             try {
+                jedisCommands = container.getInstance();
                 Map<String, String> keyValue = jedisCommands.hgetAll(this.options.hkey);
                 List<String> values = buildValuesFromMap(keys, keyValue);
                 return deserializeValues(keys, values);
             } finally {
-                container.returnInstance(jedisCommands);
+                if (jedisCommands != null) {
+                    container.returnInstance(jedisCommands);
+                }
             }
         }
     }
@@ -384,8 +390,9 @@ public class RedisMapState<T> implements IBackingMap<T> {
         }
 
         if (Strings.isNullOrEmpty(this.options.hkey)) {
-            JedisCommands jedisCommands = container.getInstance();
+            JedisCommands jedisCommands = null;
             try {
+                jedisCommands = container.getInstance();
                 if (jedisCommands instanceof Jedis) {
                     //Todo because jedisCommands not support mget, we use Jedis for mget if it is Jedis
                     String[] keyValue = buildKeyValuesList(keys, vals);
@@ -398,18 +405,23 @@ public class RedisMapState<T> implements IBackingMap<T> {
                     }
                 }
             } finally {
-                container.returnInstance(jedisCommands);
+                if (jedisCommands != null) {
+                    container.returnInstance(jedisCommands);
+                }
             }
         } else {
-            JedisCommands jedisCommands = container.getInstance();
+            JedisCommands jedisCommands = null;
             try {
+                jedisCommands = container.getInstance();
                 for (int i = 0; i < keys.size(); i++) {
                     String val = new String(serializer.serialize(vals.get(i)));
                     String redisKey = keyFactory.build(keys.get(i));
                     jedisCommands.hset(this.options.hkey, redisKey, val);
                 }
             } finally {
-                container.returnInstance(jedisCommands);
+                if (jedisCommands != null) {
+                    container.returnInstance(jedisCommands);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index e01f8b8..1f6a090 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -47,8 +47,9 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
     public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
         List<String> ret = Lists.newArrayList();
 
-        JedisCommands jedisCommands = redisState.getInstance();
+        JedisCommands jedisCommands = null;
         try {
+            jedisCommands = redisState.getInstance();
             for (TridentTuple input : inputs) {
                 String key = this.tupleMapper.getKeyFromTridentTuple(input);
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
@@ -60,7 +61,9 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
                 logger.debug("redis get key[" + key + "] count[" + value + "]");
             }
         } finally {
-            redisState.returnInstance(jedisCommands);
+            if (jedisCommands != null) {
+                redisState.returnInstance(jedisCommands);
+            }
         }
 
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
index 506f1b1..42420bc 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
@@ -44,8 +44,9 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
     public List<Long> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
         List<Long> ret = new ArrayList<Long>();
 
-        JedisCommands jedisCommands = redisState.getInstance();
+        JedisCommands jedisCommands = null;
         try {
+            jedisCommands = redisState.getInstance();
             for (TridentTuple input : inputs) {
                 String key = this.tupleMapper.getKeyFromTridentTuple(input);
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
@@ -57,7 +58,9 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
                 logger.debug("redis get key[" + key + "] count[" + count + "]");
             }
         } finally {
-            redisState.returnInstance(jedisCommands);
+            if (jedisCommands != null) {
+                redisState.returnInstance(jedisCommands);
+            }
         }
 
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
index 742c9ac..f24caf5 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
@@ -52,8 +52,9 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
                             TridentCollector collector) {
         long expireAt = System.currentTimeMillis() + expireIntervalMs;
 
-        JedisCommands jedisCommands = redisState.getInstance();
+        JedisCommands jedisCommands = null;
         try {
+            jedisCommands = redisState.getInstance();
             for (TridentTuple input : inputs) {
                 String key = this.tupleMapper.getKeyFromTridentTuple(input);
                 String redisKey = key;
@@ -71,7 +72,9 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
                 collector.emit(new Values(key, count));
             }
         } finally {
-            redisState.returnInstance(jedisCommands);
+            if (jedisCommands != null) {
+                redisState.returnInstance(jedisCommands);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 3a4d42b..a832995 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -51,8 +51,9 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
                             TridentCollector collector) {
         long expireAt = System.currentTimeMillis() + expireIntervalMs;
 
-        JedisCommands jedisCommands = redisState.getInstance();
+        JedisCommands jedisCommands = null;
         try {
+            jedisCommands = redisState.getInstance();
             for (TridentTuple input : inputs) {
                 String key = this.tupleMapper.getKeyFromTridentTuple(input);
                 String redisKey = key;
@@ -67,7 +68,9 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
                 jedisCommands.expireAt(redisKey, expireAt);
             }
         } finally {
-            redisState.returnInstance(jedisCommands);
+            if (jedisCommands != null) {
+                redisState.returnInstance(jedisCommands);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
index 7d421b2..355119a 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
@@ -22,21 +22,27 @@ import redis.clients.jedis.HostAndPort;
 import redis.clients.jedis.Protocol;
 
 import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
 import java.util.Set;
 
 public class JedisClusterConfig implements Serializable {
-    private Set<HostAndPort> nodes;
+    private Set<InetSocketAddress> nodes;
     private int timeout;
     private int maxRedirections;
 
-    public JedisClusterConfig(Set<HostAndPort> nodes, int timeout, int maxRedirections) {
+    public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
         this.nodes = nodes;
         this.timeout = timeout;
         this.maxRedirections = maxRedirections;
     }
 
     public Set<HostAndPort> getNodes() {
-        return nodes;
+        Set<HostAndPort> ret = new HashSet<HostAndPort>();
+        for (InetSocketAddress node : nodes) {
+            ret.add(new HostAndPort(node.getHostName(), node.getPort()));
+        }
+        return ret;
     }
 
     public int getTimeout() {
@@ -47,12 +53,12 @@ public class JedisClusterConfig implements Serializable {
         return maxRedirections;
     }
 
-    static class Builder {
-        private Set<HostAndPort> nodes;
+    public static class Builder {
+        private Set<InetSocketAddress> nodes;
         private int timeout = Protocol.DEFAULT_TIMEOUT;
         private int maxRedirections = 5;
 
-        public Builder setNodes(Set<HostAndPort> nodes) {
+        public Builder setNodes(Set<InetSocketAddress> nodes) {
             this.nodes = nodes;
             return this;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
index de9f1aa..e75cccc 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
@@ -42,6 +42,10 @@ public class JedisContainer implements JedisCommandsInstanceContainer, Closeable
 
     @Override
     public void returnInstance(JedisCommands jedisCommands) {
+        if (jedisCommands == null) {
+            return;
+        }
+
         try {
             ((Closeable) jedisCommands).close();
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
index 42d3800..a62fdff 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -74,14 +74,10 @@ public class LookupWordCount {
                     // skip
                     LOG.warn("Word not found in Redis - word : " + wordName);
                 }
-            } catch (NumberFormatException e) {
-                LOG.error("Counter Type seems not stored to integer", e);
-            } catch (JedisConnectionException e) {
-                throw new RuntimeException("Unfortunately, this test requires redis-server running", e);
-            } catch (JedisException e) {
-                LOG.error("Exception occurred from Jedis/Redis", e);
             } finally {
-                returnInstance(jedisCommands);
+                if (jedisCommands != null) {
+                    returnInstance(jedisCommands);
+                }
                 this.collector.ack(input);
             }
         }
@@ -99,7 +95,7 @@ public class LookupWordCount {
         String host = TEST_REDIS_HOST;
         int port = TEST_REDIS_PORT;
 
-        if (args.length > 2) {
+        if (args.length >= 2) {
             host = args[0];
             port = Integer.parseInt(args[1]);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
index a96696b..535d7b9 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -82,7 +82,7 @@ public class PersistentWordCount {
         String host = TEST_REDIS_HOST;
         int port = TEST_REDIS_PORT;
 
-        if (args.length > 2) {
+        if (args.length >= 2) {
             host = args[0];
             port = Integer.parseInt(args[1]);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
new file mode 100644
index 0000000..6f465c9
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.Random;
+
+public class PrintFunction extends BaseFunction { // test helper: logs a small random sample of the tuples passed to it
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+
+    private static final Random RANDOM = new Random(); // drives the sampling decision in execute()
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector tridentCollector) { // the collector is never used: nothing is emitted
+        if(RANDOM.nextInt(1000) > 995) { // nextInt(1000) yields 0..999, so only 996..999 pass (about 0.4% of calls)
+            LOG.info(tuple.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
new file mode 100644
index 0000000..9a28cb7
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.util.config.JedisPoolConfig;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.state.StateFactory;
+import storm.trident.testing.FixedBatchSpout;
+
+public class WordCountTridentRedis { // example topology: persists word tuples through RedisState and queries them back
+    public static StormTopology buildTopology(String redisHost, Integer redisPort){ // builds the topology against a single Redis host/port
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+                                        .setHost(redisHost).setPort(redisPort)
+                                        .build();
+        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        RedisState.Factory factory = new RedisState.Factory(poolConfig); // the same factory backs both the persist step and the static query state
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory,
+                                fields,
+                                new RedisStateUpdater("test_", tupleMapper, 86400000), // presumably (key prefix, mapper, expire interval); 86400000 is 24 h if in ms - TODO confirm
+                                new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"), // looks up each "word" via RedisStateQuerier
+                                new RedisStateQuerier("test_", tupleMapper), // same "test_" argument as the updater above
+                                new Fields("columnName","columnValue"));
+        stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields()); // PrintFunction adds no fields; it only logs a sample
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception { // args: mode flag (0 or 1), redis host, redis port
+        if (args.length != 3) { // exactly three arguments are required
+            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+            System.exit(1);
+        }
+
+        Integer flag = Integer.valueOf(args[0]);
+        String redisHost = args[1];
+        Integer redisPort = Integer.valueOf(args[2]);
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) { // local mode: run in an in-process LocalCluster
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+            Thread.sleep(60 * 1000); // let the topology run for 60 seconds before killing it
+            cluster.killTopology("test_wordCounter_for_redis");
+            cluster.shutdown();
+            System.exit(0);
+        } else if(flag == 1) { // cluster mode: submit through StormSubmitter with 3 workers
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+        } else { // any other flag: print usage; unlike the argument-count check, this path does not call System.exit(1)
+            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
new file mode 100644
index 0000000..c4d55dc
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.util.config.JedisClusterConfig;
+import redis.clients.jedis.HostAndPort;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.testing.FixedBatchSpout;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WordCountTridentRedisCluster { // example topology: same word-count flow as the single-node example, configured with a node list
+    public static StormTopology buildTopology(String redisHostPort){ // redisHostPort: comma-separated "host:port" entries
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
+        for (String hostPort : redisHostPort.split(",")) { // one entry per node
+            String[] host_port = hostPort.split(":"); // an entry without ':' fails below with ArrayIndexOutOfBoundsException
+            nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1]))); // a non-numeric port throws NumberFormatException
+        }
+        JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
+                                        .build();
+        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        RedisState.Factory factory = new RedisState.Factory(clusterConfig); // the same factory backs both the persist step and the static query state
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        stream.partitionPersist(factory,
+                                fields,
+                                new RedisStateUpdater("test_", tupleMapper, 86400000), // presumably (key prefix, mapper, expire interval); 86400000 is 24 h if in ms - TODO confirm
+                                new Fields());
+
+        TridentState state = topology.newStaticState(factory);
+        stream = stream.stateQuery(state, new Fields("word"), // looks up each "word" via RedisStateQuerier
+                                new RedisStateQuerier("test_", tupleMapper), // same "test_" argument as the updater above
+                                new Fields("columnName","columnValue"));
+        stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields()); // PrintFunction adds no fields; it only logs a sample
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception { // args: mode flag (0 or 1), comma-separated host:port list
+        if (args.length != 2) { // exactly two arguments are required
+            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380");
+            System.exit(1);
+        }
+
+        Integer flag = Integer.valueOf(args[0]);
+        String redisHostPort = args[1];
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) { // local mode: run in an in-process LocalCluster
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
+            Thread.sleep(60 * 1000); // let the topology run for 60 seconds before killing it
+            cluster.killTopology("test_wordCounter_for_redis");
+            cluster.shutdown();
+            System.exit(0);
+        } else if(flag == 1) { // cluster mode: submit through StormSubmitter with 3 workers
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
+        } else { // any other flag: print the same usage text as the argument-count check (this program takes a host:port list)
+            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
new file mode 100644
index 0000000..b096e55
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.trident.state.RedisMapState;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.util.config.JedisPoolConfig;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.builtin.MapGet;
+import storm.trident.operation.builtin.Sum;
+import storm.trident.state.StateFactory;
+import storm.trident.testing.FixedBatchSpout;
+
+public class WordCountTridentRedisMap {
+    public static StormTopology buildTopology(String redisHost, Integer redisPort){
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+                                        .setHost(redisHost).setPort(redisPort)
+                                        .build();
+        TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+        StateFactory factory = RedisMapState.transactional(poolConfig);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 3) {
+            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+            System.exit(1);
+        }
+
+        Integer flag = Integer.valueOf(args[0]);
+        String redisHost = args[1];
+        Integer redisPort = Integer.valueOf(args[2]);
+
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+            Thread.sleep(60 * 1000);
+            cluster.killTopology("test_wordCounter_for_redis");
+            cluster.shutdown();
+            System.exit(0);
+        } else if(flag == 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+        } else {
+            System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
new file mode 100644
index 0000000..6454c9e
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
@@ -0,0 +1,16 @@
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import storm.trident.tuple.TridentTuple;
+
+/**
+ * {@link TridentTupleMapper} that takes its key and value from fixed positions
+ * of the incoming tuple.
+ */
+public class WordCountTupleMapper implements TridentTupleMapper {
+    /**
+     * @param tuple tuple to read from
+     * @return the string stored at position 0 of {@code tuple}
+     */
+    @Override
+    public String getKeyFromTridentTuple(TridentTuple tuple) {
+        final String key = tuple.getString(0);
+        return key;
+    }
+
+    /**
+     * @param tuple tuple to read from
+     * @return the string form of the integer stored at position 1 of {@code tuple}
+     */
+    @Override
+    public String getValueFromTridentTuple(TridentTuple tuple) {
+        final Integer count = tuple.getInteger(1);
+        return count.toString();
+    }
+}