You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text version.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/02/23 17:26:22 UTC
[09/23] storm git commit: add test case to storm-redis
add test case to storm-redis
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ff36ac87
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ff36ac87
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ff36ac87
Branch: refs/heads/master
Commit: ff36ac874b03fefefe7d25298238d5c2cc4493d5
Parents: b2c0731
Author: dashengju <da...@qq.com>
Authored: Wed Dec 31 17:11:22 2014 +0800
Committer: dashengju <da...@qq.com>
Committed: Wed Dec 31 17:11:22 2014 +0800
----------------------------------------------------------------------
external/storm-redis/pom.xml | 48 +++++++++
.../redis/trident/state/RedisMapState.java | 28 +++--
.../redis/trident/state/RedisStateQuerier.java | 7 +-
.../state/RedisStateSetCountQuerier.java | 7 +-
.../trident/state/RedisStateSetUpdater.java | 7 +-
.../redis/trident/state/RedisStateUpdater.java | 7 +-
.../redis/util/config/JedisClusterConfig.java | 18 ++--
.../redis/util/container/JedisContainer.java | 4 +
.../storm/redis/topology/LookupWordCount.java | 12 +--
.../redis/topology/PersistentWordCount.java | 2 +-
.../storm/redis/trident/PrintFunction.java | 40 +++++++
.../redis/trident/WordCountTridentRedis.java | 97 +++++++++++++++++
.../trident/WordCountTridentRedisCluster.java | 104 +++++++++++++++++++
.../redis/trident/WordCountTridentRedisMap.java | 95 +++++++++++++++++
.../redis/trident/WordCountTupleMapper.java | 16 +++
15 files changed, 461 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-redis/pom.xml b/external/storm-redis/pom.xml
index 46c20a5..6213ae1 100644
--- a/external/storm-redis/pom.xml
+++ b/external/storm-redis/pom.xml
@@ -56,5 +56,53 @@
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
</dependencies>
+
+ <!--
+ <build>
+ <sourceDirectory>src</sourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/multilang</directory>
+ </resource>
+ <resource>
+ <directory>${basedir}/resources</directory>
+ </resource>
+ </resources>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.2</version>
+ <configuration>
+ <artifactSet>
+ <excludes>
+ <exclude>junit:junit</exclude>
+ <exclude>org.slf4j:slf4j-simple</exclude>
+ <exclude>org.slf4j:slf4j-log4j12</exclude>
+ <exclude>org.apache.zookeeper</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-${project.version}-with-dependencies</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ -->
</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 647167e..7aa9dc9 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -320,8 +320,9 @@ public class RedisMapState<T> implements IBackingMap<T> {
String[] stringKeys = buildKeys(keys);
List<String> values = Lists.newArrayList();
- JedisCommands jedisCommands = container.getInstance();
+ JedisCommands jedisCommands = null;
try {
+ jedisCommands = container.getInstance();
if (jedisCommands instanceof Jedis) {
//TODO: JedisCommands does not support mget, so when the instance is a Jedis we call mget on Jedis directly
values = ((Jedis)jedisCommands).mget(stringKeys);
@@ -332,18 +333,23 @@ public class RedisMapState<T> implements IBackingMap<T> {
}
}
} finally {
- container.returnInstance(jedisCommands);
+ if (jedisCommands != null) {
+ container.returnInstance(jedisCommands);
+ }
}
return deserializeValues(keys, values);
} else {
- JedisCommands jedisCommands = container.getInstance();
+ JedisCommands jedisCommands = null;
try {
+ jedisCommands = container.getInstance();
Map<String, String> keyValue = jedisCommands.hgetAll(this.options.hkey);
List<String> values = buildValuesFromMap(keys, keyValue);
return deserializeValues(keys, values);
} finally {
- container.returnInstance(jedisCommands);
+ if (jedisCommands != null) {
+ container.returnInstance(jedisCommands);
+ }
}
}
}
@@ -384,8 +390,9 @@ public class RedisMapState<T> implements IBackingMap<T> {
}
if (Strings.isNullOrEmpty(this.options.hkey)) {
- JedisCommands jedisCommands = container.getInstance();
+ JedisCommands jedisCommands = null;
try {
+ jedisCommands = container.getInstance();
if (jedisCommands instanceof Jedis) {
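//TODO: JedisCommands does not support the multi-key command needed here, so when the instance is a Jedis we use Jedis directly (NOTE(review): the original comment says "mget", but this branch builds key/value pairs - confirm whether mset is meant)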
String[] keyValue = buildKeyValuesList(keys, vals);
@@ -398,18 +405,23 @@ public class RedisMapState<T> implements IBackingMap<T> {
}
}
} finally {
- container.returnInstance(jedisCommands);
+ if (jedisCommands != null) {
+ container.returnInstance(jedisCommands);
+ }
}
} else {
- JedisCommands jedisCommands = container.getInstance();
+ JedisCommands jedisCommands = null;
try {
+ jedisCommands = container.getInstance();
for (int i = 0; i < keys.size(); i++) {
String val = new String(serializer.serialize(vals.get(i)));
String redisKey = keyFactory.build(keys.get(i));
jedisCommands.hset(this.options.hkey, redisKey, val);
}
} finally {
- container.returnInstance(jedisCommands);
+ if (jedisCommands != null) {
+ container.returnInstance(jedisCommands);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index e01f8b8..1f6a090 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -47,8 +47,9 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
List<String> ret = Lists.newArrayList();
- JedisCommands jedisCommands = redisState.getInstance();
+ JedisCommands jedisCommands = null;
try {
+ jedisCommands = redisState.getInstance();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTridentTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
@@ -60,7 +61,9 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
logger.debug("redis get key[" + key + "] count[" + value + "]");
}
} finally {
- redisState.returnInstance(jedisCommands);
+ if (jedisCommands != null) {
+ redisState.returnInstance(jedisCommands);
+ }
}
return ret;
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
index 506f1b1..42420bc 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
@@ -44,8 +44,9 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
public List<Long> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
List<Long> ret = new ArrayList<Long>();
- JedisCommands jedisCommands = redisState.getInstance();
+ JedisCommands jedisCommands = null;
try {
+ jedisCommands = redisState.getInstance();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTridentTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
@@ -57,7 +58,9 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
logger.debug("redis get key[" + key + "] count[" + count + "]");
}
} finally {
- redisState.returnInstance(jedisCommands);
+ if (jedisCommands != null) {
+ redisState.returnInstance(jedisCommands);
+ }
}
return ret;
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
index 742c9ac..f24caf5 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
@@ -52,8 +52,9 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
TridentCollector collector) {
long expireAt = System.currentTimeMillis() + expireIntervalMs;
- JedisCommands jedisCommands = redisState.getInstance();
+ JedisCommands jedisCommands = null;
try {
+ jedisCommands = redisState.getInstance();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTridentTuple(input);
String redisKey = key;
@@ -71,7 +72,9 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
collector.emit(new Values(key, count));
}
} finally {
- redisState.returnInstance(jedisCommands);
+ if (jedisCommands != null) {
+ redisState.returnInstance(jedisCommands);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 3a4d42b..a832995 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -51,8 +51,9 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
TridentCollector collector) {
long expireAt = System.currentTimeMillis() + expireIntervalMs;
- JedisCommands jedisCommands = redisState.getInstance();
+ JedisCommands jedisCommands = null;
try {
+ jedisCommands = redisState.getInstance();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTridentTuple(input);
String redisKey = key;
@@ -67,7 +68,9 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
jedisCommands.expireAt(redisKey, expireAt);
}
} finally {
- redisState.returnInstance(jedisCommands);
+ if (jedisCommands != null) {
+ redisState.returnInstance(jedisCommands);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
index 7d421b2..355119a 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
@@ -22,21 +22,27 @@ import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Protocol;
import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
import java.util.Set;
public class JedisClusterConfig implements Serializable {
- private Set<HostAndPort> nodes;
+ private Set<InetSocketAddress> nodes;
private int timeout;
private int maxRedirections;
- public JedisClusterConfig(Set<HostAndPort> nodes, int timeout, int maxRedirections) {
+ public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
this.nodes = nodes;
this.timeout = timeout;
this.maxRedirections = maxRedirections;
}
public Set<HostAndPort> getNodes() {
- return nodes;
+ Set<HostAndPort> ret = new HashSet<HostAndPort>();
+ for (InetSocketAddress node : nodes) {
+ ret.add(new HostAndPort(node.getHostName(), node.getPort()));
+ }
+ return ret;
}
public int getTimeout() {
@@ -47,12 +53,12 @@ public class JedisClusterConfig implements Serializable {
return maxRedirections;
}
- static class Builder {
- private Set<HostAndPort> nodes;
+ public static class Builder {
+ private Set<InetSocketAddress> nodes;
private int timeout = Protocol.DEFAULT_TIMEOUT;
private int maxRedirections = 5;
- public Builder setNodes(Set<HostAndPort> nodes) {
+ public Builder setNodes(Set<InetSocketAddress> nodes) {
this.nodes = nodes;
return this;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
index de9f1aa..e75cccc 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
@@ -42,6 +42,10 @@ public class JedisContainer implements JedisCommandsInstanceContainer, Closeable
@Override
public void returnInstance(JedisCommands jedisCommands) {
+ if (jedisCommands == null) {
+ return;
+ }
+
try {
((Closeable) jedisCommands).close();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
index 42d3800..a62fdff 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -74,14 +74,10 @@ public class LookupWordCount {
// skip
LOG.warn("Word not found in Redis - word : " + wordName);
}
- } catch (NumberFormatException e) {
- LOG.error("Counter Type seems not stored to integer", e);
- } catch (JedisConnectionException e) {
- throw new RuntimeException("Unfortunately, this test requires redis-server running", e);
- } catch (JedisException e) {
- LOG.error("Exception occurred from Jedis/Redis", e);
} finally {
- returnInstance(jedisCommands);
+ if (jedisCommands != null) {
+ returnInstance(jedisCommands);
+ }
this.collector.ack(input);
}
}
@@ -99,7 +95,7 @@ public class LookupWordCount {
String host = TEST_REDIS_HOST;
int port = TEST_REDIS_PORT;
- if (args.length > 2) {
+ if (args.length >= 2) {
host = args[0];
port = Integer.parseInt(args[1]);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
index a96696b..535d7b9 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -82,7 +82,7 @@ public class PersistentWordCount {
String host = TEST_REDIS_HOST;
int port = TEST_REDIS_PORT;
- if (args.length > 2) {
+ if (args.length >= 2) {
host = args[0];
port = Integer.parseInt(args[1]);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
new file mode 100644
index 0000000..6f465c9
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.Random;
+
+public class PrintFunction extends BaseFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrintFunction.class);
+
+ private static final Random RANDOM = new Random();
+
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
+ if(RANDOM.nextInt(1000) > 995) {
+ LOG.info(tuple.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
new file mode 100644
index 0000000..9a28cb7
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.util.config.JedisPoolConfig;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.state.StateFactory;
+import storm.trident.testing.FixedBatchSpout;
+
+public class WordCountTridentRedis {
+ public static StormTopology buildTopology(String redisHost, Integer redisPort){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+
+ JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+ .setHost(redisHost).setPort(redisPort)
+ .build();
+ TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ RedisState.Factory factory = new RedisState.Factory(poolConfig);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory,
+ fields,
+ new RedisStateUpdater("test_", tupleMapper, 86400000),
+ new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new RedisStateQuerier("test_", tupleMapper),
+ new Fields("columnName","columnValue"));
+ stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 3) {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ System.exit(1);
+ }
+
+ Integer flag = Integer.valueOf(args[0]);
+ String redisHost = args[1];
+ Integer redisPort = Integer.valueOf(args[2]);
+
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (flag == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+ Thread.sleep(60 * 1000);
+ cluster.killTopology("test_wordCounter_for_redis");
+ cluster.shutdown();
+ System.exit(0);
+ } else if(flag == 1) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+ } else {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
new file mode 100644
index 0000000..c4d55dc
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.util.config.JedisClusterConfig;
+import redis.clients.jedis.HostAndPort;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.testing.FixedBatchSpout;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WordCountTridentRedisCluster {
+ public static StormTopology buildTopology(String redisHostPort){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+
+ Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
+ for (String hostPort : redisHostPort.split(",")) {
+ String[] host_port = hostPort.split(":");
+ nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1])));
+ }
+ JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
+ .build();
+ TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ RedisState.Factory factory = new RedisState.Factory(clusterConfig);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ stream.partitionPersist(factory,
+ fields,
+ new RedisStateUpdater("test_", tupleMapper, 86400000),
+ new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new RedisStateQuerier("test_", tupleMapper),
+ new Fields("columnName","columnValue"));
+ stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 2) {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380");
+ System.exit(1);
+ }
+
+ Integer flag = Integer.valueOf(args[0]);
+ String redisHostPort = args[1];
+
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (flag == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
+ Thread.sleep(60 * 1000);
+ cluster.killTopology("test_wordCounter_for_redis");
+ cluster.shutdown();
+ System.exit(0);
+ } else if(flag == 1) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
+ } else {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
new file mode 100644
index 0000000..b096e55
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.trident.state.RedisMapState;
+import org.apache.storm.redis.trident.state.RedisState;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.util.config.JedisPoolConfig;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.builtin.MapGet;
+import storm.trident.operation.builtin.Sum;
+import storm.trident.state.StateFactory;
+import storm.trident.testing.FixedBatchSpout;
+
+public class WordCountTridentRedisMap {
+ public static StormTopology buildTopology(String redisHost, Integer redisPort){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+
+ JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+ .setHost(redisHost).setPort(redisPort)
+ .build();
+ TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ StateFactory factory = RedisMapState.transactional(poolConfig);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 3) {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ System.exit(1);
+ }
+
+ Integer flag = Integer.valueOf(args[0]);
+ String redisHost = args[1];
+ Integer redisPort = Integer.valueOf(args[2]);
+
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (flag == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+ Thread.sleep(60 * 1000);
+ cluster.killTopology("test_wordCounter_for_redis");
+ cluster.shutdown();
+ System.exit(0);
+ } else if(flag == 1) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHost, redisPort));
+ } else {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ff36ac87/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
new file mode 100644
index 0000000..6454c9e
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
@@ -0,0 +1,16 @@
+package org.apache.storm.redis.trident;
+
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import storm.trident.tuple.TridentTuple;
+
+public class WordCountTupleMapper implements TridentTupleMapper {
+ @Override
+ public String getKeyFromTridentTuple(TridentTuple tuple) {
+ return tuple.getString(0);
+ }
+
+ @Override
+ public String getValueFromTridentTuple(TridentTuple tuple) {
+ return tuple.getInteger(1).toString();
+ }
+}