You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/06/02 17:00:29 UTC
[01/12] storm git commit: Improve RedisStateQuerier to convert
List<Values> from Redis value
Repository: storm
Updated Branches:
refs/heads/master ad98824fc -> a0c032358
Improve RedisStateQuerier to convert List<Values> from Redis value
* Reuse RedisStoreMapper / RedisLookupMapper
** RedisState*Querier / RedisState*Updater
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7fec9a1f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7fec9a1f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7fec9a1f
Branch: refs/heads/master
Commit: 7fec9a1f4b9887897c9c7d0f3f80433fa34f18c0
Parents: b367ab4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 09:38:54 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 09:38:54 2015 +0900
----------------------------------------------------------------------
.../trident/state/RedisClusterStateQuerier.java | 37 +++++++--------
.../trident/state/RedisClusterStateUpdater.java | 33 +++++++------
.../redis/trident/state/RedisStateQuerier.java | 50 ++++++++++++--------
.../redis/trident/state/RedisStateUpdater.java | 32 +++++++------
.../redis/trident/WordCountLookupMapper.java | 40 ++++++++++++++++
.../redis/trident/WordCountStoreMapper.java | 22 +++++++++
.../redis/trident/WordCountTridentRedis.java | 12 +++--
.../trident/WordCountTridentRedisCluster.java | 11 +++--
.../WordCountTridentRedisClusterMap.java | 1 -
.../redis/trident/WordCountTridentRedisMap.java | 1 -
.../redis/trident/WordCountTupleMapper.java | 16 -------
11 files changed, 158 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index 17614a1..4382fe3 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -19,6 +19,7 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,37 +30,30 @@ import storm.trident.tuple.TridentTuple;
import java.util.List;
-public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, String> {
+public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, List<Values>> {
private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
- private final String redisKeyPrefix;
- private final TupleMapper tupleMapper;
+ private final RedisLookupMapper lookupMapper;
- public RedisClusterStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
- this.redisKeyPrefix = redisKeyPrefix;
- this.tupleMapper = tupleMapper;
+ public RedisClusterStateQuerier(RedisLookupMapper lookupMapper) {
+ this.lookupMapper = lookupMapper;
}
@Override
- public List<String> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
- List<String> ret = Lists.newArrayList();
-
- List<String> keys = Lists.newArrayList();
+ public List<List<Values>> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
+ List<List<Values>> ret = Lists.newArrayList();
JedisCluster jedisCluster = null;
try {
jedisCluster = redisClusterState.getJedisCluster();
+ for (int i = 0 ; i < inputs.size() ; i++) {
+ TridentTuple input = inputs.get(i);
- for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTuple(input);
- if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
- key = redisKeyPrefix + key;
- }
+ String key = lookupMapper.getKeyFromTuple(input);
String value = jedisCluster.get(key);
- ret.add(value);
-
- logger.debug("redis get key[" + key + "] count[" + value + "]");
+ ret.add(lookupMapper.toTuple(input, value));
+ logger.debug("redis get key[" + key + "] value [" + value + "]");
}
} finally {
if (jedisCluster != null) {
@@ -71,8 +65,9 @@ public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterStat
}
@Override
- public void execute(TridentTuple tuple, String s, TridentCollector collector) {
- String key = this.tupleMapper.getKeyFromTuple(tuple);
- collector.emit(new Values(key, s));
+ public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
+ for (Values value : values) {
+ collector.emit(value);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 023b527..35fb48e 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -17,6 +17,8 @@
*/
package org.apache.storm.redis.trident.state;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,13 +32,13 @@ import java.util.List;
public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
- private final String redisKeyPrefix;
- private final TupleMapper tupleMapper;
+ private final RedisStoreMapper storeMapper;
private final int expireIntervalSec;
- public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
- this.redisKeyPrefix = redisKeyPrefix;
- this.tupleMapper = tupleMapper;
+ public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
+ this.storeMapper = storeMapper;
+ assertDataType(storeMapper.getDataTypeDescription());
+
if (expireIntervalSec > 0) {
this.expireIntervalSec = expireIntervalSec;
} else {
@@ -52,19 +54,15 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
try {
jedisCluster = redisClusterState.getJedisCluster();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTuple(input);
- String redisKey = key;
- if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
- redisKey = redisKeyPrefix + redisKey;
- }
- String value = this.tupleMapper.getValueFromTuple(input);
+ String key = storeMapper.getKeyFromTuple(input);
+ String value = storeMapper.getValueFromTuple(input);
- logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
+ logger.debug("update key[" + key + "] redisKey[" + key + "] value[" + value + "]");
if (this.expireIntervalSec > 0) {
- jedisCluster.setex(redisKey, expireIntervalSec, value);
+ jedisCluster.setex(key, expireIntervalSec, value);
} else {
- jedisCluster.set(redisKey, value);
+ jedisCluster.set(key, value);
}
}
} finally {
@@ -73,4 +71,11 @@ public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState
}
}
}
+
+ private void assertDataType(RedisDataTypeDescription storeMapper) {
+ if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
+ throw new IllegalArgumentException("State should be STRING type");
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index 294e83b..a215741 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -19,42 +19,43 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import redis.clients.jedis.Jedis;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseQueryFunction;
import storm.trident.tuple.TridentTuple;
+import java.util.ArrayList;
import java.util.List;
-public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
- private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
+public class RedisStateQuerier extends BaseQueryFunction<RedisState, List<Values>> {
+ private final RedisLookupMapper lookupMapper;
- private final String redisKeyPrefix;
- private final TupleMapper tupleMapper;
-
- public RedisStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
- this.redisKeyPrefix = redisKeyPrefix;
- this.tupleMapper = tupleMapper;
+ public RedisStateQuerier(RedisLookupMapper lookupMapper) {
+ this.lookupMapper = lookupMapper;
+ assertDataType(lookupMapper.getDataTypeDescription());
}
@Override
- public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
+ public List<List<Values>> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
+ List<List<Values>> values = new ArrayList<List<Values>>();
+
List<String> keys = Lists.newArrayList();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTuple(input);
- if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
- key = redisKeyPrefix + key;
- }
- keys.add(key);
+ keys.add(lookupMapper.getKeyFromTuple(input));
}
Jedis jedis = null;
try {
jedis = redisState.getJedis();
- return jedis.mget(keys.toArray(new String[keys.size()]));
+ List<String> redisVals = jedis.mget(keys.toArray(new String[keys.size()]));
+
+ for (int i = 0 ; i < redisVals.size() ; i++) {
+ values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
+ }
+
+ return values;
} finally {
if (jedis != null) {
redisState.returnJedis(jedis);
@@ -63,8 +64,15 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
}
@Override
- public void execute(TridentTuple tuple, String s, TridentCollector collector) {
- String key = this.tupleMapper.getKeyFromTuple(tuple);
- collector.emit(new Values(key, s));
+ public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
+ for (Values value : values) {
+ collector.emit(value);
+ }
+ }
+
+ private void assertDataType(RedisDataTypeDescription lookupMapper) {
+ if (lookupMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
+ throw new IllegalArgumentException("State should be STRING type");
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 664a222..384a120 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -17,6 +17,8 @@
*/
package org.apache.storm.redis.trident.state;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,13 +32,13 @@ import java.util.List;
public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
- private final String redisKeyPrefix;
- private final TupleMapper tupleMapper;
+ private final RedisStoreMapper storeMapper;
private final int expireIntervalSec;
- public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
- this.redisKeyPrefix = redisKeyPrefix;
- this.tupleMapper = tupleMapper;
+ public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
+ this.storeMapper = storeMapper;
+ assertDataType(storeMapper.getDataTypeDescription());
+
if (expireIntervalSec > 0) {
this.expireIntervalSec = expireIntervalSec;
} else {
@@ -51,19 +53,15 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
try {
jedis = redisState.getJedis();
for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTuple(input);
- String redisKey = key;
- if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
- redisKey = redisKeyPrefix + redisKey;
- }
- String value = this.tupleMapper.getValueFromTuple(input);
+ String key = storeMapper.getKeyFromTuple(input);
+ String value = storeMapper.getValueFromTuple(input);
- logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
+ logger.debug("update key[" + key + "] redisKey[" + key+ "] value[" + value + "]");
if (this.expireIntervalSec > 0) {
- jedis.setex(redisKey, expireIntervalSec, value);
+ jedis.setex(key, expireIntervalSec, value);
} else {
- jedis.set(redisKey, value);
+ jedis.set(key, value);
}
}
} finally {
@@ -72,4 +70,10 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
}
}
}
+
+ private void assertDataType(RedisDataTypeDescription storeMapper) {
+ if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
+ throw new IllegalArgumentException("State should be STRING type");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
new file mode 100644
index 0000000..891a1af
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
@@ -0,0 +1,40 @@
+package org.apache.storm.redis.trident;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WordCountLookupMapper implements RedisLookupMapper {
+ @Override
+ public List<Values> toTuple(ITuple input, Object value) {
+ List<Values> values = new ArrayList<Values>();
+ values.add(new Values(getKeyFromTuple(input), value));
+ return values;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "value"));
+ }
+
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return "test_" + tuple.getString(0);
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return tuple.getInteger(1).toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
new file mode 100644
index 0000000..aa03ead
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
@@ -0,0 +1,22 @@
+package org.apache.storm.redis.trident;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+
+public class WordCountStoreMapper implements RedisStoreMapper {
+ @Override
+ public RedisDataTypeDescription getDataTypeDescription() {
+ return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+ }
+
+ @Override
+ public String getKeyFromTuple(ITuple tuple) {
+ return "test_" + tuple.getString(0);
+ }
+
+ @Override
+ public String getValueFromTuple(ITuple tuple) {
+ return tuple.getInteger(1).toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index 8b6ebc5..79bab5a 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -23,7 +23,8 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
@@ -47,7 +48,9 @@ public class WordCountTridentRedis {
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(redisHost).setPort(redisPort)
.build();
- TupleMapper tupleMapper = new WordCountTupleMapper();
+
+ RedisStoreMapper storeMapper = new WordCountStoreMapper();
+ RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisState.Factory factory = new RedisState.Factory(poolConfig);
TridentTopology topology = new TridentTopology();
@@ -55,12 +58,12 @@ public class WordCountTridentRedis {
stream.partitionPersist(factory,
fields,
- new RedisStateUpdater("test_", tupleMapper, 86400000),
+ new RedisStateUpdater(storeMapper, 86400000),
new Fields());
TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
- new RedisStateQuerier("test_", tupleMapper),
+ new RedisStateQuerier(lookupMapper),
new Fields("columnName","columnValue"));
stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
return topology.build();
@@ -92,5 +95,4 @@ public class WordCountTridentRedis {
System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
}
}
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index ddb6939..280b273 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -23,7 +23,8 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisClusterState;
import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
@@ -55,7 +56,9 @@ public class WordCountTridentRedisCluster {
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
- TupleMapper tupleMapper = new WordCountTupleMapper();
+
+ RedisStoreMapper storeMapper = new WordCountStoreMapper();
+ RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
TridentTopology topology = new TridentTopology();
@@ -63,12 +66,12 @@ public class WordCountTridentRedisCluster {
stream.partitionPersist(factory,
fields,
- new RedisClusterStateUpdater("test_", tupleMapper, 86400000),
+ new RedisClusterStateUpdater(storeMapper, 86400000),
new Fields());
TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
- new RedisClusterStateQuerier("test_", tupleMapper),
+ new RedisClusterStateQuerier(lookupMapper),
new Fields("columnName","columnValue"));
stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
return topology.build();
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index de1f252..027c785 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -56,7 +56,6 @@ public class WordCountTridentRedisClusterMap {
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
- TupleMapper tupleMapper = new WordCountTupleMapper();
StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
TridentTopology topology = new TridentTopology();
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index 4d4afe8..8ecf9ad 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -48,7 +48,6 @@ public class WordCountTridentRedisMap {
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(redisHost).setPort(redisPort)
.build();
- TupleMapper tupleMapper = new WordCountTupleMapper();
StateFactory factory = RedisMapState.transactional(poolConfig);
TridentTopology topology = new TridentTopology();
http://git-wip-us.apache.org/repos/asf/storm/blob/7fec9a1f/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
deleted file mode 100644
index 1e601c9..0000000
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.storm.redis.trident;
-
-import backtype.storm.tuple.ITuple;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-
-public class WordCountTupleMapper implements TupleMapper {
- @Override
- public String getKeyFromTuple(ITuple tuple) {
- return tuple.getString(0);
- }
-
- @Override
- public String getValueFromTuple(ITuple tuple) {
- return tuple.getInteger(1).toString();
- }
-}
[06/12] storm git commit: [storm-redis] Unify a way to specify Redis
data type
Posted by bo...@apache.org.
[storm-redis] Unify a way to specify Redis data type
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/98704ec8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/98704ec8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/98704ec8
Branch: refs/heads/master
Commit: 98704ec81a7629d11a95a744ee47d3baf7c2de6a
Parents: 52fec9a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 8 06:10:44 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 8 06:10:44 2015 +0900
----------------------------------------------------------------------
.../storm/redis/trident/state/Options.java | 5 ++-
.../trident/state/RedisClusterMapState.java | 39 ++++++++++++------
.../redis/trident/state/RedisMapState.java | 42 ++++++++++++++------
.../WordCountTridentRedisClusterMap.java | 5 ++-
.../redis/trident/WordCountTridentRedisMap.java | 7 +++-
5 files changed, 70 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index bef7b6c..b262c42 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -17,14 +17,17 @@
*/
package org.apache.storm.redis.trident.state;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import storm.trident.state.Serializer;
import java.io.Serializable;
public class Options<T> implements Serializable {
+ private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+
public int localCacheSize = 1000;
public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
KeyFactory keyFactory = null;
public Serializer<T> serializer = null;
- public String hkey = null;
+ public RedisDataTypeDescription dataTypeDescription = DEFAULT_REDIS_DATATYPE;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index ccf5b38..10bf9ea 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -22,6 +22,7 @@ import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.storm.redis.common.config.JedisClusterConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import redis.clients.jedis.JedisCluster;
import storm.trident.state.*;
import storm.trident.state.map.*;
@@ -37,9 +38,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
return opaque(jedisClusterConfig, new Options());
}
- public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, String hkey) {
+ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return opaque(jedisClusterConfig, opts);
}
@@ -60,9 +61,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
return transactional(jedisClusterConfig, new Options());
}
- public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return transactional(jedisClusterConfig, opts);
}
@@ -83,9 +84,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
return nonTransactional(jedisClusterConfig, new Options());
}
- public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return nonTransactional(jedisClusterConfig, opts);
}
@@ -180,7 +181,9 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
@Override
protected List<String> retrieveValuesFromRedis(List<String> keys) {
String[] stringKeys = keys.toArray(new String[keys.size()]);
- if (Strings.isNullOrEmpty(this.options.hkey)) {
+ RedisDataTypeDescription description = this.options.dataTypeDescription;
+ switch (description.getDataType()) {
+ case STRING:
List<String> values = Lists.newArrayList();
for (String stringKey : keys) {
@@ -189,19 +192,31 @@ public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
}
return values;
- } else {
- return jedisCluster.hmget(this.options.hkey, stringKeys);
+
+ case HASH:
+ return jedisCluster.hmget(description.getAdditionalKey(), stringKeys);
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
}
}
@Override
protected void updateStatesToRedis(Map<String, String> keyValues) {
- if (Strings.isNullOrEmpty(this.options.hkey)) {
+ RedisDataTypeDescription description = this.options.dataTypeDescription;
+ switch (description.getDataType()) {
+ case STRING:
for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
}
- } else {
- jedisCluster.hmset(this.options.hkey, keyValues);
+ break;
+
+ case HASH:
+ jedisCluster.hmset(description.getAdditionalKey(), keyValues);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 1461203..14b014e 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -21,6 +21,7 @@ import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import storm.trident.state.*;
@@ -37,9 +38,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
return opaque(jedisPoolConfig, new Options());
}
- public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, String hkey) {
+ public static StateFactory opaque(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return opaque(jedisPoolConfig, opts);
}
@@ -60,9 +61,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
return transactional(jedisPoolConfig, new Options());
}
- public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, String hkey) {
+ public static StateFactory transactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return transactional(jedisPoolConfig, opts);
}
@@ -83,9 +84,9 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
return nonTransactional(jedisPoolConfig, new Options());
}
- public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, String hkey) {
+ public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig, RedisDataTypeDescription dataTypeDescription) {
Options opts = new Options();
- opts.hkey = hkey;
+ opts.dataTypeDescription = dataTypeDescription;
return nonTransactional(jedisPoolConfig, opts);
}
@@ -181,15 +182,23 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
@Override
protected List<String> retrieveValuesFromRedis(List<String> keys) {
String[] stringKeys = keys.toArray(new String[keys.size()]);
+
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
- if (Strings.isNullOrEmpty(this.options.hkey)) {
+ RedisDataTypeDescription description = this.options.dataTypeDescription;
+ switch (description.getDataType()) {
+ case STRING:
return jedis.mget(stringKeys);
- } else {
- return jedis.hmget(this.options.hkey, stringKeys);
+
+ case HASH:
+ return jedis.hmget(description.getAdditionalKey(), stringKeys);
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
}
+
} finally {
if (jedis != null) {
jedis.close();
@@ -204,12 +213,21 @@ public class RedisMapState<T> extends AbstractRedisMapState<T> {
try {
jedis = jedisPool.getResource();
- if (Strings.isNullOrEmpty(this.options.hkey)) {
+ RedisDataTypeDescription description = this.options.dataTypeDescription;
+ switch (description.getDataType()) {
+ case STRING:
String[] keyValue = buildKeyValuesList(keyValues);
jedis.mset(keyValue);
- } else {
- jedis.hmset(this.options.hkey, keyValues);
+ break;
+
+ case HASH:
+ jedis.hmset(description.getAdditionalKey(), keyValues);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
}
+
} finally {
if (jedis != null) {
jedis.close();
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index 027c785..beb4b5f 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -23,6 +23,7 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.apache.storm.redis.trident.state.RedisClusterMapState;
import org.apache.storm.redis.common.config.JedisClusterConfig;
@@ -56,7 +57,9 @@ public class WordCountTridentRedisClusterMap {
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
- StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
+ RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, "test");
+ StateFactory factory = RedisClusterMapState.transactional(clusterConfig, dataTypeDescription);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
http://git-wip-us.apache.org/repos/asf/storm/blob/98704ec8/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
index 8ecf9ad..cfda54e 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java
@@ -23,7 +23,7 @@ import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
-import org.apache.storm.redis.common.mapper.TupleMapper;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.trident.state.RedisMapState;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import storm.trident.Stream;
@@ -48,7 +48,10 @@ public class WordCountTridentRedisMap {
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(redisHost).setPort(redisPort)
.build();
- StateFactory factory = RedisMapState.transactional(poolConfig);
+
+ RedisDataTypeDescription dataTypeDescription = new RedisDataTypeDescription(
+ RedisDataTypeDescription.RedisDataType.HASH, "test");
+ StateFactory factory = RedisMapState.transactional(poolConfig, dataTypeDescription);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
[05/12] storm git commit: Apply changes to README.md
Posted by bo...@apache.org.
Apply changes to README.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/52fec9ab
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/52fec9ab
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/52fec9ab
Branch: refs/heads/master
Commit: 52fec9ab2cc3e9464fab158002fd27887eb6ff85
Parents: 2b1f1ae
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 19:15:27 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 19:15:27 2015 +0900
----------------------------------------------------------------------
external/storm-redis/README.md | 20 ++++++++++++++++----
1 file changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/52fec9ab/external/storm-redis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-redis/README.md b/external/storm-redis/README.md
index 32480e6..aa7a874 100644
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@ -184,7 +184,8 @@ RedisState
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(redisHost).setPort(redisPort)
.build();
- TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ RedisStoreMapper storeMapper = new WordCountStoreMapper();
+ RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisState.Factory factory = new RedisState.Factory(poolConfig);
TridentTopology topology = new TridentTopology();
@@ -192,8 +193,13 @@ RedisState
stream.partitionPersist(factory,
fields,
- new RedisStateUpdater("test_", tupleMapper, 86400000),
+ new RedisStateUpdater(storeMapper, 86400000),
new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new RedisStateQuerier(lookupMapper),
+ new Fields("columnName","columnValue"));
```
RedisClusterState
@@ -205,7 +211,8 @@ RedisClusterState
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
- TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ RedisStoreMapper storeMapper = new WordCountStoreMapper();
+ RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
TridentTopology topology = new TridentTopology();
@@ -213,8 +220,13 @@ RedisClusterState
stream.partitionPersist(factory,
fields,
- new RedisClusterStateUpdater("test_", tupleMapper, 86400000),
+ new RedisClusterStateUpdater(storeMapper, 86400000),
new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new RedisClusterStateQuerier(lookupMapper),
+ new Fields("columnName","columnValue"));
```
## License
[12/12] storm git commit: Added STORM-753 to Changelog
Posted by bo...@apache.org.
Added STORM-753 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a0c03235
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a0c03235
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a0c03235
Branch: refs/heads/master
Commit: a0c032358a63632b958f6f0d1d0bbf9109020e9f
Parents: bed27a7
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Jun 2 09:59:16 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Jun 2 09:59:16 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a0c03235/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1d4ebef..ad235d8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
## 0.11.0
## 0.10.0
+ * STORM-753: Improve RedisStateQuerier to convert List<Values> from Redis value
* STORM-835: Netty Client hold batch object until io operation complete
* STORM-827: Allow AutoTGT to work with storm-hdfs too.
* STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation.
[03/12] storm git commit: Extract abstract classes to reduce code
duplication (Redis*MapState)
Posted by bo...@apache.org.
Extract abstract classes to reduce code duplication (Redis*MapState)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9c0329b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9c0329b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9c0329b
Branch: refs/heads/master
Commit: b9c0329bd182333f0af237e661e88958d7157520
Parents: 8fd1f4b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 18:55:03 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 18:55:03 2015 +0900
----------------------------------------------------------------------
.../trident/state/AbstractRedisMapState.java | 70 ++++++++
.../storm/redis/trident/state/KeyFactory.java | 18 ++
.../storm/redis/trident/state/Options.java | 13 ++
.../trident/state/RedisClusterMapState.java | 137 +++-----------
.../redis/trident/state/RedisMapState.java | 177 +++++--------------
5 files changed, 173 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
new file mode 100644
index 0000000..a85a57a
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
@@ -0,0 +1,70 @@
+package org.apache.storm.redis.trident.state;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import storm.trident.state.*;
+import storm.trident.state.map.IBackingMap;
+
+import java.util.*;
+
+public abstract class AbstractRedisMapState<T> implements IBackingMap<T> {
+ public static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
+ StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
+ StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
+ StateType.OPAQUE, new JSONOpaqueSerializer()
+ ));
+
+ @Override public List<T> multiGet(List<List<Object>> keys) {
+ if (keys.size() == 0) {
+ return Collections.emptyList();
+ }
+
+ List<String> stringKeys = buildKeys(keys);
+ List<String> values = retrieveValuesFromRedis(stringKeys);
+
+ return deserializeValues(keys, values);
+ }
+
+ @Override
+ public void multiPut(List<List<Object>> keys, List<T> vals) {
+ if (keys.size() == 0) {
+ return;
+ }
+
+ Map<String, String> keyValues = new HashMap<String, String>();
+ for (int i = 0; i < keys.size(); i++) {
+ String val = new String(getSerializer().serialize(vals.get(i)));
+ String redisKey = getKeyFactory().build(keys.get(i));
+ keyValues.put(redisKey, val);
+ }
+
+ updateStatesToRedis(keyValues);
+ }
+
+ private List<String> buildKeys(List<List<Object>> keys) {
+ List<String> stringKeys = new ArrayList<String>();
+
+ for (List<Object> key : keys) {
+ stringKeys.add(getKeyFactory().build(key));
+ }
+
+ return stringKeys;
+ }
+
+ private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
+ List<T> result = new ArrayList<T>(keys.size());
+ for (String value : values) {
+ if (value != null) {
+ result.add((T) getSerializer().deserialize(value.getBytes()));
+ } else {
+ result.add(null);
+ }
+ }
+ return result;
+ }
+
+ protected abstract Serializer getSerializer();
+ protected abstract KeyFactory getKeyFactory();
+ protected abstract List<String> retrieveValuesFromRedis(List<String> keys);
+ protected abstract void updateStatesToRedis(Map<String, String> keyValues);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
new file mode 100644
index 0000000..0fac726
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
@@ -0,0 +1,18 @@
+package org.apache.storm.redis.trident.state;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface KeyFactory extends Serializable {
+ String build(List<Object> key);
+
+ class DefaultKeyFactory implements KeyFactory {
+ public String build(List<Object> key) {
+ if (key.size() != 1)
+ throw new RuntimeException("Default KeyFactory does not support compound keys");
+
+ return (String) key.get(0);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
new file mode 100644
index 0000000..59e4ecb
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -0,0 +1,13 @@
+package org.apache.storm.redis.trident.state;
+
+import storm.trident.state.Serializer;
+
+import java.io.Serializable;
+
+public class Options<T> implements Serializable {
+ public int localCacheSize = 1000;
+ public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
+ KeyFactory keyFactory = null;
+ public Serializer<T> serializer = null;
+ public String hkey = null;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 1154376..ccf5b38 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -20,67 +20,16 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.storm.redis.common.config.JedisClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
-import storm.trident.state.JSONNonTransactionalSerializer;
-import storm.trident.state.JSONOpaqueSerializer;
-import storm.trident.state.JSONTransactionalSerializer;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.IBackingMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
import java.util.List;
import java.util.Map;
-public class RedisClusterMapState<T> implements IBackingMap<T> {
- private static final Logger logger = LoggerFactory.getLogger(RedisClusterMapState.class);
-
- private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
- StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
- StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
- StateType.OPAQUE, new JSONOpaqueSerializer()
- ));
-
- public static class DefaultKeyFactory implements KeyFactory {
- public String build(List<Object> key) {
- if (key.size() != 1)
- throw new RuntimeException("Default KeyFactory does not support compound keys");
- return (String) key.get(0);
- }
- };
-
- public static class Options<T> implements Serializable {
- public int localCacheSize = 1000;
- public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
- KeyFactory keyFactory = null;
- public Serializer<T> serializer = null;
- public String hkey = null;
- }
-
- public static interface KeyFactory extends Serializable {
- String build(List<Object> key);
- }
-
+public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
/**
* OpaqueTransactional for redis-cluster.
* */
@@ -150,8 +99,6 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
return new Factory(jedisClusterConfig, StateType.NON_TRANSACTIONAL, opts);
}
-
-
protected static class Factory implements StateFactory {
public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
@@ -169,7 +116,7 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
this.keyFactory = options.keyFactory;
if (this.keyFactory == null) {
- this.keyFactory = new DefaultKeyFactory();
+ this.keyFactory = new KeyFactory.DefaultKeyFactory();
}
this.serializer = options.serializer;
if (this.serializer == null) {
@@ -220,74 +167,40 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
this.keyFactory = keyFactory;
}
- public List<T> multiGet(List<List<Object>> keys) {
- if (keys.size() == 0) {
- return Collections.emptyList();
- }
+ @Override
+ protected Serializer getSerializer() {
+ return serializer;
+ }
+
+ @Override
+ protected KeyFactory getKeyFactory() {
+ return keyFactory;
+ }
+
+ @Override
+ protected List<String> retrieveValuesFromRedis(List<String> keys) {
+ String[] stringKeys = keys.toArray(new String[keys.size()]);
if (Strings.isNullOrEmpty(this.options.hkey)) {
- String[] stringKeys = buildKeys(keys);
List<String> values = Lists.newArrayList();
- for (String stringKey : stringKeys) {
+ for (String stringKey : keys) {
String value = jedisCluster.get(stringKey);
values.add(value);
}
- return deserializeValues(keys, values);
+ return values;
} else {
- Map<String, String> keyValue = jedisCluster.hgetAll(this.options.hkey);
- List<String> values = buildValuesFromMap(keys, keyValue);
- return deserializeValues(keys, values);
- }
- }
-
- private List<String> buildValuesFromMap(List<List<Object>> keys, Map<String, String> keyValue) {
- List<String> values = new ArrayList<String>(keys.size());
- for (List<Object> key : keys) {
- String strKey = keyFactory.build(key);
- String value = keyValue.get(strKey);
- values.add(value);
- }
- return values;
- }
-
- private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
- List<T> result = new ArrayList<T>(keys.size());
- for (String value : values) {
- if (value != null) {
- result.add((T) serializer.deserialize(value.getBytes()));
- } else {
- result.add(null);
- }
+ return jedisCluster.hmget(this.options.hkey, stringKeys);
}
- return result;
- }
-
- private String[] buildKeys(List<List<Object>> keys) {
- String[] stringKeys = new String[keys.size()];
- int index = 0;
- for (List<Object> key : keys)
- stringKeys[index++] = keyFactory.build(key);
- return stringKeys;
}
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- if (keys.size() == 0) {
- return;
- }
-
+ @Override
+ protected void updateStatesToRedis(Map<String, String> keyValues) {
if (Strings.isNullOrEmpty(this.options.hkey)) {
- for (int i = 0; i < keys.size(); i++) {
- String val = new String(serializer.serialize(vals.get(i)));
- String redisKey = keyFactory.build(keys.get(i));
- jedisCluster.set(redisKey, val);
+ for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
+ jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
}
} else {
- Map<String, String> keyValues = new HashMap<String, String>();
- for (int i = 0; i < keys.size(); i++) {
- String val = new String(serializer.serialize(vals.get(i)));
- keyValues.put(keyFactory.build(keys.get(i)), val);
- }
jedisCluster.hmset(this.options.hkey, keyValues);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 7bc5afb..1461203 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -20,67 +20,16 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import org.apache.storm.redis.common.config.JedisPoolConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
-import storm.trident.state.JSONNonTransactionalSerializer;
-import storm.trident.state.JSONOpaqueSerializer;
-import storm.trident.state.JSONTransactionalSerializer;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.IBackingMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
import java.util.List;
import java.util.Map;
-public class RedisMapState<T> implements IBackingMap<T> {
- private static final Logger logger = LoggerFactory.getLogger(RedisMapState.class);
-
- private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
- StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
- StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
- StateType.OPAQUE, new JSONOpaqueSerializer()
- ));
-
- public static class DefaultKeyFactory implements KeyFactory {
- public String build(List<Object> key) {
- if (key.size() != 1)
- throw new RuntimeException("Default KeyFactory does not support compound keys");
- return (String) key.get(0);
- }
- };
-
- public static class Options<T> implements Serializable {
- public int localCacheSize = 1000;
- public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
- KeyFactory keyFactory = null;
- public Serializer<T> serializer = null;
- public String hkey = null;
- }
-
- public static interface KeyFactory extends Serializable {
- String build(List<Object> key);
- }
-
+public class RedisMapState<T> extends AbstractRedisMapState<T> {
/**
* OpaqueTransactional for redis.
* */
@@ -167,7 +116,7 @@ public class RedisMapState<T> implements IBackingMap<T> {
this.keyFactory = options.keyFactory;
if (this.keyFactory == null) {
- this.keyFactory = new DefaultKeyFactory();
+ this.keyFactory = new KeyFactory.DefaultKeyFactory();
}
this.serializer = options.serializer;
if (this.serializer == null) {
@@ -219,96 +168,64 @@ public class RedisMapState<T> implements IBackingMap<T> {
this.keyFactory = keyFactory;
}
- public List<T> multiGet(List<List<Object>> keys) {
- if (keys.size() == 0) {
- return Collections.emptyList();
- }
+ @Override
+ protected Serializer getSerializer() {
+ return serializer;
+ }
- String[] stringKeys = buildKeys(keys);
-
- if (Strings.isNullOrEmpty(this.options.hkey)) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- List<String> values = jedis.mget(stringKeys);
- return deserializeValues(keys, values);
- } finally {
- if (jedis != null) {
- jedisPool.returnResource(jedis);
- }
- }
- } else {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- List<String> values = jedis.hmget(this.options.hkey, stringKeys);
- return deserializeValues(keys, values);
- } finally {
- if (jedis != null) {
- jedisPool.returnResource(jedis);
- }
- }
- }
+ @Override
+ protected KeyFactory getKeyFactory() {
+ return keyFactory;
}
- private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
- List<T> result = new ArrayList<T>(keys.size());
- for (String value : values) {
- if (value != null) {
- result.add((T) serializer.deserialize(value.getBytes()));
+ @Override
+ protected List<String> retrieveValuesFromRedis(List<String> keys) {
+ String[] stringKeys = keys.toArray(new String[keys.size()]);
+ Jedis jedis = null;
+ try {
+ jedis = jedisPool.getResource();
+
+ if (Strings.isNullOrEmpty(this.options.hkey)) {
+ return jedis.mget(stringKeys);
} else {
- result.add(null);
+ return jedis.hmget(this.options.hkey, stringKeys);
+ }
+ } finally {
+ if (jedis != null) {
+ jedis.close();
}
}
- return result;
}
- private String[] buildKeys(List<List<Object>> keys) {
- String[] stringKeys = new String[keys.size()];
- int index = 0;
- for (List<Object> key : keys)
- stringKeys[index++] = keyFactory.build(key);
- return stringKeys;
- }
+ @Override
+ protected void updateStatesToRedis(Map<String, String> keyValues) {
+ Jedis jedis = null;
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- if (keys.size() == 0) {
- return;
- }
+ try {
+ jedis = jedisPool.getResource();
- if (Strings.isNullOrEmpty(this.options.hkey)) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- String[] keyValue = buildKeyValuesList(keys, vals);
+ if (Strings.isNullOrEmpty(this.options.hkey)) {
+ String[] keyValue = buildKeyValuesList(keyValues);
jedis.mset(keyValue);
- } finally {
- if (jedis != null) {
- jedisPool.returnResource(jedis);
- }
- }
- } else {
- Jedis jedis = jedisPool.getResource();
- try {
- Map<String, String> keyValues = new HashMap<String, String>();
- for (int i = 0; i < keys.size(); i++) {
- String val = new String(serializer.serialize(vals.get(i)));
- keyValues.put(keyFactory.build(keys.get(i)), val);
- }
+ } else {
jedis.hmset(this.options.hkey, keyValues);
-
- } finally {
- jedisPool.returnResource(jedis);
+ }
+ } finally {
+ if (jedis != null) {
+ jedis.close();
}
}
}
- private String[] buildKeyValuesList(List<List<Object>> keys, List<T> vals) {
- String[] keyValues = new String[keys.size() * 2];
- for (int i = 0; i < keys.size(); i++) {
- keyValues[i * 2] = keyFactory.build(keys.get(i));
- keyValues[i * 2 + 1] = new String(serializer.serialize(vals.get(i)));
+ private String[] buildKeyValuesList(Map<String, String> keyValues) {
+ String[] keyValueLists = new String[keyValues.size() * 2];
+
+ int idx = 0;
+ for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
+ keyValueLists[idx++] = kvEntry.getKey();
+ keyValueLists[idx++] = kvEntry.getValue();
}
- return keyValues;
+
+ return keyValueLists;
}
}
[04/12] storm git commit: Add missing Apache comments to new files
Posted by bo...@apache.org.
Add missing Apache comments to new files
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b1f1ae5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b1f1ae5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b1f1ae5
Branch: refs/heads/master
Commit: 2b1f1ae56b3b3eb4ada7563db319066cb78ce0dc
Parents: b9c0329
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 19:02:19 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 19:02:19 2015 +0900
----------------------------------------------------------------------
.../redis/trident/state/AbstractRedisMapState.java | 17 +++++++++++++++++
.../storm/redis/trident/state/KeyFactory.java | 17 +++++++++++++++++
.../apache/storm/redis/trident/state/Options.java | 17 +++++++++++++++++
3 files changed, 51 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2b1f1ae5/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
index a85a57a..56b37b2 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.storm.redis.trident.state;
import com.google.common.collect.ImmutableMap;
http://git-wip-us.apache.org/repos/asf/storm/blob/2b1f1ae5/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
index 0fac726..7acea10 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.storm.redis.trident.state;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/storm/blob/2b1f1ae5/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index 59e4ecb..bef7b6c 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.storm.redis.trident.state;
import storm.trident.state.Serializer;
[02/12] storm git commit: Redis*StateQuerier / Redis*StateUpdater now
support HASH type
Posted by bo...@apache.org.
Redis*StateQuerier / Redis*StateUpdater now support HASH type
* use Pipeline when available to gain performance
* extract abstract classes to reduce code duplication
** AbstractRedisStateQuerier, AbstractRedisStateUpdater
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8fd1f4b1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8fd1f4b1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8fd1f4b1
Branch: refs/heads/master
Commit: 8fd1f4b193b5d81b3036726268da39380e2b3b61
Parents: 7fec9a1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 12:21:57 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 12:21:57 2015 +0900
----------------------------------------------------------------------
.../state/AbstractRedisStateQuerier.java | 69 ++++++++++++++++++++
.../state/AbstractRedisStateUpdater.java | 67 +++++++++++++++++++
.../trident/state/RedisClusterStateQuerier.java | 53 ++++++---------
.../trident/state/RedisClusterStateUpdater.java | 66 ++++++++-----------
.../redis/trident/state/RedisStateQuerier.java | 54 +++++----------
.../redis/trident/state/RedisStateUpdater.java | 67 +++++++++----------
.../redis/trident/WordCountLookupMapper.java | 2 +-
.../redis/trident/WordCountStoreMapper.java | 2 +-
8 files changed, 231 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
new file mode 100644
index 0000000..24ecfc4
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public abstract class AbstractRedisStateQuerier<T extends State> extends BaseQueryFunction<T, List<Values>> {
+ private final RedisLookupMapper lookupMapper;
+ protected final RedisDataTypeDescription.RedisDataType dataType;
+ protected final String additionalKey;
+
+ public AbstractRedisStateQuerier(RedisLookupMapper lookupMapper) {
+ this.lookupMapper = lookupMapper;
+
+ RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+ }
+
+ @Override
+ public List<List<Values>> batchRetrieve(T state, List<TridentTuple> inputs) {
+ List<List<Values>> values = Lists.newArrayList();
+
+ List<String> keys = Lists.newArrayList();
+ for (TridentTuple input : inputs) {
+ keys.add(lookupMapper.getKeyFromTuple(input));
+ }
+
+ List<String> redisVals = retrieveValuesFromRedis(state, keys);
+ for (int i = 0 ; i < redisVals.size() ; i++) {
+ values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
+ }
+
+ return values;
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
+ for (Values value : values) {
+ collector.emit(value);
+ }
+ }
+
+ protected abstract List<String> retrieveValuesFromRedis(T redisClusterState, List<String> keys);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
new file mode 100644
index 0000000..2f95341
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractRedisStateUpdater<T extends State> extends BaseStateUpdater<T> {
+ private final RedisStoreMapper storeMapper;
+
+ protected final int expireIntervalSec;
+ protected final RedisDataTypeDescription.RedisDataType dataType;
+ protected final String additionalKey;
+
+ public AbstractRedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
+ this.storeMapper = storeMapper;
+ RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+
+ if (expireIntervalSec > 0) {
+ this.expireIntervalSec = expireIntervalSec;
+ } else {
+ this.expireIntervalSec = 0;
+ }
+ }
+
+ @Override
+ public void updateState(T state, List<TridentTuple> inputs,
+ TridentCollector collector) {
+ Map<String, String> keyToValue = new HashMap<String, String>();
+
+ for (TridentTuple input : inputs) {
+ String key = storeMapper.getKeyFromTuple(input);
+ String value = storeMapper.getValueFromTuple(input);
+
+ keyToValue.put(key, value);
+ }
+
+ updateStatesToRedis(state, keyToValue);
+ }
+
+ protected abstract void updateStatesToRedis(T state, Map<String, String> keyToValue);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index 4382fe3..66ff3f6 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -17,57 +17,42 @@
*/
package org.apache.storm.redis.trident.state;
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Lists;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.tuple.TridentTuple;
+import java.util.ArrayList;
import java.util.List;
-public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, List<Values>> {
- private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
-
- private final RedisLookupMapper lookupMapper;
-
+public class RedisClusterStateQuerier extends AbstractRedisStateQuerier<RedisClusterState> {
public RedisClusterStateQuerier(RedisLookupMapper lookupMapper) {
- this.lookupMapper = lookupMapper;
+ super(lookupMapper);
}
@Override
- public List<List<Values>> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
- List<List<Values>> ret = Lists.newArrayList();
-
+ protected List<String> retrieveValuesFromRedis(RedisClusterState redisClusterState, List<String> keys) {
JedisCluster jedisCluster = null;
try {
jedisCluster = redisClusterState.getJedisCluster();
-
- for (int i = 0 ; i < inputs.size() ; i++) {
- TridentTuple input = inputs.get(i);
-
- String key = lookupMapper.getKeyFromTuple(input);
- String value = jedisCluster.get(key);
- ret.add(lookupMapper.toTuple(input, value));
- logger.debug("redis get key[" + key + "] value [" + value + "]");
+ List<String> redisVals = new ArrayList<String>();
+
+ for (String key : keys) {
+ switch (dataType) {
+ case STRING:
+ redisVals.add(jedisCluster.get(key));
+ break;
+ case HASH:
+ redisVals.add(jedisCluster.hget(additionalKey, key));
+ break;
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
+ }
}
+
+ return redisVals;
} finally {
if (jedisCluster != null) {
redisClusterState.returnJedisCluster(jedisCluster);
}
}
-
- return ret;
- }
-
- @Override
- public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
- for (Values value : values) {
- collector.emit(value);
- }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 35fb48e..924b6b9 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -19,63 +19,51 @@ package org.apache.storm.redis.trident.state;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
-import java.util.List;
-
-public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
- private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
-
- private final RedisStoreMapper storeMapper;
- private final int expireIntervalSec;
+import java.util.Map;
+public class RedisClusterStateUpdater extends AbstractRedisStateUpdater<RedisClusterState> {
public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
- this.storeMapper = storeMapper;
- assertDataType(storeMapper.getDataTypeDescription());
-
- if (expireIntervalSec > 0) {
- this.expireIntervalSec = expireIntervalSec;
- } else {
- this.expireIntervalSec = 0;
- }
+ super(storeMapper, expireIntervalSec);
}
@Override
- public void updateState(RedisClusterState redisClusterState, List<TridentTuple> inputs,
- TridentCollector collector) {
-
+ protected void updateStatesToRedis(RedisClusterState redisClusterState, Map<String, String> keyToValue) {
JedisCluster jedisCluster = null;
try {
jedisCluster = redisClusterState.getJedisCluster();
- for (TridentTuple input : inputs) {
- String key = storeMapper.getKeyFromTuple(input);
- String value = storeMapper.getValueFromTuple(input);
- logger.debug("update key[" + key + "] redisKey[" + key + "] value[" + value + "]");
+ for (Map.Entry<String, String> kvEntry : keyToValue.entrySet()) {
+ String key = kvEntry.getKey();
+ String value = kvEntry.getValue();
- if (this.expireIntervalSec > 0) {
- jedisCluster.setex(key, expireIntervalSec, value);
- } else {
- jedisCluster.set(key, value);
+ switch (dataType) {
+ case STRING:
+ if (this.expireIntervalSec > 0) {
+ jedisCluster.setex(key, expireIntervalSec, value);
+ } else {
+ jedisCluster.set(key, value);
+ }
+ break;
+ case HASH:
+ jedisCluster.hset(additionalKey, key, value);
+ break;
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
}
+
+ // send expire command for hash only once
+ // it expires key itself entirely, so use it with caution
+ if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
+ this.expireIntervalSec > 0) {
+ jedisCluster.expire(additionalKey, expireIntervalSec);
+ }
} finally {
if (jedisCluster != null) {
redisClusterState.returnJedisCluster(jedisCluster);
}
}
}
-
- private void assertDataType(RedisDataTypeDescription storeMapper) {
- if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
- throw new IllegalArgumentException("State should be STRING type");
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index a215741..ac102dd 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -17,62 +17,40 @@
*/
package org.apache.storm.redis.trident.state;
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Lists;
-import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import redis.clients.jedis.Jedis;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.tuple.TridentTuple;
-import java.util.ArrayList;
import java.util.List;
-public class RedisStateQuerier extends BaseQueryFunction<RedisState, List<Values>> {
- private final RedisLookupMapper lookupMapper;
-
+public class RedisStateQuerier extends AbstractRedisStateQuerier<RedisState> {
public RedisStateQuerier(RedisLookupMapper lookupMapper) {
- this.lookupMapper = lookupMapper;
- assertDataType(lookupMapper.getDataTypeDescription());
+ super(lookupMapper);
}
@Override
- public List<List<Values>> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
- List<List<Values>> values = new ArrayList<List<Values>>();
-
- List<String> keys = Lists.newArrayList();
- for (TridentTuple input : inputs) {
- keys.add(lookupMapper.getKeyFromTuple(input));
- }
-
+ protected List<String> retrieveValuesFromRedis(RedisState redisState, List<String> keys) {
Jedis jedis = null;
try {
jedis = redisState.getJedis();
- List<String> redisVals = jedis.mget(keys.toArray(new String[keys.size()]));
-
- for (int i = 0 ; i < redisVals.size() ; i++) {
- values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
+ List<String> redisVals;
+
+ String[] keysForRedis = keys.toArray(new String[keys.size()]);
+ switch (dataType) {
+ case STRING:
+ redisVals = jedis.mget(keysForRedis);
+ break;
+ case HASH:
+ redisVals = jedis.hmget(additionalKey, keysForRedis);
+ break;
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
- return values;
+ return redisVals;
} finally {
if (jedis != null) {
redisState.returnJedis(jedis);
}
}
}
-
- @Override
- public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
- for (Values value : values) {
- collector.emit(value);
- }
- }
-
- private void assertDataType(RedisDataTypeDescription lookupMapper) {
- if (lookupMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
- throw new IllegalArgumentException("State should be STRING type");
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 384a120..583fa32 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -19,51 +19,51 @@ package org.apache.storm.redis.trident.state;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
+import redis.clients.jedis.Pipeline;
-import java.util.List;
-
-public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
- private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
-
- private final RedisStoreMapper storeMapper;
- private final int expireIntervalSec;
+import java.util.Map;
+public class RedisStateUpdater extends AbstractRedisStateUpdater<RedisState> {
public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
- this.storeMapper = storeMapper;
- assertDataType(storeMapper.getDataTypeDescription());
-
- if (expireIntervalSec > 0) {
- this.expireIntervalSec = expireIntervalSec;
- } else {
- this.expireIntervalSec = 0;
- }
+ super(storeMapper, expireIntervalSec);
}
@Override
- public void updateState(RedisState redisState, List<TridentTuple> inputs,
- TridentCollector collector) {
+ protected void updateStatesToRedis(RedisState redisState, Map<String, String> keyToValue) {
Jedis jedis = null;
try {
jedis = redisState.getJedis();
- for (TridentTuple input : inputs) {
- String key = storeMapper.getKeyFromTuple(input);
- String value = storeMapper.getValueFromTuple(input);
+ Pipeline pipeline = jedis.pipelined();
- logger.debug("update key[" + key + "] redisKey[" + key+ "] value[" + value + "]");
+ for (Map.Entry<String, String> kvEntry : keyToValue.entrySet()) {
+ String key = kvEntry.getKey();
+ String value = kvEntry.getValue();
- if (this.expireIntervalSec > 0) {
- jedis.setex(key, expireIntervalSec, value);
- } else {
- jedis.set(key, value);
+ switch (dataType) {
+ case STRING:
+ if (this.expireIntervalSec > 0) {
+ pipeline.setex(key, expireIntervalSec, value);
+ } else {
+ pipeline.set(key, value);
+ }
+ break;
+ case HASH:
+ pipeline.hset(additionalKey, key, value);
+ break;
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
}
+
+ // send expire command for hash only once
+ // it expires key itself entirely, so use it with caution
+ if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
+ this.expireIntervalSec > 0) {
+ pipeline.expire(additionalKey, expireIntervalSec);
+ }
+
+ pipeline.sync();
} finally {
if (jedis != null) {
redisState.returnJedis(jedis);
@@ -71,9 +71,4 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
}
}
- private void assertDataType(RedisDataTypeDescription storeMapper) {
- if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
- throw new IllegalArgumentException("State should be STRING type");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
index 891a1af..5c67c8c 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
@@ -25,7 +25,7 @@ public class WordCountLookupMapper implements RedisLookupMapper {
@Override
public RedisDataTypeDescription getDataTypeDescription() {
- return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+ return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "test");
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
index aa03ead..6521302 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
@@ -7,7 +7,7 @@ import org.apache.storm.redis.common.mapper.RedisStoreMapper;
public class WordCountStoreMapper implements RedisStoreMapper {
@Override
public RedisDataTypeDescription getDataTypeDescription() {
- return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+ return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "test");
}
@Override
[11/12] storm git commit: Merge branch 'STORM-753' of
https://github.com/HeartSaVioR/storm into STORM-753
Posted by bo...@apache.org.
Merge branch 'STORM-753' of https://github.com/HeartSaVioR/storm into STORM-753
STORM-753: Improve RedisStateQuerier to convert List<Values> from Redis value
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bed27a74
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bed27a74
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bed27a74
Branch: refs/heads/master
Commit: bed27a744ab7acd061a0ab71bcb8e78bc0e0151e
Parents: ad98824 1eb85a2
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Jun 2 09:52:11 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Jun 2 09:52:11 2015 -0500
----------------------------------------------------------------------
external/storm-redis/README.md | 20 +-
.../trident/state/AbstractRedisMapState.java | 96 +++++++++
.../state/AbstractRedisStateQuerier.java | 69 +++++++
.../state/AbstractRedisStateUpdater.java | 69 +++++++
.../storm/redis/trident/state/KeyFactory.java | 35 ++++
.../storm/redis/trident/state/Options.java | 33 ++++
.../trident/state/RedisClusterMapState.java | 153 +++++----------
.../trident/state/RedisClusterStateQuerier.java | 60 ++----
.../trident/state/RedisClusterStateUpdater.java | 69 +++----
.../redis/trident/state/RedisMapState.java | 194 +++++++------------
.../redis/trident/state/RedisStateQuerier.java | 54 ++----
.../redis/trident/state/RedisStateUpdater.java | 73 ++++---
.../redis/trident/WordCountLookupMapper.java | 40 ++++
.../redis/trident/WordCountStoreMapper.java | 22 +++
.../redis/trident/WordCountTridentRedis.java | 12 +-
.../trident/WordCountTridentRedisCluster.java | 11 +-
.../WordCountTridentRedisClusterMap.java | 6 +-
.../redis/trident/WordCountTridentRedisMap.java | 8 +-
.../redis/trident/WordCountTupleMapper.java | 16 --
19 files changed, 623 insertions(+), 417 deletions(-)
----------------------------------------------------------------------
[07/12] storm git commit: Merge branch 'master' into STORM-753
Posted by bo...@apache.org.
Merge branch 'master' into STORM-753
Conflicts:
external/storm-redis/README.md
external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/037a9754
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/037a9754
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/037a9754
Branch: refs/heads/master
Commit: 037a9754fdee063d2238a9c41e6ffdb79759d33d
Parents: 98704ec 0c2b3a4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat May 30 22:34:19 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat May 30 22:34:19 2015 +0900
----------------------------------------------------------------------
.gitignore | 1 +
.travis.yml | 9 +
CHANGELOG.md | 48 +-
LICENSE | 29 +
README.markdown | 7 +-
SECURITY.md | 50 ++
STORM-UI-REST-API.md | 44 +-
bin/storm | 2 +-
bin/storm.cmd | 4 +-
bin/storm.py | 24 +-
conf/defaults.yaml | 3 +-
dev-tools/test-ns.py | 19 +-
.../print-errors-from-clojure-test-reports.py | 58 ++
dev-tools/travis/travis-build.sh | 50 ++
docs/documentation/Metrics.md | 2 +-
docs/documentation/Multilang-protocol.md | 63 +-
.../documentation/Setting-up-a-Storm-cluster.md | 7 +-
docs/documentation/Trident-API-Overview.md | 2 +-
examples/storm-starter/README.markdown | 8 +-
.../storm-starter/multilang/resources/storm.js | 373 ---------
.../storm-starter/multilang/resources/storm.py | 260 -------
.../storm-starter/multilang/resources/storm.rb | 236 ------
examples/storm-starter/pom.xml | 67 +-
external/storm-eventhubs/README.md | 41 +
external/storm-eventhubs/pom.xml | 122 +++
.../storm/eventhubs/bolt/EventHubBolt.java | 81 ++
.../client/ConnectionStringBuilder.java | 116 +++
.../storm/eventhubs/client/Constants.java | 32 +
.../storm/eventhubs/client/EventHubClient.java | 92 +++
.../eventhubs/client/EventHubConsumerGroup.java | 72 ++
.../eventhubs/client/EventHubException.java | 37 +
.../eventhubs/client/EventHubReceiver.java | 139 ++++
.../eventhubs/client/EventHubSendClient.java | 70 ++
.../storm/eventhubs/client/EventHubSender.java | 95 +++
.../storm/eventhubs/client/SelectorFilter.java | 38 +
.../eventhubs/client/SelectorFilterWriter.java | 64 ++
.../eventhubs/samples/AtMostOnceEventCount.java | 54 ++
.../storm/eventhubs/samples/EventCount.java | 155 ++++
.../storm/eventhubs/samples/EventHubLoop.java | 51 ++
.../samples/OpaqueTridentEventCount.java | 53 ++
.../samples/TransactionalTridentEventCount.java | 81 ++
.../eventhubs/samples/bolt/GlobalCountBolt.java | 83 ++
.../samples/bolt/PartialCountBolt.java | 63 ++
.../apache/storm/eventhubs/spout/EventData.java | 48 ++
.../storm/eventhubs/spout/EventDataScheme.java | 55 ++
.../eventhubs/spout/EventHubReceiverFilter.java | 56 ++
.../eventhubs/spout/EventHubReceiverImpl.java | 150 ++++
.../storm/eventhubs/spout/EventHubSpout.java | 258 +++++++
.../eventhubs/spout/EventHubSpoutConfig.java | 165 ++++
.../eventhubs/spout/EventHubSpoutException.java | 37 +
.../storm/eventhubs/spout/FieldConstants.java | 25 +
.../storm/eventhubs/spout/IEventDataScheme.java | 30 +
.../eventhubs/spout/IEventHubReceiver.java | 35 +
.../spout/IEventHubReceiverFactory.java | 30 +
.../spout/IEventHubReceiverFilter.java | 35 +
.../eventhubs/spout/IPartitionCoordinator.java | 27 +
.../eventhubs/spout/IPartitionManager.java | 37 +
.../spout/IPartitionManagerFactory.java | 33 +
.../storm/eventhubs/spout/IStateStore.java | 31 +
.../apache/storm/eventhubs/spout/MessageId.java | 56 ++
.../storm/eventhubs/spout/PartitionManager.java | 101 +++
.../eventhubs/spout/SimplePartitionManager.java | 136 ++++
.../spout/StaticPartitionCoordinator.java | 85 +++
.../eventhubs/spout/ZookeeperStateStore.java | 95 +++
.../storm/eventhubs/trident/Coordinator.java | 60 ++
.../trident/ITridentPartitionManager.java | 35 +
.../ITridentPartitionManagerFactory.java | 26 +
.../trident/OpaqueTridentEventHubEmitter.java | 69 ++
.../trident/OpaqueTridentEventHubSpout.java | 64 ++
.../storm/eventhubs/trident/Partition.java | 39 +
.../storm/eventhubs/trident/Partitions.java | 41 +
.../TransactionalTridentEventHubEmitter.java | 167 ++++
.../TransactionalTridentEventHubSpout.java | 66 ++
.../trident/TridentPartitionManager.java | 91 +++
.../src/main/resources/config.properties | 27 +
.../eventhubs/spout/EventHubReceiverMock.java | 105 +++
.../spout/EventHubSpoutCallerMock.java | 96 +++
.../spout/PartitionManagerCallerMock.java | 105 +++
.../spout/SpoutOutputCollectorMock.java | 61 ++
.../storm/eventhubs/spout/StateStoreMock.java | 54 ++
.../storm/eventhubs/spout/TestEventData.java | 47 ++
.../eventhubs/spout/TestEventHubSpout.java | 70 ++
.../eventhubs/spout/TestPartitionManager.java | 117 +++
.../TestTransactionalTridentEmitter.java | 93 +++
.../eventhubs/trident/TridentCollectorMock.java | 52 ++
.../mapper/SimpleTridentHBaseMapMapper.java | 50 ++
.../trident/mapper/TridentHBaseMapMapper.java | 40 +
.../hbase/trident/state/HBaseMapState.java | 45 +-
.../hdfs/common/security/HdfsSecurityUtil.java | 5 +-
external/storm-hive/pom.xml | 21 +
external/storm-jdbc/README.md | 72 +-
.../storm/jdbc/bolt/AbstractJdbcBolt.java | 17 +-
.../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 5 +-
.../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 5 +-
.../org/apache/storm/jdbc/common/Column.java | 7 +-
.../storm/jdbc/common/ConnectionProvider.java | 26 +
.../jdbc/common/HikariCPConnectionProvider.java | 46 ++
.../apache/storm/jdbc/common/JdbcClient.java | 19 +-
.../storm/jdbc/mapper/SimpleJdbcMapper.java | 6 +-
.../storm/jdbc/trident/state/JdbcState.java | 13 +-
.../storm/jdbc/common/JdbcClientTest.java | 5 +-
.../jdbc/topology/AbstractUserTopology.java | 17 +-
.../jdbc/topology/UserPersistanceTopology.java | 18 +-
.../UserPersistanceTridentTopology.java | 2 +-
external/storm-kafka/README.md | 52 +-
.../jvm/storm/kafka/DynamicBrokersReader.java | 26 +
.../src/jvm/storm/kafka/KafkaConfig.java | 2 +-
.../src/jvm/storm/kafka/KafkaUtils.java | 5 +-
.../src/jvm/storm/kafka/PartitionManager.java | 4 +-
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 +
.../kafka/trident/TridentKafkaEmitter.java | 4 +-
.../storm/kafka/DynamicBrokersReaderTest.java | 13 +
.../src/test/storm/kafka/KafkaUtilsTest.java | 6 +-
.../test/storm/kafka/bolt/KafkaBoltTest.java | 27 +
external/storm-redis/README.md | 4 +-
external/storm-redis/pom.xml | 2 +-
.../redis/common/container/JedisContainer.java | 7 +-
.../state/AbstractRedisStateUpdater.java | 6 +-
.../trident/state/RedisClusterStateUpdater.java | 9 +-
.../redis/trident/state/RedisStateUpdater.java | 9 +-
.../redis/trident/WordCountTridentRedis.java | 2 +-
.../trident/WordCountTridentRedisCluster.java | 2 +-
pom.xml | 8 +-
storm-core/pom.xml | 40 +-
storm-core/src/clj/backtype/storm/cluster.clj | 5 +-
storm-core/src/clj/backtype/storm/converter.clj | 10 +-
.../src/clj/backtype/storm/daemon/common.clj | 15 +-
.../src/clj/backtype/storm/daemon/drpc.clj | 21 +-
.../src/clj/backtype/storm/daemon/executor.clj | 9 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 1 +
.../src/clj/backtype/storm/daemon/nimbus.clj | 9 +-
.../clj/backtype/storm/daemon/supervisor.clj | 58 +-
.../src/clj/backtype/storm/daemon/worker.clj | 35 +-
.../src/clj/backtype/storm/local_state.clj | 99 +++
.../src/clj/backtype/storm/messaging/loader.clj | 13 +-
storm-core/src/clj/backtype/storm/testing.clj | 25 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 42 +-
.../src/clj/backtype/storm/ui/helpers.clj | 51 +-
storm-core/src/clj/backtype/storm/util.clj | 37 +-
storm-core/src/dev/resources/storm.js | 373 ---------
storm-core/src/dev/resources/storm.py | 260 -------
storm-core/src/dev/resources/storm.rb | 236 ------
storm-core/src/jvm/backtype/storm/Config.java | 104 ++-
.../src/jvm/backtype/storm/drpc/DRPCSpout.java | 8 +-
.../storm/generated/ClusterWorkerHeartbeat.java | 102 ++-
.../storm/generated/LSApprovedWorkers.java | 458 +++++++++++
.../generated/LSSupervisorAssignments.java | 471 ++++++++++++
.../storm/generated/LSSupervisorId.java | 406 ++++++++++
.../storm/generated/LSWorkerHeartbeat.java | 755 +++++++++++++++++++
.../storm/generated/LocalAssignment.java | 561 ++++++++++++++
.../storm/generated/LocalStateData.java | 471 ++++++++++++
.../jvm/backtype/storm/generated/Nimbus.java | 12 +-
.../storm/generated/SupervisorInfo.java | 116 ++-
.../storm/generated/SupervisorSummary.java | 117 ++-
.../storm/generated/ThriftSerializedObject.java | 516 +++++++++++++
.../storm/generated/TopologySummary.java | 2 +-
.../backtype/storm/messaging/netty/Client.java | 4 +-
.../security/auth/SaslTransportPlugin.java | 17 +-
.../GzipBridgeThriftSerializationDelegate.java | 64 ++
.../GzipThriftSerializationDelegate.java | 57 ++
.../jvm/backtype/storm/spout/ShellSpout.java | 6 +
.../src/jvm/backtype/storm/task/IBolt.java | 4 +-
.../src/jvm/backtype/storm/task/ShellBolt.java | 5 +-
.../backtype/storm/task/TopologyContext.java | 104 ++-
.../storm/utils/ExtendedThreadPoolExecutor.java | 67 ++
.../jvm/backtype/storm/utils/LocalState.java | 163 +++-
.../backtype/storm/utils/TransferDrainer.java | 62 +-
.../src/jvm/backtype/storm/utils/Utils.java | 58 +-
storm-core/src/multilang/js/storm.js | 366 ---------
storm-core/src/multilang/py/storm.py | 260 -------
storm-core/src/multilang/rb/storm.rb | 236 ------
storm-core/src/py/__init__.py | 2 +
storm-core/src/py/storm/DistributedRPC.py | 2 +
.../src/py/storm/DistributedRPCInvocations.py | 2 +
storm-core/src/py/storm/Nimbus.py | 10 +
storm-core/src/py/storm/__init__.py | 2 +
storm-core/src/py/storm/constants.py | 2 +
storm-core/src/py/storm/ttypes.py | 645 +++++++++++++++-
storm-core/src/storm.thrift | 43 ++
.../src/ui/public/css/jsonFormatter.min.css | 1 +
storm-core/src/ui/public/css/style.css | 15 +-
storm-core/src/ui/public/index.html | 3 +
.../src/ui/public/js/jsonFormatter.min.js | 2 +
storm-core/src/ui/public/js/script.js | 5 +-
.../public/templates/anti-forgery-template.html | 19 -
.../templates/component-page-template.html | 8 +-
.../public/templates/index-page-template.html | 6 +
.../templates/topology-page-template.html | 4 +-
storm-core/src/ui/public/topology.html | 11 +-
.../test/clj/backtype/storm/cluster_test.clj | 4 +-
.../clj/backtype/storm/local_state_test.clj | 40 +-
.../storm/security/auth/nimbus_auth_test.clj | 2 +-
.../clj/backtype/storm/transactional_test.clj | 6 +-
...ipBridgeThriftSerializationDelegateTest.java | 71 ++
.../storm/utils/DisruptorQueueTest.java | 38 +-
storm-core/test/resources/logback-test.xml | 26 +
storm-dist/binary/src/main/assembly/binary.xml | 41 +-
storm-multilang/javascript/pom.xml | 32 +
.../src/main/resources/resources/storm.js | 373 +++++++++
storm-multilang/python/pom.xml | 32 +
.../src/main/resources/resources/storm.py | 260 +++++++
storm-multilang/ruby/pom.xml | 32 +
.../ruby/src/main/resources/resources/storm.rb | 236 ++++++
203 files changed, 12236 insertions(+), 3173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/README.md
----------------------------------------------------------------------
diff --cc external/storm-redis/README.md
index aa7a874,f165c09..86cd937
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@@ -193,13 -192,8 +193,13 @@@ RedisStat
stream.partitionPersist(factory,
fields,
- new RedisStateUpdater(storeMapper, 86400000),
+ new RedisStateUpdater("test_", tupleMapper).withExpire(86400000),
new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new RedisStateQuerier(lookupMapper),
+ new Fields("columnName","columnValue"));
```
RedisClusterState
@@@ -220,13 -213,8 +220,13 @@@
stream.partitionPersist(factory,
fields,
- new RedisClusterStateUpdater(storeMapper, 86400000),
+ new RedisClusterStateUpdater("test_", tupleMapper).withExpire(86400000),
new Fields());
+
+ TridentState state = topology.newStaticState(factory);
+ stream = stream.stateQuery(state, new Fields("word"),
+ new RedisClusterStateQuerier(lookupMapper),
+ new Fields("columnName","columnValue"));
```
## License
http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
index 2f95341,0000000..87bb8fa
mode 100644,000000..100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
@@@ -1,67 -1,0 +1,69 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractRedisStateUpdater<T extends State> extends BaseStateUpdater<T> {
+ private final RedisStoreMapper storeMapper;
+
- protected final int expireIntervalSec;
++ protected int expireIntervalSec = 0;
+ protected final RedisDataTypeDescription.RedisDataType dataType;
+ protected final String additionalKey;
+
- public AbstractRedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
++ public AbstractRedisStateUpdater(RedisStoreMapper storeMapper) {
+ this.storeMapper = storeMapper;
+ RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
++ }
+
++ public void setExpireInterval(int expireIntervalSec) {
+ if (expireIntervalSec > 0) {
+ this.expireIntervalSec = expireIntervalSec;
+ } else {
+ this.expireIntervalSec = 0;
+ }
+ }
+
+ @Override
+ public void updateState(T state, List<TridentTuple> inputs,
+ TridentCollector collector) {
+ Map<String, String> keyToValue = new HashMap<String, String>();
+
+ for (TridentTuple input : inputs) {
+ String key = storeMapper.getKeyFromTuple(input);
+ String value = storeMapper.getValueFromTuple(input);
+
+ keyToValue.put(key, value);
+ }
+
+ updateStatesToRedis(state, keyToValue);
+ }
+
+ protected abstract void updateStatesToRedis(T state, Map<String, String> keyToValue);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 924b6b9,e00cfb6..17c5bfc
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@@ -17,15 -17,36 +17,20 @@@
*/
package org.apache.storm.redis.trident.state;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import redis.clients.jedis.JedisCluster;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
-import java.util.List;
+import java.util.Map;
-public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
- private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
-
- private final String redisKeyPrefix;
- private final TupleMapper tupleMapper;
- private int expireIntervalSec = 0;
-
- public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
- this.redisKeyPrefix = redisKeyPrefix;
- this.tupleMapper = tupleMapper;
+public class RedisClusterStateUpdater extends AbstractRedisStateUpdater<RedisClusterState> {
- public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
- super(storeMapper, expireIntervalSec);
++ public RedisClusterStateUpdater(RedisStoreMapper storeMapper) {
++ super(storeMapper);
+ }
+
+ public RedisClusterStateUpdater withExpire(int expireIntervalSec) {
- if (expireIntervalSec > 0) {
- this.expireIntervalSec = expireIntervalSec;
- } else {
- this.expireIntervalSec = 0;
- }
-
++ setExpireInterval(expireIntervalSec);
+ return this;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 583fa32,2939d3d..babcb1d
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@@ -17,16 -17,36 +17,21 @@@
*/
package org.apache.storm.redis.trident.state;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import redis.clients.jedis.Jedis;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
+import redis.clients.jedis.Pipeline;
-import java.util.List;
+import java.util.Map;
-public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
- private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
-
- private final String redisKeyPrefix;
- private final TupleMapper tupleMapper;
- private int expireIntervalSec = 0;
-
- public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
- this.redisKeyPrefix = redisKeyPrefix;
- this.tupleMapper = tupleMapper;
+public class RedisStateUpdater extends AbstractRedisStateUpdater<RedisState> {
- public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
- super(storeMapper, expireIntervalSec);
++ public RedisStateUpdater(RedisStoreMapper storeMapper) {
++ super(storeMapper);
+ }
+
+ public RedisStateUpdater withExpire(int expireIntervalSec) {
- if (expireIntervalSec > 0) {
- this.expireIntervalSec = expireIntervalSec;
- } else {
- this.expireIntervalSec = 0;
- }
-
++ setExpireInterval(expireIntervalSec);
+ return this;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index 79bab5a,eb13399..4a4aae0
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@@ -58,7 -55,7 +58,7 @@@ public class WordCountTridentRedis
stream.partitionPersist(factory,
fields,
- new RedisStateUpdater(storeMapper, 86400000),
- new RedisStateUpdater("test_", tupleMapper).withExpire(86400000),
++ new RedisStateUpdater(storeMapper).withExpire(86400000),
new Fields());
TridentState state = topology.newStaticState(factory);
http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index 280b273,8562e77..765b339
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@@ -66,7 -63,7 +66,7 @@@ public class WordCountTridentRedisClust
stream.partitionPersist(factory,
fields,
- new RedisClusterStateUpdater(storeMapper, 86400000),
- new RedisClusterStateUpdater("test_", tupleMapper).withExpire(86400000),
++ new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
new Fields());
TridentState state = topology.newStaticState(factory);
[09/12] storm git commit: STORM-753 Expose KeyFactory to public since
it doesn't have setter
Posted by bo...@apache.org.
STORM-753 Expose KeyFactory to public since it doesn't have setter
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/84fc7645
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/84fc7645
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/84fc7645
Branch: refs/heads/master
Commit: 84fc764560b50a133c5c902af9f7be9cd0e78a4d
Parents: f8906fb
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 2 06:35:02 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jun 2 06:35:02 2015 +0900
----------------------------------------------------------------------
.../main/java/org/apache/storm/redis/trident/state/Options.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/84fc7645/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index b262c42..dffb713 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -27,7 +27,7 @@ public class Options<T> implements Serializable {
public int localCacheSize = 1000;
public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
- KeyFactory keyFactory = null;
+ public KeyFactory keyFactory = null;
public Serializer<T> serializer = null;
public RedisDataTypeDescription dataTypeDescription = DEFAULT_REDIS_DATATYPE;
}
[10/12] storm git commit: STORM-753 Correct README to reflect
RedisStoreMapper
Posted by bo...@apache.org.
STORM-753 Correct README to reflect RedisStoreMapper
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1eb85a22
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1eb85a22
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1eb85a22
Branch: refs/heads/master
Commit: 1eb85a2285a4d291ce60734bb00e5d0747986885
Parents: 84fc764
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 2 06:55:45 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jun 2 06:55:45 2015 +0900
----------------------------------------------------------------------
external/storm-redis/README.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1eb85a22/external/storm-redis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-redis/README.md b/external/storm-redis/README.md
index 86cd937..6e4063e 100644
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@ -193,7 +193,7 @@ RedisState
stream.partitionPersist(factory,
fields,
- new RedisStateUpdater("test_", tupleMapper).withExpire(86400000),
+ new RedisStateUpdater(storeMapper).withExpire(86400000),
new Fields());
TridentState state = topology.newStaticState(factory);
@@ -220,7 +220,7 @@ RedisClusterState
stream.partitionPersist(factory,
fields,
- new RedisClusterStateUpdater("test_", tupleMapper).withExpire(86400000),
+ new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
new Fields());
TridentState state = topology.newStaticState(factory);
[08/12] storm git commit: STORM-753 Replace wildcard imports to each
single class imports
Posted by bo...@apache.org.
STORM-753 Replace wildcard imports to each single class imports
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f8906fb4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f8906fb4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f8906fb4
Branch: refs/heads/master
Commit: f8906fb4d7750d5f009fccf98943060bb25e2f0c
Parents: 037a975
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jun 2 06:33:58 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jun 2 06:33:58 2015 +0900
----------------------------------------------------------------------
.../redis/trident/state/AbstractRedisMapState.java | 13 +++++++++++--
.../redis/trident/state/RedisClusterMapState.java | 15 ++++++++++++---
.../storm/redis/trident/state/RedisMapState.java | 15 ++++++++++++---
3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f8906fb4/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
index 56b37b2..b6fc8d7 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
@@ -19,10 +19,19 @@ package org.apache.storm.redis.trident.state;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import storm.trident.state.*;
+import storm.trident.state.JSONNonTransactionalSerializer;
+import storm.trident.state.JSONOpaqueSerializer;
+import storm.trident.state.JSONTransactionalSerializer;
+import storm.trident.state.Serializer;
+import storm.trident.state.StateType;
import storm.trident.state.map.IBackingMap;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public abstract class AbstractRedisMapState<T> implements IBackingMap<T> {
public static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
http://git-wip-us.apache.org/repos/asf/storm/blob/f8906fb4/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 10bf9ea..230f7f0 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -19,13 +19,22 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import redis.clients.jedis.JedisCluster;
-import storm.trident.state.*;
-import storm.trident.state.map.*;
+import storm.trident.state.OpaqueValue;
+import storm.trident.state.Serializer;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import storm.trident.state.StateType;
+import storm.trident.state.TransactionalValue;
+import storm.trident.state.map.CachedMap;
+import storm.trident.state.map.MapState;
+import storm.trident.state.map.NonTransactionalMap;
+import storm.trident.state.map.OpaqueMap;
+import storm.trident.state.map.SnapshottableMap;
+import storm.trident.state.map.TransactionalMap;
import java.util.List;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/storm/blob/f8906fb4/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 14b014e..85cbff2 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -19,13 +19,22 @@ package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
-import com.google.common.base.Strings;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
-import storm.trident.state.*;
-import storm.trident.state.map.*;
+import storm.trident.state.OpaqueValue;
+import storm.trident.state.Serializer;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import storm.trident.state.StateType;
+import storm.trident.state.TransactionalValue;
+import storm.trident.state.map.CachedMap;
+import storm.trident.state.map.MapState;
+import storm.trident.state.map.NonTransactionalMap;
+import storm.trident.state.map.OpaqueMap;
+import storm.trident.state.map.SnapshottableMap;
+import storm.trident.state.map.TransactionalMap;
import java.util.List;
import java.util.Map;