You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/02/23 17:26:24 UTC
[11/23] storm git commit: change storm-redis trident state implements
change storm-redis trident state implements
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f9438fc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f9438fc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f9438fc
Branch: refs/heads/master
Commit: 0f9438fc5491fb3764849b31536352e69deae8ea
Parents: ff36ac8
Author: dashengju <da...@qq.com>
Authored: Sun Jan 4 16:31:40 2015 +0800
Committer: dashengju <da...@qq.com>
Committed: Sun Jan 4 16:31:40 2015 +0800
----------------------------------------------------------------------
.../trident/state/RedisClusterMapState.java | 293 +++++++++++++++++++
.../redis/trident/state/RedisClusterState.java | 81 +++++
.../trident/state/RedisClusterStateQuerier.java | 81 +++++
.../trident/state/RedisClusterStateUpdater.java | 76 +++++
.../redis/trident/state/RedisMapState.java | 192 +++---------
.../storm/redis/trident/state/RedisState.java | 48 ++-
.../redis/trident/state/RedisStateQuerier.java | 35 +--
.../state/RedisStateSetCountQuerier.java | 12 +-
.../trident/state/RedisStateSetUpdater.java | 16 +-
.../redis/trident/state/RedisStateUpdater.java | 14 +-
.../trident/WordCountTridentRedisCluster.java | 13 +-
.../WordCountTridentRedisClusterMap.java | 101 +++++++
12 files changed, 735 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
new file mode 100644
index 0000000..6324126
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.tuple.Values;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.state.JSONNonTransactionalSerializer;
+import storm.trident.state.JSONOpaqueSerializer;
+import storm.trident.state.JSONTransactionalSerializer;
+import storm.trident.state.OpaqueValue;
+import storm.trident.state.Serializer;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import storm.trident.state.StateType;
+import storm.trident.state.TransactionalValue;
+import storm.trident.state.map.CachedMap;
+import storm.trident.state.map.IBackingMap;
+import storm.trident.state.map.MapState;
+import storm.trident.state.map.NonTransactionalMap;
+import storm.trident.state.map.OpaqueMap;
+import storm.trident.state.map.SnapshottableMap;
+import storm.trident.state.map.TransactionalMap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+public class RedisClusterMapState<T> implements IBackingMap<T> {
+ private static final Logger logger = LoggerFactory.getLogger(RedisClusterMapState.class);
+
+ private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
+ StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
+ StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
+ StateType.OPAQUE, new JSONOpaqueSerializer()
+ ));
+
+ public static class DefaultKeyFactory implements KeyFactory {
+ public String build(List<Object> key) {
+ if (key.size() != 1)
+ throw new RuntimeException("Default KeyFactory does not support compound keys");
+ return (String) key.get(0);
+ }
+ };
+
+ public static class Options<T> implements Serializable {
+ public int localCacheSize = 1000;
+ public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
+ KeyFactory keyFactory = null;
+ public Serializer<T> serializer = null;
+ public String hkey = null;
+ }
+
+ public static interface KeyFactory extends Serializable {
+ String build(List<Object> key);
+ }
+
+ /**
+ * OpaqueTransactional for redis-cluster.
+ * */
+ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig) {
+ return opaque(jedisClusterConfig, new Options());
+ }
+
+ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, String hkey) {
+ Options opts = new Options();
+ opts.hkey = hkey;
+ return opaque(jedisClusterConfig, opts);
+ }
+
+ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
+ Options opts = new Options();
+ opts.keyFactory = factory;
+ return opaque(jedisClusterConfig, opts);
+ }
+
+ public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, Options<OpaqueValue> opts) {
+ return new Factory(jedisClusterConfig, StateType.OPAQUE, opts);
+ }
+
+ /**
+ * Transactional for redis-cluster.
+ * */
+ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig) {
+ return transactional(jedisClusterConfig, new Options());
+ }
+
+ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+ Options opts = new Options();
+ opts.hkey = hkey;
+ return transactional(jedisClusterConfig, opts);
+ }
+
+ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
+ Options opts = new Options();
+ opts.keyFactory = factory;
+ return transactional(jedisClusterConfig, opts);
+ }
+
+ public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, Options<TransactionalValue> opts) {
+ return new Factory(jedisClusterConfig, StateType.TRANSACTIONAL, opts);
+ }
+
+ /**
+ * NonTransactional for redis-cluster.
+ * */
+ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig) {
+ return nonTransactional(jedisClusterConfig, new Options());
+ }
+
+ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+ Options opts = new Options();
+ opts.hkey = hkey;
+ return nonTransactional(jedisClusterConfig, opts);
+ }
+
+ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
+ Options opts = new Options();
+ opts.keyFactory = factory;
+ return nonTransactional(jedisClusterConfig, opts);
+ }
+
+ public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, Options<Object> opts) {
+ return new Factory(jedisClusterConfig, StateType.NON_TRANSACTIONAL, opts);
+ }
+
+
+
+ protected static class Factory implements StateFactory {
+ // TODO : serialize redis.clients.jedis.JedisPoolConfig
+ public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+ JedisClusterConfig jedisClusterConfig;
+
+ StateType type;
+ Serializer serializer;
+ KeyFactory keyFactory;
+ Options options;
+
+ public Factory(JedisClusterConfig jedisClusterConfig, StateType type, Options options) {
+ this.jedisClusterConfig = jedisClusterConfig;
+ this.type = type;
+ this.options = options;
+
+ this.keyFactory = options.keyFactory;
+ if (this.keyFactory == null) {
+ this.keyFactory = new DefaultKeyFactory();
+ }
+ this.serializer = options.serializer;
+ if (this.serializer == null) {
+ this.serializer = DEFAULT_SERIALIZERS.get(type);
+ if (this.serializer == null) {
+ throw new RuntimeException("Couldn't find serializer for state type: " + type);
+ }
+ }
+ }
+
+ public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
+ jedisClusterConfig.getTimeout(),
+ jedisClusterConfig.getMaxRedirections(),
+ DEFAULT_POOL_CONFIG);
+
+ RedisClusterMapState state = new RedisClusterMapState(jedisCluster, options, serializer, keyFactory);
+ CachedMap c = new CachedMap(state, options.localCacheSize);
+
+ MapState ms;
+ if (type == StateType.NON_TRANSACTIONAL) {
+ ms = NonTransactionalMap.build(c);
+
+ } else if (type == StateType.OPAQUE) {
+ ms = OpaqueMap.build(c);
+
+ } else if (type == StateType.TRANSACTIONAL) {
+ ms = TransactionalMap.build(c);
+
+ } else {
+ throw new RuntimeException("Unknown state type: " + type);
+ }
+
+ return new SnapshottableMap(ms, new Values(options.globalKey));
+ }
+ }
+
+ private JedisCluster jedisCluster;
+ private Options options;
+ private Serializer serializer;
+ private KeyFactory keyFactory;
+
+ public RedisClusterMapState(JedisCluster jedisCluster, Options options,
+ Serializer<T> serializer, KeyFactory keyFactory) {
+ this.jedisCluster = jedisCluster;
+ this.options = options;
+ this.serializer = serializer;
+ this.keyFactory = keyFactory;
+ }
+
+ public List<T> multiGet(List<List<Object>> keys) {
+ if (keys.size() == 0) {
+ return Collections.emptyList();
+ }
+ if (Strings.isNullOrEmpty(this.options.hkey)) {
+ String[] stringKeys = buildKeys(keys);
+ List<String> values = Lists.newArrayList();
+
+ for (String stringKey : stringKeys) {
+ String value = jedisCluster.get(stringKey);
+ values.add(value);
+ }
+
+ return deserializeValues(keys, values);
+ } else {
+ Map<String, String> keyValue = jedisCluster.hgetAll(this.options.hkey);
+ List<String> values = buildValuesFromMap(keys, keyValue);
+ return deserializeValues(keys, values);
+ }
+ }
+
+ private List<String> buildValuesFromMap(List<List<Object>> keys, Map<String, String> keyValue) {
+ List<String> values = new ArrayList<String>(keys.size());
+ for (List<Object> key : keys) {
+ String strKey = keyFactory.build(key);
+ String value = keyValue.get(strKey);
+ values.add(value);
+ }
+ return values;
+ }
+
+ private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
+ List<T> result = new ArrayList<T>(keys.size());
+ for (String value : values) {
+ if (value != null) {
+ result.add((T) serializer.deserialize(value.getBytes()));
+ } else {
+ result.add(null);
+ }
+ }
+ return result;
+ }
+
+ private String[] buildKeys(List<List<Object>> keys) {
+ String[] stringKeys = new String[keys.size()];
+ int index = 0;
+ for (List<Object> key : keys)
+ stringKeys[index++] = keyFactory.build(key);
+ return stringKeys;
+ }
+
+ public void multiPut(List<List<Object>> keys, List<T> vals) {
+ if (keys.size() == 0) {
+ return;
+ }
+
+ if (Strings.isNullOrEmpty(this.options.hkey)) {
+ for (int i = 0; i < keys.size(); i++) {
+ String val = new String(serializer.serialize(vals.get(i)));
+ String redisKey = keyFactory.build(keys.get(i));
+ jedisCluster.set(redisKey, val);
+ }
+ } else {
+ for (int i = 0; i < keys.size(); i++) {
+ String val = new String(serializer.serialize(vals.get(i)));
+ String redisKey = keyFactory.build(keys.get(i));
+ jedisCluster.hset(this.options.hkey, redisKey, val);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
new file mode 100644
index 0000000..8516bca
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class RedisClusterState implements State {
+ private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
+
+ @Override
+ public void beginCommit(Long aLong) {
+ }
+
+ @Override
+ public void commit(Long aLong) {
+ }
+
+ public static class Factory implements StateFactory {
+ // TODO : serialize redis.clients.jedis.JedisPoolConfig
+ public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+ private JedisClusterConfig jedisClusterConfig;
+
+ public Factory(JedisClusterConfig config) {
+ this.jedisClusterConfig = config;
+ }
+
+ public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+ JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
+ jedisClusterConfig.getTimeout(),
+ jedisClusterConfig.getMaxRedirections(),
+ DEFAULT_POOL_CONFIG);
+
+ return new RedisClusterState(jedisCluster);
+ }
+ }
+
+ private JedisCluster jedisCluster;
+
+ public RedisClusterState(JedisCluster jedisCluster) {
+ this.jedisCluster = jedisCluster;
+ }
+
+ /**
+ * The state updater and querier can get a JedisCluster instance
+ * */
+ public JedisCluster getJedisCluster() {
+ return this.jedisCluster;
+ }
+
+ /**
+ * The state updater and querier return the JedisCluster instance
+ * */
+ public void returnJedisCluster(JedisCluster jedisCluster) {
+ //do nothing
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
new file mode 100644
index 0000000..3dc31d2
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ * Created by judasheng on 14-12-12.
+ */
+public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, String> {
+ private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
+
+ private final String redisKeyPrefix;
+ private final TridentTupleMapper tupleMapper;
+
+ public RedisClusterStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+ this.redisKeyPrefix = redisKeyPrefix;
+ this.tupleMapper = tupleMapper;
+ }
+
+ @Override
+ public List<String> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
+ List<String> ret = Lists.newArrayList();
+
+ List<String> keys = Lists.newArrayList();
+
+ JedisCluster jedisCluster = null;
+ try {
+ jedisCluster = redisClusterState.getJedisCluster();
+
+
+ for (TridentTuple input : inputs) {
+ String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
+ key = redisKeyPrefix + key;
+ }
+ String value = jedisCluster.get(key);
+ ret.add(value);
+
+ logger.debug("redis get key[" + key + "] count[" + value + "]");
+ }
+ } finally {
+ if (jedisCluster != null) {
+ redisClusterState.returnJedisCluster(jedisCluster);
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, String s, TridentCollector collector) {
+ String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+ collector.emit(new Values(key, s));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
new file mode 100644
index 0000000..8215b7f
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
+ private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
+
+ private static final long DEFAULT_EXPIRE_INTERVAL_MS = 86400000;
+
+ private final String redisKeyPrefix;
+ private final TridentTupleMapper tupleMapper;
+ private final long expireIntervalMs;
+
+ public RedisClusterStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, long expireIntervalMs) {
+ this.redisKeyPrefix = redisKeyPrefix;
+ this.tupleMapper = tupleMapper;
+ if (expireIntervalMs > 0) {
+ this.expireIntervalMs = expireIntervalMs;
+ } else {
+ this.expireIntervalMs = DEFAULT_EXPIRE_INTERVAL_MS;
+ }
+ }
+
+ @Override
+ public void updateState(RedisClusterState redisClusterState, List<TridentTuple> inputs,
+ TridentCollector collector) {
+ long expireAt = System.currentTimeMillis() + expireIntervalMs;
+
+ JedisCluster jedisCluster = null;
+ try {
+ jedisCluster = redisClusterState.getJedisCluster();
+ for (TridentTuple input : inputs) {
+ String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ String redisKey = key;
+ if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
+ redisKey = redisKeyPrefix + redisKey;
+ }
+ String value = this.tupleMapper.getValueFromTridentTuple(input);
+
+ logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
+
+ jedisCluster.set(redisKey, value);
+ jedisCluster.expireAt(redisKey, expireAt);
+ }
+ } finally {
+ if (jedisCluster != null) {
+ redisClusterState.returnJedisCluster(jedisCluster);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 7aa9dc9..bb9fa9d 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -21,16 +21,13 @@ import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Values;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
import org.apache.storm.redis.util.config.JedisPoolConfig;
-import org.apache.storm.redis.util.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.util.container.JedisCommandsInstanceContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.Pipeline;
import storm.trident.state.JSONNonTransactionalSerializer;
import storm.trident.state.JSONOpaqueSerializer;
import storm.trident.state.JSONTransactionalSerializer;
@@ -108,29 +105,6 @@ public class RedisMapState<T> implements IBackingMap<T> {
}
/**
- * OpaqueTransactional for redis-cluster.
- * */
- public static StateFactory opaque(JedisClusterConfig jedisClusterConfig) {
- return opaque(jedisClusterConfig, new Options());
- }
-
- public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, String hkey) {
- Options opts = new Options();
- opts.hkey = hkey;
- return opaque(jedisClusterConfig, opts);
- }
-
- public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
- Options opts = new Options();
- opts.keyFactory = factory;
- return opaque(jedisClusterConfig, opts);
- }
-
- public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, Options<OpaqueValue> opts) {
- return new Factory(jedisClusterConfig, StateType.OPAQUE, opts);
- }
-
- /**
* Transactional for redis.
* */
public static StateFactory transactional(JedisPoolConfig jedisPoolConfig) {
@@ -154,29 +128,6 @@ public class RedisMapState<T> implements IBackingMap<T> {
}
/**
- * Transactional for redis-cluster.
- * */
- public static StateFactory transactional(JedisClusterConfig jedisClusterConfig) {
- return transactional(jedisClusterConfig, new Options());
- }
-
- public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, String hkey) {
- Options opts = new Options();
- opts.hkey = hkey;
- return transactional(jedisClusterConfig, opts);
- }
-
- public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
- Options opts = new Options();
- opts.keyFactory = factory;
- return transactional(jedisClusterConfig, opts);
- }
-
- public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, Options<TransactionalValue> opts) {
- return new Factory(jedisClusterConfig, StateType.TRANSACTIONAL, opts);
- }
-
- /**
* NonTransactional for redis.
* */
public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig) {
@@ -199,34 +150,11 @@ public class RedisMapState<T> implements IBackingMap<T> {
return new Factory(jedisPoolConfig, StateType.NON_TRANSACTIONAL, opts);
}
- /**
- * NonTransactional for redis-cluster.
- * */
- public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig) {
- return nonTransactional(jedisClusterConfig, new Options());
- }
-
- public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, String hkey) {
- Options opts = new Options();
- opts.hkey = hkey;
- return nonTransactional(jedisClusterConfig, opts);
- }
-
- public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
- Options opts = new Options();
- opts.keyFactory = factory;
- return nonTransactional(jedisClusterConfig, opts);
- }
-
- public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, Options<Object> opts) {
- return new Factory(jedisClusterConfig, StateType.NON_TRANSACTIONAL, opts);
- }
-
-
-
protected static class Factory implements StateFactory {
+ // TODO : serialize redis.clients.jedis.JedisPoolConfig
+ public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
JedisPoolConfig jedisPoolConfig;
- JedisClusterConfig jedisClusterConfig;
StateType type;
Serializer serializer;
@@ -251,34 +179,14 @@ public class RedisMapState<T> implements IBackingMap<T> {
}
}
- public Factory(JedisClusterConfig jedisClusterConfig, StateType type, Options options) {
- this.jedisClusterConfig = jedisClusterConfig;
- this.type = type;
- this.options = options;
-
- this.keyFactory = options.keyFactory;
- if (this.keyFactory == null) {
- this.keyFactory = new DefaultKeyFactory();
- }
- this.serializer = options.serializer;
- if (this.serializer == null) {
- this.serializer = DEFAULT_SERIALIZERS.get(type);
- if (this.serializer == null) {
- throw new RuntimeException("Couldn't find serializer for state type: " + type);
- }
- }
- }
-
public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- JedisCommandsInstanceContainer container;
- if (jedisPoolConfig != null) {
- container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
- } else if (jedisClusterConfig != null) {
- container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
- } else {
- throw new IllegalArgumentException("Jedis configuration not found");
- }
- RedisMapState state = new RedisMapState(container, options, serializer, keyFactory);
+ JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG,
+ jedisPoolConfig.getHost(),
+ jedisPoolConfig.getPort(),
+ jedisPoolConfig.getTimeout(),
+ jedisPoolConfig.getPassword(),
+ jedisPoolConfig.getDatabase());
+ RedisMapState state = new RedisMapState(jedisPool, options, serializer, keyFactory);
CachedMap c = new CachedMap(state, options.localCacheSize);
MapState ms;
@@ -299,14 +207,14 @@ public class RedisMapState<T> implements IBackingMap<T> {
}
}
- private transient JedisCommandsInstanceContainer container;
+ private JedisPool jedisPool;
private Options options;
private Serializer serializer;
private KeyFactory keyFactory;
- public RedisMapState(JedisCommandsInstanceContainer container, Options options,
+ public RedisMapState(JedisPool jedisPool, Options options,
Serializer<T> serializer, KeyFactory keyFactory) {
- this.container = container;
+ this.jedisPool = jedisPool;
this.options = options;
this.serializer = serializer;
this.keyFactory = keyFactory;
@@ -318,37 +226,26 @@ public class RedisMapState<T> implements IBackingMap<T> {
}
if (Strings.isNullOrEmpty(this.options.hkey)) {
String[] stringKeys = buildKeys(keys);
- List<String> values = Lists.newArrayList();
-
- JedisCommands jedisCommands = null;
+ Jedis jedis = null;
try {
- jedisCommands = container.getInstance();
- if (jedisCommands instanceof Jedis) {
- //Todo because jedisCommands not support mget, we use Jedis for mget if it is Jedis
- values = ((Jedis)jedisCommands).mget(stringKeys);
- } else {
- for (String stringKey : stringKeys) {
- String value = jedisCommands.get(stringKey);
- values.add(value);
- }
- }
+ jedis = jedisPool.getResource();
+ List<String> values = jedis.mget(stringKeys);
+ return deserializeValues(keys, values);
} finally {
- if (jedisCommands != null) {
- container.returnInstance(jedisCommands);
+ if (jedis != null) {
+ jedisPool.returnResource(jedis);
}
}
-
- return deserializeValues(keys, values);
} else {
- JedisCommands jedisCommands = null;
+ Jedis jedis = null;
try {
- jedisCommands = container.getInstance();
- Map<String, String> keyValue = jedisCommands.hgetAll(this.options.hkey);
+ jedis = jedisPool.getResource();
+ Map<String, String> keyValue = jedis.hgetAll(this.options.hkey);
List<String> values = buildValuesFromMap(keys, keyValue);
return deserializeValues(keys, values);
} finally {
- if (jedisCommands != null) {
- container.returnInstance(jedisCommands);
+ if (jedis != null) {
+ jedisPool.returnResource(jedis);
}
}
}
@@ -390,38 +287,31 @@ public class RedisMapState<T> implements IBackingMap<T> {
}
if (Strings.isNullOrEmpty(this.options.hkey)) {
- JedisCommands jedisCommands = null;
+ Jedis jedis = null;
try {
- jedisCommands = container.getInstance();
- if (jedisCommands instanceof Jedis) {
- //Todo because jedisCommands not support mget, we use Jedis for mget if it is Jedis
- String[] keyValue = buildKeyValuesList(keys, vals);
- ((Jedis)jedisCommands).mset(keyValue);
- } else {
- for (int i = 0; i < keys.size(); i++) {
- String val = new String(serializer.serialize(vals.get(i)));
- String redisKey = keyFactory.build(keys.get(i));
- jedisCommands.set(redisKey, val);
- }
- }
+ jedis = jedisPool.getResource();
+ String[] keyValue = buildKeyValuesList(keys, vals);
+ jedis.mset(keyValue);
} finally {
- if (jedisCommands != null) {
- container.returnInstance(jedisCommands);
+ if (jedis != null) {
+ jedisPool.returnResource(jedis);
}
}
} else {
- JedisCommands jedisCommands = null;
+ Jedis jedis = jedisPool.getResource();
try {
- jedisCommands = container.getInstance();
+ Pipeline pl = jedis.pipelined();
+ pl.multi();
+
for (int i = 0; i < keys.size(); i++) {
String val = new String(serializer.serialize(vals.get(i)));
- String redisKey = keyFactory.build(keys.get(i));
- jedisCommands.hset(this.options.hkey, redisKey, val);
+ pl.hset(this.options.hkey, keyFactory.build(keys.get(i)), val);
}
+
+ pl.exec();
+ pl.sync();
} finally {
- if (jedisCommands != null) {
- container.returnInstance(jedisCommands);
- }
+ jedisPool.returnResource(jedis);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
index 47d2a28..cc5933f 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
@@ -18,13 +18,11 @@
package org.apache.storm.redis.trident.state;
import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
import org.apache.storm.redis.util.config.JedisPoolConfig;
-import org.apache.storm.redis.util.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.util.container.JedisCommandsInstanceContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
@@ -42,49 +40,45 @@ public class RedisState implements State {
}
public static class Factory implements StateFactory {
+ // TODO : serialize redis.clients.jedis.JedisPoolConfig
+ public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
private JedisPoolConfig jedisPoolConfig;
- private JedisClusterConfig jedisClusterConfig;
public Factory(JedisPoolConfig config) {
this.jedisPoolConfig = config;
}
- public Factory(JedisClusterConfig config) {
- this.jedisClusterConfig = config;
- }
-
public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- JedisCommandsInstanceContainer container;
- if (jedisPoolConfig != null) {
- container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
- } else if (jedisClusterConfig != null) {
- container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
- } else {
- throw new IllegalArgumentException("Jedis configuration not found");
- }
+ JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG,
+ jedisPoolConfig.getHost(),
+ jedisPoolConfig.getPort(),
+ jedisPoolConfig.getTimeout(),
+ jedisPoolConfig.getPassword(),
+ jedisPoolConfig.getDatabase());
- return new RedisState(container);
+ return new RedisState(jedisPool);
}
}
- private transient JedisCommandsInstanceContainer container;
+ private JedisPool jedisPool;
- public RedisState(JedisCommandsInstanceContainer container) {
- this.container = container;
+ public RedisState(JedisPool jedisPool) {
+ this.jedisPool = jedisPool;
}
/**
- * The state updater and querier can get a JedisCommands instance
+ * The state updater and querier can get a Jedis instance
* */
- public JedisCommands getInstance() {
- return this.container.getInstance();
+ public Jedis getJedis() {
+ return this.jedisPool.getResource();
}
/**
- * The state updater and querier return the JedisCommands instance
+ * The state updater and querier return the Jedis instance
* */
- public void returnInstance(JedisCommands instance) {
- this.container.returnInstance(instance);
+ public void returnJedis(Jedis jedis) {
+ this.jedisPool.returnResource(jedis);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index 1f6a090..051088e 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -22,16 +22,13 @@ import com.google.common.collect.Lists;
import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseQueryFunction;
import storm.trident.tuple.TridentTuple;
import java.util.List;
-/**
- * Created by judasheng on 14-12-12.
- */
public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
@@ -45,28 +42,24 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
@Override
public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
- List<String> ret = Lists.newArrayList();
+ List<String> keys = Lists.newArrayList();
+ for (TridentTuple input : inputs) {
+ String key = this.tupleMapper.getKeyFromTridentTuple(input);
+ if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
+ key = redisKeyPrefix + key;
+ }
+ keys.add(key);
+ }
- JedisCommands jedisCommands = null;
+ Jedis jedis = null;
try {
- jedisCommands = redisState.getInstance();
- for (TridentTuple input : inputs) {
- String key = this.tupleMapper.getKeyFromTridentTuple(input);
- if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
- key = redisKeyPrefix + key;
- }
- String value = jedisCommands.get(key);
- ret.add(value);
-
- logger.debug("redis get key[" + key + "] count[" + value + "]");
- }
+ jedis = redisState.getJedis();
+ return jedis.mget(keys.toArray(new String[keys.size()]));
} finally {
- if (jedisCommands != null) {
- redisState.returnInstance(jedisCommands);
+ if (jedis != null) {
+ redisState.returnJedis(jedis);
}
}
-
- return ret;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
index 42420bc..5b04d59 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
@@ -21,7 +21,7 @@ import backtype.storm.tuple.Values;
import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseQueryFunction;
import storm.trident.tuple.TridentTuple;
@@ -44,22 +44,22 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
public List<Long> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
List<Long> ret = new ArrayList<Long>();
- JedisCommands jedisCommands = null;
+ Jedis jedis = null;
try {
- jedisCommands = redisState.getInstance();
+ jedis = redisState.getJedis();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTridentTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
key = redisKeyPrefix + key;
}
- long count = jedisCommands.scard(key);
+ long count = jedis.scard(key);
ret.add(count);
logger.debug("redis get key[" + key + "] count[" + count + "]");
}
} finally {
- if (jedisCommands != null) {
- redisState.returnInstance(jedisCommands);
+ if (jedis != null) {
+ redisState.returnJedis(jedis);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
index f24caf5..2134d99 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
@@ -21,7 +21,7 @@ import backtype.storm.tuple.Values;
import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.tuple.TridentTuple;
@@ -52,9 +52,9 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
TridentCollector collector) {
long expireAt = System.currentTimeMillis() + expireIntervalMs;
- JedisCommands jedisCommands = null;
+ Jedis jedis = null;
try {
- jedisCommands = redisState.getInstance();
+ jedis = redisState.getJedis();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTridentTuple(input);
String redisKey = key;
@@ -65,15 +65,15 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
- jedisCommands.sadd(redisKey, value);
- jedisCommands.expireAt(redisKey, expireAt);
- Long count = jedisCommands.scard(redisKey);
+ jedis.sadd(redisKey, value);
+ jedis.expireAt(redisKey, expireAt);
+ Long count = jedis.scard(redisKey);
collector.emit(new Values(key, count));
}
} finally {
- if (jedisCommands != null) {
- redisState.returnInstance(jedisCommands);
+ if (jedis != null) {
+ redisState.returnJedis(jedis);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index a832995..6fdede5 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -20,7 +20,7 @@ package org.apache.storm.redis.trident.state;
import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.tuple.TridentTuple;
@@ -51,9 +51,9 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
TridentCollector collector) {
long expireAt = System.currentTimeMillis() + expireIntervalMs;
- JedisCommands jedisCommands = null;
+ Jedis jedis = null;
try {
- jedisCommands = redisState.getInstance();
+ jedis = redisState.getJedis();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTridentTuple(input);
String redisKey = key;
@@ -64,12 +64,12 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
- jedisCommands.set(redisKey, value);
- jedisCommands.expireAt(redisKey, expireAt);
+ jedis.set(redisKey, value);
+ jedis.expireAt(redisKey, expireAt);
}
} finally {
- if (jedisCommands != null) {
- redisState.returnInstance(jedisCommands);
+ if (jedis != null) {
+ redisState.returnJedis(jedis);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index c4d55dc..31434dc 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -24,11 +24,10 @@ import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
-import org.apache.storm.redis.trident.state.RedisState;
-import org.apache.storm.redis.trident.state.RedisStateQuerier;
-import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.trident.state.RedisClusterState;
+import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
import org.apache.storm.redis.util.config.JedisClusterConfig;
-import redis.clients.jedis.HostAndPort;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
@@ -57,19 +56,19 @@ public class WordCountTridentRedisCluster {
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
TridentTupleMapper tupleMapper = new WordCountTupleMapper();
- RedisState.Factory factory = new RedisState.Factory(clusterConfig);
+ RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
stream.partitionPersist(factory,
fields,
- new RedisStateUpdater("test_", tupleMapper, 86400000),
+ new RedisClusterStateUpdater("test_", tupleMapper, 86400000),
new Fields());
TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
- new RedisStateQuerier("test_", tupleMapper),
+ new RedisClusterStateQuerier("test_", tupleMapper),
new Fields("columnName","columnValue"));
stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
return topology.build();
http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
new file mode 100644
index 0000000..e9ae54d
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.trident.state.RedisClusterMapState;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.util.config.JedisClusterConfig;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.builtin.MapGet;
+import storm.trident.operation.builtin.Sum;
+import storm.trident.state.StateFactory;
+import storm.trident.testing.FixedBatchSpout;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WordCountTridentRedisClusterMap {
+ public static StormTopology buildTopology(String redisHostPort){
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", 1),
+ new Values("trident", 1),
+ new Values("needs", 1),
+ new Values("javadoc", 1)
+ );
+ spout.setCycle(true);
+
+ Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
+ for (String hostPort : redisHostPort.split(",")) {
+ String[] host_port = hostPort.split(":");
+ nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1])));
+ }
+ JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
+ .build();
+ TridentTupleMapper tupleMapper = new WordCountTupleMapper();
+ StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentState state = stream.groupBy(new Fields("word"))
+ .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+ stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+ .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 2) {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380");
+ System.exit(1);
+ }
+
+ Integer flag = Integer.valueOf(args[0]);
+ String redisHostPort = args[1];
+
+ Config conf = new Config();
+ conf.setMaxSpoutPending(5);
+ if (flag == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
+ Thread.sleep(60 * 1000);
+ cluster.killTopology("test_wordCounter_for_redis");
+ cluster.shutdown();
+ System.exit(0);
+ } else if(flag == 1) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
+ } else {
+ System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
+ }
+ }
+
+}