You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/29 22:41:00 UTC
[1/3] storm git commit: sync storm-redis with master
Repository: storm
Updated Branches:
refs/heads/0.10.x-branch 511869c6f -> 81505f9c7
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
index 535d7b9..77c6ee8 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -23,10 +23,14 @@ import backtype.storm.StormSubmitter;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import org.apache.storm.redis.bolt.AbstractRedisBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.bolt.RedisStoreBolt;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
@@ -36,46 +40,11 @@ import redis.clients.jedis.exceptions.JedisException;
public class PersistentWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String COUNT_BOLT = "COUNT_BOLT";
- private static final String REDIS_BOLT = "REDIS_BOLT";
+ private static final String STORE_BOLT = "STORE_BOLT";
private static final String TEST_REDIS_HOST = "127.0.0.1";
private static final int TEST_REDIS_PORT = 6379;
- public static class StoreCountRedisBolt extends AbstractRedisBolt {
- private static final Logger LOG = LoggerFactory.getLogger(StoreCountRedisBolt.class);
-
- public StoreCountRedisBolt(JedisPoolConfig config) {
- super(config);
- }
-
- public StoreCountRedisBolt(JedisClusterConfig config) {
- super(config);
- }
-
- @Override
- public void execute(Tuple input) {
- String word = input.getStringByField("word");
- int count = input.getIntegerByField("count");
-
- JedisCommands commands = null;
- try {
- commands = getInstance();
- commands.incrBy(word, count);
- } catch (JedisConnectionException e) {
- throw new RuntimeException("Unfortunately, this test requires redis-server running", e);
- } catch (JedisException e) {
- LOG.error("Exception occurred from Jedis/Redis", e);
- } finally {
- returnInstance(commands);
- this.collector.ack(input);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
- }
-
public static void main(String[] args) throws Exception {
Config config = new Config();
@@ -92,14 +61,15 @@ public class PersistentWordCount {
WordSpout spout = new WordSpout();
WordCounter bolt = new WordCounter();
- StoreCountRedisBolt redisBolt = new StoreCountRedisBolt(poolConfig);
+ RedisStoreMapper storeMapper = setupStoreMapper();
+ RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
// wordSpout ==> countBolt ==> RedisBolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(WORD_SPOUT, spout, 1);
- builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
- builder.setBolt(REDIS_BOLT, redisBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+ builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
+ builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
if (args.length == 2) {
LocalCluster cluster = new LocalCluster();
@@ -114,4 +84,33 @@ public class PersistentWordCount {
System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
}
}
+
+ private static RedisStoreMapper setupStoreMapper() {
+ return new WordCountStoreMapper();
+ }
+
+ private static class WordCountStoreMapper implements RedisStoreMapper {
+ private RedisDataTypeDescription description;
+ private final String hashKey = "wordCount";
+
+ public WordCountStoreMapper() {
+ description = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return description;
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField("word");
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return tuple.getStringByField("count");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
index 6a0548d..6f25038 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
@@ -23,23 +23,32 @@ import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Maps;
import java.util.Map;
import static backtype.storm.utils.Utils.tuple;
public class WordCounter implements IBasicBolt {
-
+ private Map<String, Integer> wordCounter = Maps.newHashMap();
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context) {
}
- /*
- * Just output the word value with a count of 1.
- */
public void execute(Tuple input, BasicOutputCollector collector) {
- collector.emit(tuple(input.getValues().get(0), 1));
+ String word = input.getStringByField("word");
+ int count;
+ if (wordCounter.containsKey(word)) {
+ count = wordCounter.get(word) + 1;
+ wordCounter.put(word, wordCounter.get(word) + 1);
+ } else {
+ count = 1;
+ }
+
+ wordCounter.put(word, count);
+ collector.emit(new Values(word, String.valueOf(count)));
}
public void cleanup() {
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index a610f54..eb13399 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -23,15 +23,14 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
-import storm.trident.state.StateFactory;
import storm.trident.testing.FixedBatchSpout;
public class WordCountTridentRedis {
@@ -48,7 +47,7 @@ public class WordCountTridentRedis {
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(redisHost).setPort(redisPort)
.build();
- TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ TupleMapper tupleMapper = new WordCountTupleMapper();
RedisState.Factory factory = new RedisState.Factory(poolConfig);
TridentTopology topology = new TridentTopology();
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index 8bea3ce..8562e77 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -23,11 +23,11 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.apache.storm.redis.trident.state.RedisClusterState;
import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
@@ -55,7 +55,7 @@ public class WordCountTridentRedisCluster {
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
- TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ TupleMapper tupleMapper = new WordCountTupleMapper();
RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
TridentTopology topology = new TridentTopology();
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index e9ae54d..de1f252 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -23,11 +23,9 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.apache.storm.redis.trident.state.RedisClusterMapState;
-import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
-import org.apache.storm.redis.trident.state.RedisStateQuerier;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
@@ -58,7 +56,7 @@ public class WordCountTridentRedisClusterMap {
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
- TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ TupleMapper tupleMapper = new WordCountTupleMapper();
StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
TridentTopology topology = new TridentTopology();
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index b096e55..4d4afe8 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -23,12 +23,9 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.apache.storm.redis.trident.state.RedisMapState;
-import org.apache.storm.redis.trident.state.RedisState;
-import org.apache.storm.redis.trident.state.RedisStateQuerier;
-import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
@@ -51,7 +48,7 @@ public class WordCountTridentRedisMap {
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(redisHost).setPort(redisPort)
.build();
- TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ TupleMapper tupleMapper = new WordCountTupleMapper();
StateFactory factory = RedisMapState.transactional(poolConfig);
TridentTopology topology = new TridentTopology();
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
index 6454c9e..1e601c9 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
@@ -1,16 +1,16 @@
package org.apache.storm.redis.trident;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
-import storm.trident.tuple.TridentTuple;
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.redis.common.mapper.TupleMapper;
-public class WordCountTupleMapper implements TridentTupleMapper {
+public class WordCountTupleMapper implements TupleMapper {
@Override
- public String getKeyFromTridentTuple(TridentTuple tuple) {
+ public String getKeyFromTuple(ITuple tuple) {
return tuple.getString(0);
}
@Override
- public String getValueFromTridentTuple(TridentTuple tuple) {
+ public String getValueFromTuple(ITuple tuple) {
return tuple.getInteger(1).toString();
}
}
[3/3] storm git commit: update changelog for storm-redis changes
merged from master
Posted by pt...@apache.org.
update changelog for storm-redis changes merged from master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81505f9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81505f9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81505f9c
Branch: refs/heads/0.10.x-branch
Commit: 81505f9c71a63b1d8af9825fc88bb8b59389f8a2
Parents: 0edb8ab
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 16:40:44 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 16:40:44 2015 -0400
----------------------------------------------------------------------
CHANGELOG.md | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/81505f9c/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc49984..9109f01 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
## 0.10.0
+ * STORM-735: [storm-redis] Upgrade Jedis to 2.7.0
+ * STORM-724: Document RedisStoreBolt and RedisLookupBolt which is missed.
+ * STORM-703: With hash key option for RedisMapState, only get values for keys in batch
* STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation.
* STORM-737] Check task->node+port with read lock to prevent sending to closed connection
* STORM-835 Netty Client hold batch object until io operation complete
[2/3] storm git commit: sync storm-redis with master
Posted by pt...@apache.org.
sync storm-redis with master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0edb8ab9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0edb8ab9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0edb8ab9
Branch: refs/heads/0.10.x-branch
Commit: 0edb8ab90c6edbfaf46f05be5dc51c1a3c5b3d03
Parents: 511869c
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 16:33:11 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 16:33:11 2015 -0400
----------------------------------------------------------------------
external/storm-redis/README.md | 108 ++++++++++++++++-
external/storm-redis/pom.xml | 4 +-
.../storm/redis/bolt/AbstractRedisBolt.java | 8 +-
.../storm/redis/bolt/RedisLookupBolt.java | 112 ++++++++++++++++++
.../apache/storm/redis/bolt/RedisStoreBolt.java | 100 ++++++++++++++++
.../redis/common/config/JedisClusterConfig.java | 82 +++++++++++++
.../redis/common/config/JedisPoolConfig.java | 97 ++++++++++++++++
.../common/container/JedisClusterContainer.java | 47 ++++++++
.../JedisCommandsContainerBuilder.java | 38 ++++++
.../JedisCommandsInstanceContainer.java | 25 ++++
.../redis/common/container/JedisContainer.java | 60 ++++++++++
.../common/mapper/RedisDataTypeDescription.java | 50 ++++++++
.../redis/common/mapper/RedisLookupMapper.java | 40 +++++++
.../storm/redis/common/mapper/RedisMapper.java | 22 ++++
.../redis/common/mapper/RedisStoreMapper.java | 21 ++++
.../storm/redis/common/mapper/TupleMapper.java | 27 +++++
.../trident/mapper/TridentTupleMapper.java | 27 -----
.../trident/state/RedisClusterMapState.java | 2 +-
.../redis/trident/state/RedisClusterState.java | 2 +-
.../trident/state/RedisClusterStateQuerier.java | 10 +-
.../trident/state/RedisClusterStateUpdater.java | 10 +-
.../redis/trident/state/RedisMapState.java | 31 ++---
.../storm/redis/trident/state/RedisState.java | 2 +-
.../redis/trident/state/RedisStateQuerier.java | 10 +-
.../redis/trident/state/RedisStateUpdater.java | 10 +-
.../redis/util/config/JedisClusterConfig.java | 82 -------------
.../redis/util/config/JedisPoolConfig.java | 97 ----------------
.../util/container/JedisClusterContainer.java | 47 --------
.../JedisCommandsContainerBuilder.java | 38 ------
.../JedisCommandsInstanceContainer.java | 25 ----
.../redis/util/container/JedisContainer.java | 65 -----------
.../storm/redis/topology/LookupWordCount.java | 115 +++++++++++++------
.../redis/topology/PersistentWordCount.java | 81 +++++++------
.../storm/redis/topology/WordCounter.java | 19 ++-
.../redis/trident/WordCountTridentRedis.java | 7 +-
.../trident/WordCountTridentRedisCluster.java | 6 +-
.../WordCountTridentRedisClusterMap.java | 8 +-
.../redis/trident/WordCountTridentRedisMap.java | 9 +-
.../redis/trident/WordCountTupleMapper.java | 10 +-
39 files changed, 1016 insertions(+), 538 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-redis/README.md b/external/storm-redis/README.md
index 17b5904..f165c09 100644
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@ -2,6 +2,8 @@
Storm/Trident integration for [Redis](http://redis.io/)
+Storm-redis uses Jedis as its Redis client.
+
## Usage
### How do I use it?
@@ -12,12 +14,114 @@ use it as a maven dependency:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
- <version>0.10.0</version>
+ <version>{storm.version}</version>
<type>jar</type>
</dependency>
```
-### AbstractRedisBolt usage:
+### For normal Bolt
+
+Storm-redis provides basic Bolt implementations, ```RedisLookupBolt``` and ```RedisStoreBolt```.
+
+As their names suggest, ```RedisLookupBolt``` retrieves a value from Redis by key, and ```RedisStoreBolt``` stores a key / value pair to Redis. Each tuple is mapped to one key / value pair, and you define that mapping by implementing ```TupleMapper```.
+
+You can also choose which Redis data type to use via ```RedisDataTypeDescription```. Please refer to ```RedisDataTypeDescription.RedisDataType``` to see which data types are supported. Some data types (hash and sorted set) require an additional key; in that case the additional key names the Redis hash or sorted set, and the key extracted from the tuple becomes the element (hash field or sorted-set member) inside it.
+
+These interfaces are combined into ```RedisLookupMapper``` and ```RedisStoreMapper```, which are the mappers expected by ```RedisLookupBolt``` and ```RedisStoreBolt``` respectively.
+
+#### RedisLookupBolt example
+
+```java
+
+class WordCountRedisLookupMapper implements RedisLookupMapper {
+ private RedisDataTypeDescription description;
+ private final String hashKey = "wordCount";
+
+ public WordCountRedisLookupMapper() {
+ description = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+ }
+
+ @Override
+ public List<Values> toTuple(ITuple input, Object value) {
+ String member = getKeyFromTuple(input);
+ List<Values> values = Lists.newArrayList();
+ values.add(new Values(member, value));
+ return values;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("wordName", "count"));
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return description;
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField("word");
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return null;
+ }
+}
+
+```
+
+```java
+
+JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+ .setHost(host).setPort(port).build();
+RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
+RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
+```
+
+#### RedisStoreBolt example
+
+```java
+
+class WordCountStoreMapper implements RedisStoreMapper {
+ private RedisDataTypeDescription description;
+ private final String hashKey = "wordCount";
+
+ public WordCountStoreMapper() {
+ description = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return description;
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField("word");
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return tuple.getStringByField("count");
+ }
+}
+```
+
+```java
+
+JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+ .setHost(host).setPort(port).build();
+RedisStoreMapper storeMapper = new WordCountStoreMapper();
+RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
+```
+
+### For non-simple Bolt
+
+If your scenario doesn't fit ```RedisStoreBolt``` and ```RedisLookupBolt```, storm-redis also provides ```AbstractRedisBolt``` to let you extend and apply your business logic.
```java
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-redis/pom.xml b/external/storm-redis/pom.xml
index e4d7daa..9033998 100644
--- a/external/storm-redis/pom.xml
+++ b/external/storm-redis/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.10.0-SNAPSHOT</version>
+ <version>0.11.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -41,7 +41,7 @@
</developers>
<properties>
- <jedis.version>2.6.2</jedis.version>
+ <jedis.version>2.7.0</jedis.version>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index 0b2a7f3..158fcaa 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -20,10 +20,10 @@ package org.apache.storm.redis.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
-import org.apache.storm.redis.util.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.util.container.JedisCommandsInstanceContainer;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
+import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
import redis.clients.jedis.JedisCommands;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
new file mode 100644
index 0000000..bbd7e6a
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.JedisCommands;
+
+import java.util.List;
+
+public class RedisLookupBolt extends AbstractRedisBolt {
+ private final RedisLookupMapper lookupMapper;
+ private final RedisDataTypeDescription.RedisDataType dataType;
+ private final String additionalKey;
+
+ public RedisLookupBolt(JedisPoolConfig config, RedisLookupMapper lookupMapper) {
+ super(config);
+
+ this.lookupMapper = lookupMapper;
+
+ RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+ }
+
+ public RedisLookupBolt(JedisClusterConfig config, RedisLookupMapper lookupMapper) {
+ super(config);
+
+ this.lookupMapper = lookupMapper;
+
+ RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String key = lookupMapper.getKeyFromTuple(input);
+ Object lookupValue = null;
+
+ JedisCommands jedisCommand = null;
+ try {
+ jedisCommand = getInstance();
+
+ switch (dataType) {
+ case STRING:
+ lookupValue = jedisCommand.get(key);
+ break;
+
+ case LIST:
+ lookupValue = jedisCommand.lpop(key);
+ break;
+
+ case HASH:
+ lookupValue = jedisCommand.hget(additionalKey, key);
+ break;
+
+ case SET:
+ lookupValue = jedisCommand.scard(key);
+ break;
+
+ case SORTED_SET:
+ lookupValue = jedisCommand.zscore(additionalKey, key);
+ break;
+
+ case HYPER_LOG_LOG:
+ lookupValue = jedisCommand.pfcount(key);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
+ }
+
+ List<Values> values = lookupMapper.toTuple(input, lookupValue);
+ for (Values value : values) {
+ collector.emit(input, value);
+ }
+
+ collector.ack(input);
+ } catch (Exception e) {
+ this.collector.reportError(e);
+ this.collector.fail(input);
+ } finally {
+ returnInstance(jedisCommand);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ lookupMapper.declareOutputFields(declarer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
new file mode 100644
index 0000000..761c5ed
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import redis.clients.jedis.JedisCommands;
+
+/**
+ * Bolt that stores one key/value pair per input tuple in Redis.
+ * The key and value are extracted by a {@link RedisStoreMapper}; the Redis command used
+ * is chosen from the data type announced by the mapper's {@link RedisDataTypeDescription}.
+ * This bolt declares no output fields.
+ */
+public class RedisStoreBolt extends AbstractRedisBolt {
+    private final RedisStoreMapper storeMapper;
+    private final RedisDataTypeDescription.RedisDataType dataType;
+    private final String additionalKey;
+
+    /**
+     * Creates a store bolt configured from a Jedis pool configuration.
+     *
+     * @param config      configuration passed to the parent bolt
+     * @param storeMapper mapper supplying key, value and data type description
+     */
+    public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
+        super(config);
+        this.storeMapper = storeMapper;
+
+        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+    }
+
+    /**
+     * Creates a store bolt configured from a Jedis cluster configuration.
+     *
+     * @param config      cluster configuration passed to the parent bolt
+     * @param storeMapper mapper supplying key, value and data type description
+     */
+    public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
+        super(config);
+        this.storeMapper = storeMapper;
+
+        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+        this.dataType = dataTypeDescription.getDataType();
+        this.additionalKey = dataTypeDescription.getAdditionalKey();
+    }
+
+    /**
+     * Writes the tuple's key/value to Redis using exactly one command selected by the
+     * configured data type, then acks the tuple. Any exception (including an unsupported
+     * data type or a non-numeric sorted-set score) is reported and the tuple is failed.
+     * The Jedis instance is always handed back through {@code returnInstance}.
+     *
+     * @param input the tuple to store
+     */
+    @Override
+    public void execute(Tuple input) {
+        String key = storeMapper.getKeyFromTuple(input);
+        String value = storeMapper.getValueFromTuple(input);
+
+        JedisCommands jedisCommand = null;
+        try {
+            jedisCommand = getInstance();
+
+            switch (dataType) {
+                case STRING:
+                    jedisCommand.set(key, value);
+                    break;
+
+                case LIST:
+                    jedisCommand.rpush(key, value);
+                    break;
+
+                case HASH:
+                    jedisCommand.hset(additionalKey, key, value);
+                    break;
+
+                case SET:
+                    jedisCommand.sadd(key, value);
+                    break;
+
+                case SORTED_SET:
+                    // The tuple value is the score; the tuple key is the member of the sorted set.
+                    jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
+                    // This break is required: without it control falls through and PFADD
+                    // is also issued against the member name as if it were a HyperLogLog key.
+                    break;
+
+                case HYPER_LOG_LOG:
+                    jedisCommand.pfadd(key, value);
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
+            }
+
+            collector.ack(input);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(input);
+        } finally {
+            returnInstance(jedisCommand);
+        }
+    }
+
+    /**
+     * Declares nothing: this bolt does not emit tuples.
+     *
+     * @param declarer unused
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
new file mode 100644
index 0000000..a13eced
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisClusterConfig.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Serializable settings for connecting to a Redis cluster: the node addresses,
+ * the timeout and the maximum number of redirections.
+ */
+public class JedisClusterConfig implements Serializable {
+    private Set<InetSocketAddress> nodes;
+    private int timeout;
+    private int maxRedirections;
+
+    /**
+     * @param nodes           addresses of the cluster nodes
+     * @param timeout         timeout value handed to Jedis
+     * @param maxRedirections maximum redirections handed to Jedis
+     */
+    public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
+        this.nodes = nodes;
+        this.timeout = timeout;
+        this.maxRedirections = maxRedirections;
+    }
+
+    /**
+     * Converts the configured socket addresses into Jedis {@code HostAndPort} values.
+     *
+     * @return a newly built set with one entry per configured node
+     */
+    public Set<HostAndPort> getNodes() {
+        final Set<HostAndPort> converted = new HashSet<HostAndPort>();
+        for (InetSocketAddress address : this.nodes) {
+            final String hostName = address.getHostName();
+            final int port = address.getPort();
+            converted.add(new HostAndPort(hostName, port));
+        }
+        return converted;
+    }
+
+    /** @return the configured timeout */
+    public int getTimeout() {
+        return this.timeout;
+    }
+
+    /** @return the configured maximum number of redirections */
+    public int getMaxRedirections() {
+        return this.maxRedirections;
+    }
+
+    /**
+     * Fluent builder for {@link JedisClusterConfig}. The timeout defaults to
+     * {@code Protocol.DEFAULT_TIMEOUT} and the maximum redirections to 5;
+     * nodes have no default and must be set before {@link #build()}.
+     */
+    public static class Builder {
+        private Set<InetSocketAddress> nodes;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int maxRedirections = 5;
+
+        /** Sets the cluster node addresses and returns this builder. */
+        public Builder setNodes(Set<InetSocketAddress> nodes) {
+            this.nodes = nodes;
+            return this;
+        }
+
+        /** Sets the timeout and returns this builder. */
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /** Sets the maximum number of redirections and returns this builder. */
+        public Builder setMaxRedirections(int maxRedirections) {
+            this.maxRedirections = maxRedirections;
+            return this;
+        }
+
+        /**
+         * Builds the configuration.
+         *
+         * @return a new configuration carrying the builder's current values
+         * @throws NullPointerException if no nodes were set
+         */
+        public JedisClusterConfig build() {
+            Preconditions.checkNotNull(this.nodes, "Node information should be presented");
+
+            return new JedisClusterConfig(this.nodes, this.timeout, this.maxRedirections);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
new file mode 100644
index 0000000..cc5f6e4
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/config/JedisPoolConfig.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.config;
+
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Serializable settings for connecting to a single Redis server through a Jedis pool:
+ * host, port, timeout, database index and password.
+ */
+public class JedisPoolConfig implements Serializable {
+    public static final String DEFAULT_HOST = "127.0.0.1";
+
+    private String host;
+    private int port;
+    private int timeout;
+    private int database;
+    private String password;
+
+    /**
+     * Note the argument order: {@code password} comes before {@code database}.
+     *
+     * @param host     Redis host
+     * @param port     Redis port
+     * @param timeout  timeout value handed to Jedis
+     * @param password password, may be {@code null} (the builder leaves it unset by default)
+     * @param database database index
+     */
+    public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
+        this.host = host;
+        this.port = port;
+        this.timeout = timeout;
+        this.password = password;
+        this.database = database;
+    }
+
+    /** @return the configured host */
+    public String getHost() {
+        return this.host;
+    }
+
+    /** @return the configured port */
+    public int getPort() {
+        return this.port;
+    }
+
+    /** @return the configured timeout */
+    public int getTimeout() {
+        return this.timeout;
+    }
+
+    /** @return the configured database index */
+    public int getDatabase() {
+        return this.database;
+    }
+
+    /** @return the configured password */
+    public String getPassword() {
+        return this.password;
+    }
+
+    /**
+     * Fluent builder for {@link JedisPoolConfig}. Host defaults to {@link #DEFAULT_HOST};
+     * port, timeout and database default to the corresponding {@code Protocol} constants;
+     * the password is left unset.
+     */
+    public static class Builder {
+        private String host = DEFAULT_HOST;
+        private int port = Protocol.DEFAULT_PORT;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int database = Protocol.DEFAULT_DATABASE;
+        private String password;
+
+        /** Sets the host and returns this builder. */
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        /** Sets the port and returns this builder. */
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /** Sets the timeout and returns this builder. */
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /** Sets the database index and returns this builder. */
+        public Builder setDatabase(int database) {
+            this.database = database;
+            return this;
+        }
+
+        /** Sets the password and returns this builder. */
+        public Builder setPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /** @return a new configuration carrying the builder's current values */
+        public JedisPoolConfig build() {
+            return new JedisPoolConfig(this.host, this.port, this.timeout, this.password, this.database);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
new file mode 100644
index 0000000..a1ff19f
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisClusterContainer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisCommands;
+
+import java.io.Closeable;
+
+/**
+ * Container backed by a single {@code JedisCluster}: every caller receives the same
+ * cluster object, and returning it is a no-op.
+ */
+public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable {
+
+    private JedisCluster jedisCluster;
+
+    /**
+     * @param jedisCluster the cluster client this container hands out
+     */
+    public JedisClusterContainer(JedisCluster jedisCluster) {
+        this.jedisCluster = jedisCluster;
+    }
+
+    /** @return the wrapped cluster client */
+    @Override
+    public JedisCommands getInstance() {
+        return jedisCluster;
+    }
+
+    /**
+     * Intentionally empty: the cluster client is shared, so there is nothing to give back.
+     *
+     * @param jedisCommands ignored
+     */
+    @Override
+    public void returnInstance(JedisCommands jedisCommands) {
+        // do nothing
+    }
+
+    /** Closes the wrapped cluster client. */
+    @Override
+    public void close() {
+        jedisCluster.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..a2f8c2e
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsContainerBuilder.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+
+/**
+ * Static factory that turns a storm-redis configuration object into the matching
+ * {@link JedisCommandsInstanceContainer}.
+ */
+public class JedisCommandsContainerBuilder {
+
+    /** Jedis pool settings shared by every container built here. */
+    public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+    /**
+     * Builds a pool-backed container for a single Redis server.
+     *
+     * @param config host, port, timeout, password and database to connect with
+     * @return a {@link JedisContainer} wrapping a newly created {@code JedisPool}
+     */
+    public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
+        final JedisPool pool = new JedisPool(
+                DEFAULT_POOL_CONFIG,
+                config.getHost(),
+                config.getPort(),
+                config.getTimeout(),
+                config.getPassword(),
+                config.getDatabase());
+        return new JedisContainer(pool);
+    }
+
+    /**
+     * Builds a container for a Redis cluster.
+     *
+     * @param config nodes, timeout and maximum redirections to connect with
+     * @return a {@link JedisClusterContainer} wrapping a newly created {@code JedisCluster}
+     */
+    public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
+        final JedisCluster cluster = new JedisCluster(
+                config.getNodes(),
+                config.getTimeout(),
+                config.getMaxRedirections(),
+                DEFAULT_POOL_CONFIG);
+        return new JedisClusterContainer(cluster);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
new file mode 100644
index 0000000..9ec32b9
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisCommandsInstanceContainer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import redis.clients.jedis.JedisCommands;
+
+/**
+ * Abstraction over where a {@link JedisCommands} instance comes from, so callers can
+ * obtain and give back an instance without knowing whether it is pool-backed
+ * ({@code JedisContainer}) or a shared cluster client ({@code JedisClusterContainer}).
+ */
+public interface JedisCommandsInstanceContainer {
+ /**
+  * Obtains an instance to issue Redis commands with.
+  *
+  * @return a {@link JedisCommands} instance
+  */
+ JedisCommands getInstance();
+ /**
+  * Gives back an instance previously obtained from {@link #getInstance()}.
+  *
+  * @param jedisCommands the instance to give back
+  */
+ void returnInstance(JedisCommands jedisCommands);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
new file mode 100644
index 0000000..8e1fdf5
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/container/JedisContainer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.JedisPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Container backed by a {@code JedisPool}: each {@link #getInstance()} call takes a
+ * resource from the pool and {@link #returnInstance(JedisCommands)} gives it back by
+ * closing it.
+ */
+public class JedisContainer implements JedisCommandsInstanceContainer, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class);
+
+    private JedisPool jedisPool;
+
+    /**
+     * @param jedisPool the pool instances are taken from
+     */
+    public JedisContainer(JedisPool jedisPool) {
+        this.jedisPool = jedisPool;
+    }
+
+    /** @return a resource obtained from the pool */
+    @Override
+    public JedisCommands getInstance() {
+        return jedisPool.getResource();
+    }
+
+    /**
+     * Closes the given instance, which hands it back to the pool. A {@code null}
+     * argument is ignored so callers can invoke this unconditionally from a
+     * {@code finally} block. A failure to close is logged, not rethrown.
+     *
+     * @param jedisCommands the instance to give back, may be {@code null}
+     */
+    @Override
+    public void returnInstance(JedisCommands jedisCommands) {
+        if (jedisCommands == null) {
+            return;
+        }
+
+        try {
+            ((Closeable) jedisCommands).close();
+        } catch (IOException e) {
+            // Pass the exception to the logger so the cause and stack trace are not lost.
+            LOG.error("Failed to close (return) instance to pool", e);
+        }
+    }
+
+    /** Closes the underlying pool. */
+    @Override
+    public void close() {
+        jedisPool.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
new file mode 100644
index 0000000..ba981d0
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisDataTypeDescription.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import java.io.Serializable;
+
+/**
+ * Describes which Redis data type a mapper targets and, for the types that need one,
+ * the additional key to use alongside the per-tuple key.
+ */
+public class RedisDataTypeDescription implements Serializable {
+    /** The Redis data types that can be described. */
+    public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG }
+
+    private RedisDataType dataType;
+    private String additionalKey;
+
+    /**
+     * Creates a description without an additional key.
+     *
+     * @param dataType the Redis data type
+     * @throws IllegalArgumentException if {@code dataType} is HASH or SORTED_SET
+     */
+    public RedisDataTypeDescription(RedisDataType dataType) {
+        this(dataType, null);
+    }
+
+    /**
+     * Creates a description with an additional key.
+     *
+     * @param dataType      the Redis data type
+     * @param additionalKey the additional key; mandatory for HASH and SORTED_SET
+     * @throws IllegalArgumentException if the type is HASH or SORTED_SET and
+     *                                  {@code additionalKey} is {@code null}
+     */
+    public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) {
+        final boolean needsAdditionalKey =
+                dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET;
+        if (needsAdditionalKey && additionalKey == null) {
+            throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
+        }
+
+        this.dataType = dataType;
+        this.additionalKey = additionalKey;
+    }
+
+    /** @return the described Redis data type */
+    public RedisDataType getDataType() {
+        return this.dataType;
+    }
+
+    /** @return the additional key, or {@code null} when none was given */
+    public String getAdditionalKey() {
+        return this.additionalKey;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
new file mode 100644
index 0000000..880aea1
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+
+import java.util.List;
+
+/**
+ * Mapper for Redis lookups: in addition to extracting the key from a tuple and describing
+ * the Redis data type, it converts the lookup result into output values and declares the
+ * output fields.
+ */
+public interface RedisLookupMapper extends TupleMapper, RedisMapper {
+    /**
+     * Converts the value returned from Redis into a list of Storm values that can be emitted.
+     *
+     * @param input the input tuple
+     * @param value the Redis query response; its type (for example String, Boolean or Long)
+     *              depends on the data type being looked up
+     * @return a list of Storm values; each item in the list is emitted as one output tuple
+     */
+    List<Values> toTuple(ITuple input, Object value);
+
+    /**
+     * Declares the fields of the tuples this mapper produces.
+     *
+     * @param declarer the declarer to register the output fields with
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..416ce5f
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+/**
+ * Implemented by mappers that can tell which Redis data type they operate on.
+ */
+public interface RedisMapper {
+    /**
+     * @return the description of the Redis data type (and additional key, if any) to use
+     */
+    RedisDataTypeDescription getDataTypeDescription();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
new file mode 100644
index 0000000..b3d7adf
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisStoreMapper.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+/**
+ * Mapper type accepted by {@code RedisStoreBolt}. It adds no methods of its own; it only
+ * combines {@link TupleMapper} (key and value extraction) with {@link RedisMapper}
+ * (data type description).
+ */
+public interface RedisStoreMapper extends TupleMapper, RedisMapper {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
new file mode 100644
index 0000000..86664b8
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.common.mapper;
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Serializable strategy for extracting a key string and a value string from a tuple.
+ */
+public interface TupleMapper extends Serializable {
+    /**
+     * @param tuple the tuple to read from
+     * @return the key extracted from the tuple
+     */
+    String getKeyFromTuple(ITuple tuple);
+
+    /**
+     * @param tuple the tuple to read from
+     * @return the value extracted from the tuple
+     */
+    String getValueFromTuple(ITuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
deleted file mode 100644
index 4c10143..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/mapper/TridentTupleMapper.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.trident.mapper;
-
-import storm.trident.tuple.TridentTuple;
-
-import java.io.Serializable;
-
-public interface TridentTupleMapper extends Serializable {
- public String getKeyFromTridentTuple(TridentTuple tuple);
- public String getValueFromTridentTuple(TridentTuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 24c1df1..1154376 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -23,7 +23,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
index 493ffdd..d74e838 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -18,7 +18,7 @@
package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index e0207e2..17614a1 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
@@ -33,9 +33,9 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
- public RedisClusterStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+ public RedisClusterStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
}
@@ -52,7 +52,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
key = redisKeyPrefix + key;
}
@@ -72,7 +72,7 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
@Override
public void execute(TridentTuple tuple, String s, TridentCollector collector) {
- String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+ String key = this.tupleMapper.getKeyFromTuple(tuple);
collector.emit(new Values(key, s));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 8512888..e00cfb6 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -17,7 +17,7 @@
*/
package org.apache.storm.redis.trident.state;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
@@ -31,10 +31,10 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
private int expireIntervalSec = 0;
- public RedisClusterStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+ public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
}
@@ -57,12 +57,12 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
try {
jedisCluster = redisClusterState.getJedisCluster();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
String redisKey = key;
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
redisKey = redisKeyPrefix + redisKey;
}
- String value = this.tupleMapper.getValueFromTridentTuple(input);
+ String value = this.tupleMapper.getValueFromTuple(input);
logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index f934cea..82ca8bb 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -22,7 +22,7 @@ import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
@@ -223,8 +223,10 @@ public class RedisMapState<T> implements IBackingMap<T> {
if (keys.size() == 0) {
return Collections.emptyList();
}
+
+ String[] stringKeys = buildKeys(keys);
+
if (Strings.isNullOrEmpty(this.options.hkey)) {
- String[] stringKeys = buildKeys(keys);
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
@@ -232,34 +234,23 @@ public class RedisMapState<T> implements IBackingMap<T> {
return deserializeValues(keys, values);
} finally {
if (jedis != null) {
- jedisPool.returnResource(jedis);
+ jedis.close();
}
}
} else {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
- Map<String, String> keyValue = jedis.hgetAll(this.options.hkey);
- List<String> values = buildValuesFromMap(keys, keyValue);
+ List<String> values = jedis.hmget(this.options.hkey, stringKeys);
return deserializeValues(keys, values);
} finally {
if (jedis != null) {
- jedisPool.returnResource(jedis);
+ jedis.close();
}
}
}
}
- private List<String> buildValuesFromMap(List<List<Object>> keys, Map<String, String> keyValue) {
- List<String> values = new ArrayList<String>(keys.size());
- for (List<Object> key : keys) {
- String strKey = keyFactory.build(key);
- String value = keyValue.get(strKey);
- values.add(value);
- }
- return values;
- }
-
private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
List<T> result = new ArrayList<T>(keys.size());
for (String value : values) {
@@ -293,7 +284,7 @@ public class RedisMapState<T> implements IBackingMap<T> {
jedis.mset(keyValue);
} finally {
if (jedis != null) {
- jedisPool.returnResource(jedis);
+ jedis.close();
}
}
} else {
@@ -303,11 +294,13 @@ public class RedisMapState<T> implements IBackingMap<T> {
for (int i = 0; i < keys.size(); i++) {
String val = new String(serializer.serialize(vals.get(i)));
keyValues.put(keyFactory.build(keys.get(i)), val);
- }
+ }
jedis.hmset(this.options.hkey, keyValues);
} finally {
- jedisPool.returnResource(jedis);
+ if (jedis != null) {
+ jedis.close();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
index 72b1267..3441936 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
@@ -18,7 +18,7 @@
package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index 051088e..294e83b 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -19,7 +19,7 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
@@ -33,9 +33,9 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
- public RedisStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+ public RedisStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
}
@@ -44,7 +44,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
List<String> keys = Lists.newArrayList();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
key = redisKeyPrefix + key;
}
@@ -64,7 +64,7 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
@Override
public void execute(TridentTuple tuple, String s, TridentCollector collector) {
- String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+ String key = this.tupleMapper.getKeyFromTuple(tuple);
collector.emit(new Values(key, s));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index c53476f..2939d3d 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -17,7 +17,7 @@
*/
package org.apache.storm.redis.trident.state;
-import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
@@ -31,10 +31,10 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
private final String redisKeyPrefix;
- private final TridentTupleMapper tupleMapper;
+ private final TupleMapper tupleMapper;
private int expireIntervalSec = 0;
- public RedisStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+ public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
}
@@ -56,12 +56,12 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
try {
jedis = redisState.getJedis();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String key = this.tupleMapper.getKeyFromTuple(input);
String redisKey = key;
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
redisKey = redisKeyPrefix + redisKey;
}
- String value = this.tupleMapper.getValueFromTridentTuple(input);
+ String value = this.tupleMapper.getValueFromTuple(input);
logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
deleted file mode 100644
index 355119a..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisClusterConfig.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.config;
-
-import com.google.common.base.Preconditions;
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Protocol;
-
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-public class JedisClusterConfig implements Serializable {
- private Set<InetSocketAddress> nodes;
- private int timeout;
- private int maxRedirections;
-
- public JedisClusterConfig(Set<InetSocketAddress> nodes, int timeout, int maxRedirections) {
- this.nodes = nodes;
- this.timeout = timeout;
- this.maxRedirections = maxRedirections;
- }
-
- public Set<HostAndPort> getNodes() {
- Set<HostAndPort> ret = new HashSet<HostAndPort>();
- for (InetSocketAddress node : nodes) {
- ret.add(new HostAndPort(node.getHostName(), node.getPort()));
- }
- return ret;
- }
-
- public int getTimeout() {
- return timeout;
- }
-
- public int getMaxRedirections() {
- return maxRedirections;
- }
-
- public static class Builder {
- private Set<InetSocketAddress> nodes;
- private int timeout = Protocol.DEFAULT_TIMEOUT;
- private int maxRedirections = 5;
-
- public Builder setNodes(Set<InetSocketAddress> nodes) {
- this.nodes = nodes;
- return this;
- }
-
- public Builder setTimeout(int timeout) {
- this.timeout = timeout;
- return this;
- }
-
- public Builder setMaxRedirections(int maxRedirections) {
- this.maxRedirections = maxRedirections;
- return this;
- }
-
- public JedisClusterConfig build() {
- Preconditions.checkNotNull(this.nodes, "Node information should be presented");
-
- return new JedisClusterConfig(nodes, timeout, maxRedirections);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
deleted file mode 100644
index 9a42cf7..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/config/JedisPoolConfig.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.config;
-
-import redis.clients.jedis.Protocol;
-
-import java.io.Serializable;
-
-public class JedisPoolConfig implements Serializable {
- public static final String DEFAULT_HOST = "127.0.0.1";
-
- private String host;
- private int port;
- private int timeout;
- private int database;
- private String password;
-
- public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
- this.host = host;
- this.port = port;
- this.timeout = timeout;
- this.database = database;
- this.password = password;
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- public int getTimeout() {
- return timeout;
- }
-
- public int getDatabase() {
- return database;
- }
-
- public String getPassword() {
- return password;
- }
-
- public static class Builder {
- private String host = DEFAULT_HOST;
- private int port = Protocol.DEFAULT_PORT;
- private int timeout = Protocol.DEFAULT_TIMEOUT;
- private int database = Protocol.DEFAULT_DATABASE;
- private String password;
-
- public Builder setHost(String host) {
- this.host = host;
- return this;
- }
-
- public Builder setPort(int port) {
- this.port = port;
- return this;
- }
-
- public Builder setTimeout(int timeout) {
- this.timeout = timeout;
- return this;
- }
-
- public Builder setDatabase(int database) {
- this.database = database;
- return this;
- }
-
- public Builder setPassword(String password) {
- this.password = password;
- return this;
- }
-
- public JedisPoolConfig build() {
- return new JedisPoolConfig(host, port, timeout, password, database);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
deleted file mode 100644
index 5fd4115..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisClusterContainer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisCommands;
-
-import java.io.Closeable;
-
-public class JedisClusterContainer implements JedisCommandsInstanceContainer, Closeable {
-
- private JedisCluster jedisCluster;
-
- public JedisClusterContainer(JedisCluster jedisCluster) {
- this.jedisCluster = jedisCluster;
- }
-
- @Override
- public JedisCommands getInstance() {
- return this.jedisCluster;
- }
-
- @Override
- public void returnInstance(JedisCommands jedisCommands) {
- // do nothing
- }
-
- @Override
- public void close() {
- this.jedisCluster.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
deleted file mode 100644
index 8d2dd38..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsContainerBuilder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-
-public class JedisCommandsContainerBuilder {
-
- public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
-
- public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
- JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
- return new JedisContainer(jedisPool);
- }
-
- public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
- JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getMaxRedirections(), DEFAULT_POOL_CONFIG);
- return new JedisClusterContainer(jedisCluster);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
deleted file mode 100644
index 847d6a5..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisCommandsInstanceContainer.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import redis.clients.jedis.JedisCommands;
-
-public interface JedisCommandsInstanceContainer {
- JedisCommands getInstance();
- void returnInstance(JedisCommands jedisCommands);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java b/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
deleted file mode 100644
index e75cccc..0000000
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/util/container/JedisContainer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.redis.util.container;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.JedisPool;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class JedisContainer implements JedisCommandsInstanceContainer, Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(JedisContainer.class);
-
- private JedisPool jedisPool;
-
- public JedisContainer(JedisPool jedisPool) {
- this.jedisPool = jedisPool;
- }
-
- @Override
- public JedisCommands getInstance() {
- return jedisPool.getResource();
- }
-
- @Override
- public void returnInstance(JedisCommands jedisCommands) {
- if (jedisCommands == null) {
- return;
- }
-
- try {
- ((Closeable) jedisCommands).close();
- } catch (IOException e) {
- LOG.warn("Failed to close (return) instance to pool");
- try {
- jedisPool.returnBrokenResource((Jedis) jedisCommands);
- } catch (Exception e2) {
- LOG.error("Failed to discard instance from pool");
- }
- }
- }
-
- @Override
- public void close() {
- jedisPool.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/0edb8ab9/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
index a62fdff..ae053de 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -20,72 +20,65 @@ package org.apache.storm.redis.topology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.bolt.AbstractRedisBolt;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
-import org.apache.storm.redis.util.config.JedisPoolConfig;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.bolt.RedisLookupBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-import redis.clients.jedis.exceptions.JedisException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
public class LookupWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+ private static final String PRINT_BOLT = "PRINT_BOLT";
private static final String TEST_REDIS_HOST = "127.0.0.1";
private static final int TEST_REDIS_PORT = 6379;
- public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
- private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
+ public static class PrintWordTotalCountBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(PrintWordTotalCountBolt.class);
private static final Random RANDOM = new Random();
+ private OutputCollector collector;
- public LookupWordTotalCountBolt(JedisPoolConfig config) {
- super(config);
- }
-
- public LookupWordTotalCountBolt(JedisClusterConfig config) {
- super(config);
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
}
@Override
public void execute(Tuple input) {
- JedisCommands jedisCommands = null;
- try {
- jedisCommands = getInstance();
- String wordName = input.getStringByField("word");
- String countStr = jedisCommands.get(wordName);
+ String wordName = input.getStringByField("wordName");
+ String countStr = input.getStringByField("count");
+
+ // print lookup result with low probability
+ if(RANDOM.nextInt(1000) > 995) {
+ int count = 0;
if (countStr != null) {
- int count = Integer.parseInt(countStr);
- this.collector.emit(new Values(wordName, count));
-
- // print lookup result with low probability
- if(RANDOM.nextInt(1000) > 995) {
- LOG.info("Lookup result - word : " + wordName + " / count : " + count);
- }
- } else {
- // skip
- LOG.warn("Word not found in Redis - word : " + wordName);
+ count = Integer.parseInt(countStr);
}
- } finally {
- if (jedisCommands != null) {
- returnInstance(jedisCommands);
- }
- this.collector.ack(input);
+ LOG.info("Lookup result - word : " + wordName + " / count : " + count);
}
+
+ collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // wordName, count
- declarer.declare(new Fields("wordName", "count"));
}
}
@@ -104,12 +97,16 @@ public class LookupWordCount {
.setHost(host).setPort(port).build();
WordSpout spout = new WordSpout();
- LookupWordTotalCountBolt redisLookupBolt = new LookupWordTotalCountBolt(poolConfig);
+ RedisLookupMapper lookupMapper = setupLookupMapper();
+ RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
+
+ PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
//wordspout -> lookupbolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(WORD_SPOUT, spout, 1);
- builder.setBolt(LOOKUP_BOLT, redisLookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(LOOKUP_BOLT);
if (args.length == 2) {
LocalCluster cluster = new LocalCluster();
@@ -124,4 +121,46 @@ public class LookupWordCount {
System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)");
}
}
+
+ private static RedisLookupMapper setupLookupMapper() {
+ return new WordCountRedisLookupMapper();
+ }
+
+ private static class WordCountRedisLookupMapper implements RedisLookupMapper {
+ private RedisDataTypeDescription description;
+ private final String hashKey = "wordCount";
+
+ public WordCountRedisLookupMapper() {
+ description = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+ }
+
+ @Override
+ public List<Values> toTuple(ITuple input, Object value) {
+ String member = getKeyFromTuple(input);
+ List<Values> values = Lists.newArrayList();
+ values.add(new Values(member, value));
+ return values;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("wordName", "count"));
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return description;
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return tuple.getStringByField("word");
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return null;
+ }
+ }
}