You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/02/23 17:26:24 UTC

[11/23] storm git commit: change storm-redis trident state implements

change storm-redis trident state implements


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f9438fc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f9438fc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f9438fc

Branch: refs/heads/master
Commit: 0f9438fc5491fb3764849b31536352e69deae8ea
Parents: ff36ac8
Author: dashengju <da...@qq.com>
Authored: Sun Jan 4 16:31:40 2015 +0800
Committer: dashengju <da...@qq.com>
Committed: Sun Jan 4 16:31:40 2015 +0800

----------------------------------------------------------------------
 .../trident/state/RedisClusterMapState.java     | 293 +++++++++++++++++++
 .../redis/trident/state/RedisClusterState.java  |  81 +++++
 .../trident/state/RedisClusterStateQuerier.java |  81 +++++
 .../trident/state/RedisClusterStateUpdater.java |  76 +++++
 .../redis/trident/state/RedisMapState.java      | 192 +++---------
 .../storm/redis/trident/state/RedisState.java   |  48 ++-
 .../redis/trident/state/RedisStateQuerier.java  |  35 +--
 .../state/RedisStateSetCountQuerier.java        |  12 +-
 .../trident/state/RedisStateSetUpdater.java     |  16 +-
 .../redis/trident/state/RedisStateUpdater.java  |  14 +-
 .../trident/WordCountTridentRedisCluster.java   |  13 +-
 .../WordCountTridentRedisClusterMap.java        | 101 +++++++
 12 files changed, 735 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
new file mode 100644
index 0000000..6324126
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import backtype.storm.tuple.Values;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.state.JSONNonTransactionalSerializer;
+import storm.trident.state.JSONOpaqueSerializer;
+import storm.trident.state.JSONTransactionalSerializer;
+import storm.trident.state.OpaqueValue;
+import storm.trident.state.Serializer;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+import storm.trident.state.StateType;
+import storm.trident.state.TransactionalValue;
+import storm.trident.state.map.CachedMap;
+import storm.trident.state.map.IBackingMap;
+import storm.trident.state.map.MapState;
+import storm.trident.state.map.NonTransactionalMap;
+import storm.trident.state.map.OpaqueMap;
+import storm.trident.state.map.SnapshottableMap;
+import storm.trident.state.map.TransactionalMap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Trident {@link IBackingMap} that keeps its entries in a Redis Cluster through a {@link JedisCluster}.
+ *
+ * <p>When {@link Options#hkey} is null or empty every entry is stored as a plain Redis string under the
+ * key produced by the {@link KeyFactory}. Otherwise every entry is stored as a field of the single Redis
+ * hash named by {@code hkey}.
+ *
+ * <p>Values are turned into bytes by the configured {@link Serializer} and exchanged with Redis as
+ * UTF-8 strings.
+ *
+ * @param <T> type of the values handled by the serializer
+ */
+public class RedisClusterMapState<T> implements IBackingMap<T> {
+    private static final Logger logger = LoggerFactory.getLogger(RedisClusterMapState.class);
+
+    /**
+     * Charset used for every byte[] <-> String conversion. The platform default charset differs between
+     * JVMs, so relying on it could corrupt non-ASCII values when they are written and read back.
+     */
+    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");
+
+    /** Serializer used for each state type when {@link Options#serializer} is not set. */
+    private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
+            StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
+            StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
+            StateType.OPAQUE, new JSONOpaqueSerializer()
+    ));
+
+    /**
+     * Key factory used when none is configured: accepts only single-element keys and casts that
+     * element to {@code String}.
+     */
+    public static class DefaultKeyFactory implements KeyFactory {
+        /**
+         * @param key Trident key; must contain exactly one element
+         * @return the single element of {@code key}, cast to {@code String}
+         * @throws RuntimeException if {@code key} does not contain exactly one element
+         */
+        @Override
+        public String build(List<Object> key) {
+            if (key.size() != 1) {
+                throw new RuntimeException("Default KeyFactory does not support compound keys");
+            }
+            return (String) key.get(0);
+        }
+    }
+
+    /**
+     * Tunables for the map state.
+     *
+     * @param <T> value type of the optional custom serializer
+     */
+    public static class Options<T> implements Serializable {
+        /** Size passed to the {@link CachedMap} wrapping the backing map. */
+        public int localCacheSize = 1000;
+        /** Key used for the global entry of the {@link SnapshottableMap}. */
+        public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
+        /** Custom key factory; {@link DefaultKeyFactory} is used when null. */
+        KeyFactory keyFactory = null;
+        /** Custom serializer; the default for the state type is used when null. */
+        public Serializer<T> serializer = null;
+        /** Name of the Redis hash holding all entries; plain string keys are used when null or empty. */
+        public String hkey = null;
+    }
+
+    /** Converts a Trident key into the Redis key (or hash field) it is stored under. */
+    public static interface KeyFactory extends Serializable {
+        String build(List<Object> key);
+    }
+
+    /**
+     * OpaqueTransactional for redis-cluster.
+     * */
+    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig) {
+        return opaque(jedisClusterConfig, new Options<OpaqueValue>());
+    }
+
+    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, String hkey) {
+        Options<OpaqueValue> opts = new Options<OpaqueValue>();
+        opts.hkey = hkey;
+        return opaque(jedisClusterConfig, opts);
+    }
+
+    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
+        Options<OpaqueValue> opts = new Options<OpaqueValue>();
+        opts.keyFactory = factory;
+        return opaque(jedisClusterConfig, opts);
+    }
+
+    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, Options<OpaqueValue> opts) {
+        return new Factory(jedisClusterConfig, StateType.OPAQUE, opts);
+    }
+
+    /**
+     * Transactional for redis-cluster.
+     * */
+    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig) {
+        return transactional(jedisClusterConfig, new Options<TransactionalValue>());
+    }
+
+    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+        Options<TransactionalValue> opts = new Options<TransactionalValue>();
+        opts.hkey = hkey;
+        return transactional(jedisClusterConfig, opts);
+    }
+
+    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
+        Options<TransactionalValue> opts = new Options<TransactionalValue>();
+        opts.keyFactory = factory;
+        return transactional(jedisClusterConfig, opts);
+    }
+
+    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, Options<TransactionalValue> opts) {
+        return new Factory(jedisClusterConfig, StateType.TRANSACTIONAL, opts);
+    }
+
+    /**
+     * NonTransactional for redis-cluster.
+     * */
+    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig) {
+        return nonTransactional(jedisClusterConfig, new Options<Object>());
+    }
+
+    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, String hkey) {
+        Options<Object> opts = new Options<Object>();
+        opts.hkey = hkey;
+        return nonTransactional(jedisClusterConfig, opts);
+    }
+
+    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
+        Options<Object> opts = new Options<Object>();
+        opts.keyFactory = factory;
+        return nonTransactional(jedisClusterConfig, opts);
+    }
+
+    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, Options<Object> opts) {
+        return new Factory(jedisClusterConfig, StateType.NON_TRANSACTIONAL, opts);
+    }
+
+
+
+    /**
+     * {@link StateFactory} that opens a {@link JedisCluster} connection and wraps this backing map in
+     * the Trident map implementation matching the requested {@link StateType}.
+     */
+    protected static class Factory implements StateFactory {
+        // TODO : serialize redis.clients.jedis.JedisPoolConfig
+        public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+        JedisClusterConfig jedisClusterConfig;
+
+        StateType type;
+        Serializer serializer;
+        KeyFactory keyFactory;
+        Options options;
+
+        /**
+         * @param jedisClusterConfig cluster connection settings
+         * @param type               Trident state type to build
+         * @param options            tunables; a null key factory or serializer falls back to the defaults
+         * @throws RuntimeException if no serializer is configured and none is registered for {@code type}
+         */
+        public Factory(JedisClusterConfig jedisClusterConfig, StateType type, Options options) {
+            this.jedisClusterConfig = jedisClusterConfig;
+            this.type = type;
+            this.options = options;
+
+            this.keyFactory = options.keyFactory;
+            if (this.keyFactory == null) {
+                this.keyFactory = new DefaultKeyFactory();
+            }
+            this.serializer = options.serializer;
+            if (this.serializer == null) {
+                this.serializer = DEFAULT_SERIALIZERS.get(type);
+                if (this.serializer == null) {
+                    throw new RuntimeException("Couldn't find serializer for state type: " + type);
+                }
+            }
+        }
+
+        /**
+         * Creates a new cluster connection and the cached, snapshottable map state on top of it.
+         *
+         * @throws RuntimeException if the configured state type is not one of the three known types
+         */
+        @Override
+        public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+            JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
+                                                            jedisClusterConfig.getTimeout(),
+                                                            jedisClusterConfig.getMaxRedirections(),
+                                                            DEFAULT_POOL_CONFIG);
+
+            RedisClusterMapState state = new RedisClusterMapState(jedisCluster, options, serializer, keyFactory);
+            CachedMap c = new CachedMap(state, options.localCacheSize);
+
+            MapState ms;
+            if (type == StateType.NON_TRANSACTIONAL) {
+                ms = NonTransactionalMap.build(c);
+
+            } else if (type == StateType.OPAQUE) {
+                ms = OpaqueMap.build(c);
+
+            } else if (type == StateType.TRANSACTIONAL) {
+                ms = TransactionalMap.build(c);
+
+            } else {
+                throw new RuntimeException("Unknown state type: " + type);
+            }
+
+            return new SnapshottableMap(ms, new Values(options.globalKey));
+        }
+    }
+
+    private final JedisCluster jedisCluster;
+    private final Options options;
+    private final Serializer<T> serializer;
+    private final KeyFactory keyFactory;
+
+    /**
+     * @param jedisCluster cluster client used for every read and write
+     * @param options      tunables; only {@code hkey} is read by this class itself
+     * @param serializer   converts values to and from bytes
+     * @param keyFactory   converts Trident keys to Redis keys / hash fields
+     */
+    public RedisClusterMapState(JedisCluster jedisCluster, Options options,
+                                Serializer<T> serializer, KeyFactory keyFactory) {
+        this.jedisCluster = jedisCluster;
+        this.options = options;
+        this.serializer = serializer;
+        this.keyFactory = keyFactory;
+    }
+
+    /**
+     * Reads the values stored for {@code keys}.
+     *
+     * @param keys Trident keys to look up
+     * @return one entry per key, in the same order; {@code null} where Redis holds no value
+     */
+    @Override
+    public List<T> multiGet(List<List<Object>> keys) {
+        if (keys.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        String[] stringKeys = buildKeys(keys);
+        List<String> values;
+        if (Strings.isNullOrEmpty(this.options.hkey)) {
+            // One GET per key: the keys are looked up individually on the cluster client.
+            values = new ArrayList<String>(stringKeys.length);
+            for (String stringKey : stringKeys) {
+                values.add(jedisCluster.get(stringKey));
+            }
+        } else {
+            // Fetch only the requested fields instead of downloading the whole hash with HGETALL.
+            values = jedisCluster.hmget(this.options.hkey, stringKeys);
+        }
+        return deserializeValues(values);
+    }
+
+    /** Deserializes every non-null Redis string; null entries stay null. */
+    private List<T> deserializeValues(List<String> values) {
+        List<T> result = new ArrayList<T>(values.size());
+        for (String value : values) {
+            if (value != null) {
+                result.add(serializer.deserialize(value.getBytes(UTF_8)));
+            } else {
+                result.add(null);
+            }
+        }
+        return result;
+    }
+
+    /** Maps every Trident key to its Redis key (or hash field), preserving order. */
+    private String[] buildKeys(List<List<Object>> keys) {
+        String[] stringKeys = new String[keys.size()];
+        int index = 0;
+        for (List<Object> key : keys) {
+            stringKeys[index++] = keyFactory.build(key);
+        }
+        return stringKeys;
+    }
+
+    /**
+     * Writes {@code vals.get(i)} under {@code keys.get(i)} for every index of {@code keys}.
+     *
+     * @param keys Trident keys to write
+     * @param vals values to store, positionally matching {@code keys}
+     */
+    @Override
+    public void multiPut(List<List<Object>> keys, List<T> vals) {
+        if (keys.isEmpty()) {
+            return;
+        }
+
+        boolean plainKeys = Strings.isNullOrEmpty(this.options.hkey);
+        for (int i = 0; i < keys.size(); i++) {
+            String val = new String(serializer.serialize(vals.get(i)), UTF_8);
+            String redisKey = keyFactory.build(keys.get(i));
+            if (plainKeys) {
+                jedisCluster.set(redisKey, val);
+            } else {
+                jedisCluster.hset(this.options.hkey, redisKey, val);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
new file mode 100644
index 0000000..8516bca
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import org.apache.storm.redis.util.config.JedisClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+/**
+ * Trident {@link State} that simply holds a {@link JedisCluster} for state updaters and queriers.
+ * Both commit hooks are no-ops.
+ */
+public class RedisClusterState implements State {
+    private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
+
+    private JedisCluster jedisCluster;
+
+    /**
+     * @param jedisCluster cluster client handed out by {@link #getJedisCluster()}
+     */
+    public RedisClusterState(JedisCluster jedisCluster) {
+        this.jedisCluster = jedisCluster;
+    }
+
+    /** Does nothing. */
+    @Override
+    public void beginCommit(Long aLong) {
+    }
+
+    /** Does nothing. */
+    @Override
+    public void commit(Long aLong) {
+    }
+
+    /**
+     * Gives the state updater and querier access to the cluster client.
+     *
+     * @return the client this state was constructed with
+     * */
+    public JedisCluster getJedisCluster() {
+        return this.jedisCluster;
+    }
+
+    /**
+     * Counterpart of {@link #getJedisCluster()} for the state updater and querier;
+     * currently a no-op.
+     * */
+    public void returnJedisCluster(JedisCluster jedisCluster) {
+        //do nothing
+    }
+
+    /**
+     * {@link StateFactory} that opens a new {@link JedisCluster} from the supplied configuration
+     * each time a state is made.
+     */
+    public static class Factory implements StateFactory {
+        // TODO : serialize redis.clients.jedis.JedisPoolConfig
+        public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
+        private JedisClusterConfig jedisClusterConfig;
+
+        /**
+         * @param config cluster connection settings (nodes, timeout, max redirections)
+         */
+        public Factory(JedisClusterConfig config) {
+            this.jedisClusterConfig = config;
+        }
+
+        /** Builds the cluster client with {@link #DEFAULT_POOL_CONFIG} and wraps it in a state. */
+        public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+            JedisCluster cluster = new JedisCluster(
+                    jedisClusterConfig.getNodes(),
+                    jedisClusterConfig.getTimeout(),
+                    jedisClusterConfig.getMaxRedirections(),
+                    DEFAULT_POOL_CONFIG);
+            return new RedisClusterState(cluster);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
new file mode 100644
index 0000000..3dc31d2
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ * Trident query function that reads one Redis string value per input tuple from a
+ * {@link RedisClusterState} and emits it together with the tuple's key.
+ *
+ * Created by judasheng on 14-12-12.
+ */
+public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, String> {
+    // Log under this class's own name (previously logged as RedisClusterState).
+    private static final Logger logger = LoggerFactory.getLogger(RedisClusterStateQuerier.class);
+
+    private final String redisKeyPrefix;
+    private final TridentTupleMapper tupleMapper;
+
+    /**
+     * @param redisKeyPrefix prefix prepended to every key; ignored when null or empty
+     * @param tupleMapper    extracts the lookup key from each tuple
+     */
+    public RedisClusterStateQuerier(String redisKeyPrefix, TridentTupleMapper tupleMapper) {
+        this.redisKeyPrefix = redisKeyPrefix;
+        this.tupleMapper = tupleMapper;
+    }
+
+    /**
+     * Looks up the value of every input tuple's (optionally prefixed) key.
+     *
+     * @param redisClusterState state providing the cluster client
+     * @param inputs            tuples to look up
+     * @return one value per input, in input order; whatever {@code JedisCluster.get} returned for the key
+     */
+    @Override
+    public List<String> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
+        List<String> ret = Lists.newArrayList();
+
+        JedisCluster jedisCluster = null;
+        try {
+            jedisCluster = redisClusterState.getJedisCluster();
+
+            for (TridentTuple input : inputs) {
+                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
+                    key = redisKeyPrefix + key;
+                }
+                String value = jedisCluster.get(key);
+                ret.add(value);
+
+                // Parameterized so the message is only built when debug logging is enabled.
+                logger.debug("redis get key[{}] count[{}]", key, value);
+            }
+        } finally {
+            if (jedisCluster != null) {
+                redisClusterState.returnJedisCluster(jedisCluster);
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Emits the tuple's unprefixed key together with the value retrieved for it.
+     *
+     * @param tuple     input tuple
+     * @param s         value returned by {@link #batchRetrieve} for this tuple
+     * @param collector receives the {@code (key, value)} pair
+     */
+    @Override
+    public void execute(TridentTuple tuple, String s, TridentCollector collector) {
+        String key = this.tupleMapper.getKeyFromTridentTuple(tuple);
+        collector.emit(new Values(key, s));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
new file mode 100644
index 0000000..8215b7f
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
+ * Trident state updater that writes each input tuple's value to a {@link RedisClusterState} as a
+ * Redis string and sets an absolute expiry on the key.
+ */
+public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
+    // Log under this class's own name (previously logged as RedisClusterState).
+    private static final Logger logger = LoggerFactory.getLogger(RedisClusterStateUpdater.class);
+
+    private static final long DEFAULT_EXPIRE_INTERVAL_MS = 86400000;
+    private static final long MS_PER_SECOND = 1000L;
+
+    private final String redisKeyPrefix;
+    private final TridentTupleMapper tupleMapper;
+    private final long expireIntervalMs;
+
+    /**
+     * @param redisKeyPrefix   prefix prepended to every key; ignored when null or empty
+     * @param tupleMapper      extracts key and value from each tuple
+     * @param expireIntervalMs time-to-live in milliseconds; values {@code <= 0} select the default
+     *                         of 86400000 ms
+     */
+    public RedisClusterStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, long expireIntervalMs) {
+        this.redisKeyPrefix = redisKeyPrefix;
+        this.tupleMapper = tupleMapper;
+        if (expireIntervalMs > 0) {
+            this.expireIntervalMs = expireIntervalMs;
+        } else {
+            this.expireIntervalMs = DEFAULT_EXPIRE_INTERVAL_MS;
+        }
+    }
+
+    /**
+     * Stores every input tuple's value under its (optionally prefixed) key and schedules the key
+     * to expire {@code expireIntervalMs} after the start of this call.
+     *
+     * @param redisClusterState state providing the cluster client
+     * @param inputs            tuples to persist
+     * @param collector         unused
+     */
+    @Override
+    public void updateState(RedisClusterState redisClusterState, List<TridentTuple> inputs,
+                            TridentCollector collector) {
+        // EXPIREAT expects a Unix timestamp in seconds. Passing milliseconds (as before) put the
+        // expiry tens of thousands of years ahead. Round up so a sub-second interval never
+        // produces a timestamp that is already in the past.
+        long expireAtMs = System.currentTimeMillis() + expireIntervalMs;
+        long expireAtSeconds = (expireAtMs + MS_PER_SECOND - 1) / MS_PER_SECOND;
+
+        JedisCluster jedisCluster = null;
+        try {
+            jedisCluster = redisClusterState.getJedisCluster();
+            for (TridentTuple input : inputs) {
+                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String redisKey = key;
+                if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
+                    redisKey = redisKeyPrefix + redisKey;
+                }
+                String value = this.tupleMapper.getValueFromTridentTuple(input);
+
+                // Parameterized so the message is only built when debug logging is enabled.
+                logger.debug("update key[{}] redisKey[{}] value[{}]", new Object[]{key, redisKey, value});
+
+                jedisCluster.set(redisKey, value);
+                jedisCluster.expireAt(redisKey, expireAtSeconds);
+            }
+        } finally {
+            if (jedisCluster != null) {
+                redisClusterState.returnJedisCluster(jedisCluster);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 7aa9dc9..bb9fa9d 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -21,16 +21,13 @@ import backtype.storm.task.IMetricsContext;
 import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
 import org.apache.storm.redis.util.config.JedisPoolConfig;
-import org.apache.storm.redis.util.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.util.container.JedisCommandsInstanceContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.Pipeline;
 import storm.trident.state.JSONNonTransactionalSerializer;
 import storm.trident.state.JSONOpaqueSerializer;
 import storm.trident.state.JSONTransactionalSerializer;
@@ -108,29 +105,6 @@ public class RedisMapState<T> implements IBackingMap<T> {
     }
 
     /**
-     * OpaqueTransactional for redis-cluster.
-     * */
-    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig) {
-        return opaque(jedisClusterConfig, new Options());
-    }
-
-    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, String hkey) {
-        Options opts = new Options();
-        opts.hkey = hkey;
-        return opaque(jedisClusterConfig, opts);
-    }
-
-    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
-        Options opts = new Options();
-        opts.keyFactory = factory;
-        return opaque(jedisClusterConfig, opts);
-    }
-
-    public static StateFactory opaque(JedisClusterConfig jedisClusterConfig, Options<OpaqueValue> opts) {
-        return new Factory(jedisClusterConfig, StateType.OPAQUE, opts);
-    }
-
-    /**
      * Transactional for redis.
      * */
     public static StateFactory transactional(JedisPoolConfig jedisPoolConfig) {
@@ -154,29 +128,6 @@ public class RedisMapState<T> implements IBackingMap<T> {
     }
 
     /**
-     * Transactional for redis-cluster.
-     * */
-    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig) {
-        return transactional(jedisClusterConfig, new Options());
-    }
-
-    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, String hkey) {
-        Options opts = new Options();
-        opts.hkey = hkey;
-        return transactional(jedisClusterConfig, opts);
-    }
-
-    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
-        Options opts = new Options();
-        opts.keyFactory = factory;
-        return transactional(jedisClusterConfig, opts);
-    }
-
-    public static StateFactory transactional(JedisClusterConfig jedisClusterConfig, Options<TransactionalValue> opts) {
-        return new Factory(jedisClusterConfig, StateType.TRANSACTIONAL, opts);
-    }
-
-    /**
      * NonTransactional for redis.
      * */
     public static StateFactory nonTransactional(JedisPoolConfig jedisPoolConfig) {
@@ -199,34 +150,11 @@ public class RedisMapState<T> implements IBackingMap<T> {
         return new Factory(jedisPoolConfig, StateType.NON_TRANSACTIONAL, opts);
     }
 
-    /**
-     * NonTransactional for redis-cluster.
-     * */
-    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig) {
-        return nonTransactional(jedisClusterConfig, new Options());
-    }
-
-    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, String hkey) {
-        Options opts = new Options();
-        opts.hkey = hkey;
-        return nonTransactional(jedisClusterConfig, opts);
-    }
-
-    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, KeyFactory factory) {
-        Options opts = new Options();
-        opts.keyFactory = factory;
-        return nonTransactional(jedisClusterConfig, opts);
-    }
-
-    public static StateFactory nonTransactional(JedisClusterConfig jedisClusterConfig, Options<Object> opts) {
-        return new Factory(jedisClusterConfig, StateType.NON_TRANSACTIONAL, opts);
-    }
-
-
-
     protected static class Factory implements StateFactory {
+        // TODO : serialize redis.clients.jedis.JedisPoolConfig
+        public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
         JedisPoolConfig jedisPoolConfig;
-        JedisClusterConfig jedisClusterConfig;
 
         StateType type;
         Serializer serializer;
@@ -251,34 +179,14 @@ public class RedisMapState<T> implements IBackingMap<T> {
             }
         }
 
-        public Factory(JedisClusterConfig jedisClusterConfig, StateType type, Options options) {
-            this.jedisClusterConfig = jedisClusterConfig;
-            this.type = type;
-            this.options = options;
-
-            this.keyFactory = options.keyFactory;
-            if (this.keyFactory == null) {
-                this.keyFactory = new DefaultKeyFactory();
-            }
-            this.serializer = options.serializer;
-            if (this.serializer == null) {
-                this.serializer = DEFAULT_SERIALIZERS.get(type);
-                if (this.serializer == null) {
-                    throw new RuntimeException("Couldn't find serializer for state type: " + type);
-                }
-            }
-        }
-
         public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-            JedisCommandsInstanceContainer container;
-            if (jedisPoolConfig != null) {
-                container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
-            } else if (jedisClusterConfig != null) {
-                container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
-            } else {
-                throw new IllegalArgumentException("Jedis configuration not found");
-            }
-            RedisMapState state = new RedisMapState(container, options, serializer, keyFactory);
+            JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG,
+                                                    jedisPoolConfig.getHost(),
+                                                    jedisPoolConfig.getPort(),
+                                                    jedisPoolConfig.getTimeout(),
+                                                    jedisPoolConfig.getPassword(),
+                                                    jedisPoolConfig.getDatabase());
+            RedisMapState state = new RedisMapState(jedisPool, options, serializer, keyFactory);
             CachedMap c = new CachedMap(state, options.localCacheSize);
 
             MapState ms;
@@ -299,14 +207,14 @@ public class RedisMapState<T> implements IBackingMap<T> {
         }
     }
 
-    private transient JedisCommandsInstanceContainer container;
+    private JedisPool jedisPool;
     private Options options;
     private Serializer serializer;
     private KeyFactory keyFactory;
 
-    public RedisMapState(JedisCommandsInstanceContainer container, Options options,
+    public RedisMapState(JedisPool jedisPool, Options options,
                                             Serializer<T> serializer, KeyFactory keyFactory) {
-        this.container = container;
+        this.jedisPool = jedisPool;
         this.options = options;
         this.serializer = serializer;
         this.keyFactory = keyFactory;
@@ -318,37 +226,26 @@ public class RedisMapState<T> implements IBackingMap<T> {
         }
         if (Strings.isNullOrEmpty(this.options.hkey)) {
             String[] stringKeys = buildKeys(keys);
-            List<String> values = Lists.newArrayList();
-
-            JedisCommands jedisCommands = null;
+            Jedis jedis = null;
             try {
-                jedisCommands = container.getInstance();
-                if (jedisCommands instanceof Jedis) {
-                    //Todo because jedisCommands not support mget, we use Jedis for mget if it is Jedis
-                    values = ((Jedis)jedisCommands).mget(stringKeys);
-                } else {
-                    for (String stringKey : stringKeys) {
-                        String value = jedisCommands.get(stringKey);
-                        values.add(value);
-                    }
-                }
+                jedis = jedisPool.getResource();
+                List<String> values = jedis.mget(stringKeys);
+                return deserializeValues(keys, values);
             } finally {
-                if (jedisCommands != null) {
-                    container.returnInstance(jedisCommands);
+                if (jedis != null) {
+                    jedisPool.returnResource(jedis);
                 }
             }
-
-            return deserializeValues(keys, values);
         } else {
-            JedisCommands jedisCommands = null;
+            Jedis jedis = null;
             try {
-                jedisCommands = container.getInstance();
-                Map<String, String> keyValue = jedisCommands.hgetAll(this.options.hkey);
+                jedis = jedisPool.getResource();
+                Map<String, String> keyValue = jedis.hgetAll(this.options.hkey);
                 List<String> values = buildValuesFromMap(keys, keyValue);
                 return deserializeValues(keys, values);
             } finally {
-                if (jedisCommands != null) {
-                    container.returnInstance(jedisCommands);
+                if (jedis != null) {
+                    jedisPool.returnResource(jedis);
                 }
             }
         }
@@ -390,38 +287,31 @@ public class RedisMapState<T> implements IBackingMap<T> {
         }
 
         if (Strings.isNullOrEmpty(this.options.hkey)) {
-            JedisCommands jedisCommands = null;
+            Jedis jedis = null;
             try {
-                jedisCommands = container.getInstance();
-                if (jedisCommands instanceof Jedis) {
-                    //Todo because jedisCommands not support mget, we use Jedis for mget if it is Jedis
-                    String[] keyValue = buildKeyValuesList(keys, vals);
-                    ((Jedis)jedisCommands).mset(keyValue);
-                } else {
-                    for (int i = 0; i < keys.size(); i++) {
-                        String val = new String(serializer.serialize(vals.get(i)));
-                        String redisKey = keyFactory.build(keys.get(i));
-                        jedisCommands.set(redisKey, val);
-                    }
-                }
+                jedis = jedisPool.getResource();
+                String[] keyValue = buildKeyValuesList(keys, vals);
+                jedis.mset(keyValue);
             } finally {
-                if (jedisCommands != null) {
-                    container.returnInstance(jedisCommands);
+                if (jedis != null) {
+                    jedisPool.returnResource(jedis);
                 }
             }
         } else {
-            JedisCommands jedisCommands = null;
+            Jedis jedis = jedisPool.getResource();
             try {
-                jedisCommands = container.getInstance();
+                Pipeline pl = jedis.pipelined();
+                pl.multi();
+
                 for (int i = 0; i < keys.size(); i++) {
                     String val = new String(serializer.serialize(vals.get(i)));
-                    String redisKey = keyFactory.build(keys.get(i));
-                    jedisCommands.hset(this.options.hkey, redisKey, val);
+                    pl.hset(this.options.hkey, keyFactory.build(keys.get(i)), val);
                 }
+
+                pl.exec();
+                pl.sync();
             } finally {
-                if (jedisCommands != null) {
-                    container.returnInstance(jedisCommands);
-                }
+                jedisPool.returnResource(jedis);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
index 47d2a28..cc5933f 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
@@ -18,13 +18,11 @@
 package org.apache.storm.redis.trident.state;
 
 import backtype.storm.task.IMetricsContext;
-import org.apache.storm.redis.util.config.JedisClusterConfig;
 import org.apache.storm.redis.util.config.JedisPoolConfig;
-import org.apache.storm.redis.util.container.JedisCommandsContainerBuilder;
-import org.apache.storm.redis.util.container.JedisCommandsInstanceContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
 import storm.trident.state.State;
 import storm.trident.state.StateFactory;
 
@@ -42,49 +40,45 @@ public class RedisState implements State {
     }
 
     public static class Factory implements StateFactory {
+        // TODO : serialize redis.clients.jedis.JedisPoolConfig
+        public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
+
         private JedisPoolConfig jedisPoolConfig;
-        private JedisClusterConfig jedisClusterConfig;
 
         public Factory(JedisPoolConfig config) {
             this.jedisPoolConfig = config;
         }
 
-        public Factory(JedisClusterConfig config) {
-            this.jedisClusterConfig = config;
-        }
-
         public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
-            JedisCommandsInstanceContainer container;
-            if (jedisPoolConfig != null) {
-                container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
-            } else if (jedisClusterConfig != null) {
-                container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
-            } else {
-                throw new IllegalArgumentException("Jedis configuration not found");
-            }
+            JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG,
+                                                jedisPoolConfig.getHost(),
+                                                jedisPoolConfig.getPort(),
+                                                jedisPoolConfig.getTimeout(),
+                                                jedisPoolConfig.getPassword(),
+                                                jedisPoolConfig.getDatabase());
 
-            return new RedisState(container);
+            return new RedisState(jedisPool);
         }
     }
 
-    private transient JedisCommandsInstanceContainer container;
+    private JedisPool jedisPool;
 
-    public RedisState(JedisCommandsInstanceContainer container) {
-        this.container = container;
+    public RedisState(JedisPool jedisPool) {
+        this.jedisPool = jedisPool;
     }
 
     /**
-     * The state updater and querier can get a JedisCommands instance
+     * The state updater and querier can get a Jedis instance
      * */
-    public JedisCommands getInstance() {
-        return this.container.getInstance();
+    public Jedis getJedis() {
+        return this.jedisPool.getResource();
     }
 
     /**
-     * The state updater and querier return the JedisCommands instance
+     * The state updater and querier return the Jedis instance
      * */
-    public void returnInstance(JedisCommands instance) {
-        this.container.returnInstance(instance);
+    public void returnJedis(Jedis jedis) {
+        this.jedisPool.returnResource(jedis);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index 1f6a090..051088e 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -22,16 +22,13 @@ import com.google.common.collect.Lists;
 import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
 import storm.trident.operation.TridentCollector;
 import storm.trident.state.BaseQueryFunction;
 import storm.trident.tuple.TridentTuple;
 
 import java.util.List;
 
-/**
- * Created by judasheng on 14-12-12.
- */
 public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
     private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 
@@ -45,28 +42,24 @@ public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
 
     @Override
     public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
-        List<String> ret = Lists.newArrayList();
+        List<String> keys = Lists.newArrayList();
+        for (TridentTuple input : inputs) {
+            String key = this.tupleMapper.getKeyFromTridentTuple(input);
+            if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
+                key = redisKeyPrefix + key;
+            }
+            keys.add(key);
+        }
 
-        JedisCommands jedisCommands = null;
+        Jedis jedis = null;
         try {
-            jedisCommands = redisState.getInstance();
-            for (TridentTuple input : inputs) {
-                String key = this.tupleMapper.getKeyFromTridentTuple(input);
-                if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
-                    key = redisKeyPrefix + key;
-                }
-                String value = jedisCommands.get(key);
-                ret.add(value);
-
-                logger.debug("redis get key[" + key + "] count[" + value + "]");
-            }
+            jedis = redisState.getJedis();
+            return jedis.mget(keys.toArray(new String[keys.size()]));
         } finally {
-            if (jedisCommands != null) {
-                redisState.returnInstance(jedisCommands);
+            if (jedis != null) {
+                redisState.returnJedis(jedis);
             }
         }
-
-        return ret;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
index 42420bc..5b04d59 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetCountQuerier.java
@@ -21,7 +21,7 @@ import backtype.storm.tuple.Values;
 import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
 import storm.trident.operation.TridentCollector;
 import storm.trident.state.BaseQueryFunction;
 import storm.trident.tuple.TridentTuple;
@@ -44,22 +44,22 @@ public class RedisStateSetCountQuerier extends BaseQueryFunction<RedisState, Lon
     public List<Long> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
         List<Long> ret = new ArrayList<Long>();
 
-        JedisCommands jedisCommands = null;
+        Jedis jedis = null;
         try {
-            jedisCommands = redisState.getInstance();
+            jedis = redisState.getJedis();
             for (TridentTuple input : inputs) {
                 String key = this.tupleMapper.getKeyFromTridentTuple(input);
                 if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
                     key = redisKeyPrefix + key;
                 }
-                long count = jedisCommands.scard(key);
+                long count = jedis.scard(key);
                 ret.add(count);
 
                 logger.debug("redis get key[" + key + "] count[" + count + "]");
             }
         } finally {
-            if (jedisCommands != null) {
-                redisState.returnInstance(jedisCommands);
+            if (jedis != null) {
+                redisState.returnJedis(jedis);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
index f24caf5..2134d99 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateSetUpdater.java
@@ -21,7 +21,7 @@ import backtype.storm.tuple.Values;
 import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
 import storm.trident.operation.TridentCollector;
 import storm.trident.state.BaseStateUpdater;
 import storm.trident.tuple.TridentTuple;
@@ -52,9 +52,9 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
                             TridentCollector collector) {
         long expireAt = System.currentTimeMillis() + expireIntervalMs;
 
-        JedisCommands jedisCommands = null;
+        Jedis jedis = null;
         try {
-            jedisCommands = redisState.getInstance();
+            jedis = redisState.getJedis();
             for (TridentTuple input : inputs) {
                 String key = this.tupleMapper.getKeyFromTridentTuple(input);
                 String redisKey = key;
@@ -65,15 +65,15 @@ public class RedisStateSetUpdater extends BaseStateUpdater<RedisState> {
 
                 logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
 
-                jedisCommands.sadd(redisKey, value);
-                jedisCommands.expireAt(redisKey, expireAt);
-                Long count = jedisCommands.scard(redisKey);
+                jedis.sadd(redisKey, value);
+                jedis.expireAt(redisKey, expireAt);
+                Long count = jedis.scard(redisKey);
 
                 collector.emit(new Values(key, count));
             }
         } finally {
-            if (jedisCommands != null) {
-                redisState.returnInstance(jedisCommands);
+            if (jedis != null) {
+                redisState.returnJedis(jedis);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index a832995..6fdede5 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -20,7 +20,7 @@ package org.apache.storm.redis.trident.state;
 import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
+import redis.clients.jedis.Jedis;
 import storm.trident.operation.TridentCollector;
 import storm.trident.state.BaseStateUpdater;
 import storm.trident.tuple.TridentTuple;
@@ -51,9 +51,9 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
                             TridentCollector collector) {
         long expireAt = System.currentTimeMillis() + expireIntervalMs;
 
-        JedisCommands jedisCommands = null;
+        Jedis jedis = null;
         try {
-            jedisCommands = redisState.getInstance();
+            jedis = redisState.getJedis();
             for (TridentTuple input : inputs) {
                 String key = this.tupleMapper.getKeyFromTridentTuple(input);
                 String redisKey = key;
@@ -64,12 +64,12 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
 
                 logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
 
-                jedisCommands.set(redisKey, value);
-                jedisCommands.expireAt(redisKey, expireAt);
+                jedis.set(redisKey, value);
+                jedis.expireAt(redisKey, expireAt);
             }
         } finally {
-            if (jedisCommands != null) {
-                redisState.returnInstance(jedisCommands);
+            if (jedis != null) {
+                redisState.returnJedis(jedis);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index c4d55dc..31434dc 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -24,11 +24,10 @@ import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
-import org.apache.storm.redis.trident.state.RedisState;
-import org.apache.storm.redis.trident.state.RedisStateQuerier;
-import org.apache.storm.redis.trident.state.RedisStateUpdater;
+import org.apache.storm.redis.trident.state.RedisClusterState;
+import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
 import org.apache.storm.redis.util.config.JedisClusterConfig;
-import redis.clients.jedis.HostAndPort;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
@@ -57,19 +56,19 @@ public class WordCountTridentRedisCluster {
         JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
                                         .build();
         TridentTupleMapper tupleMapper = new WordCountTupleMapper();
-        RedisState.Factory factory = new RedisState.Factory(clusterConfig);
+        RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);
 
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
         stream.partitionPersist(factory,
                                 fields,
-                                new RedisStateUpdater("test_", tupleMapper, 86400000),
+                                new RedisClusterStateUpdater("test_", tupleMapper, 86400000),
                                 new Fields());
 
         TridentState state = topology.newStaticState(factory);
         stream = stream.stateQuery(state, new Fields("word"),
-                                new RedisStateQuerier("test_", tupleMapper),
+                                new RedisClusterStateQuerier("test_", tupleMapper),
                                 new Fields("columnName","columnValue"));
         stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
         return topology.build();

http://git-wip-us.apache.org/repos/asf/storm/blob/0f9438fc/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
new file mode 100644
index 0000000..e9ae54d
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.apache.storm.redis.trident.state.RedisClusterMapState;
+import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
+import org.apache.storm.redis.trident.state.RedisStateQuerier;
+import org.apache.storm.redis.util.config.JedisClusterConfig;
+import storm.trident.Stream;
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.builtin.MapGet;
+import storm.trident.operation.builtin.Sum;
+import storm.trident.state.StateFactory;
+import storm.trident.testing.FixedBatchSpout;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WordCountTridentRedisClusterMap {
+    // One usage text for every argument error: args are the run mode and a comma-separated host:port list.
+    private static final String USAGE =
+            "Usage: WordCountTrident 0(storm-local)|1(storm-cluster) 127.0.0.1:6379,127.0.0.1:6380";
+
+    /** Builds the word-count topology; redisHostPort is a comma-separated host:port list of Redis nodes. */
+    public static StormTopology buildTopology(String redisHostPort){
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", 1),
+                new Values("trident", 1),
+                new Values("needs", 1),
+                new Values("javadoc", 1)
+        );
+        spout.setCycle(true);
+
+        Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
+        for (String hostPort : redisHostPort.split(",")) {
+            String[] hostAndPort = hostPort.split(":");
+            nodes.add(new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
+        }
+        JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes).build();
+        StateFactory factory = RedisClusterMapState.transactional(clusterConfig);
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentState state = stream.groupBy(new Fields("word"))
+                .persistentAggregate(factory, new Fields("count"), new Sum(), new Fields("sum"));
+
+        stream.stateQuery(state, new Fields("word"), new MapGet(), new Fields("sum"))
+                .each(new Fields("word", "sum"), new PrintFunction(), new Fields());
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+
+        int flag = Integer.parseInt(args[0]);
+        String redisHostPort = args[1];
+        Config conf = new Config();
+        conf.setMaxSpoutPending(5);
+        if (flag == 0) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
+            Thread.sleep(60 * 1000);
+            cluster.killTopology("test_wordCounter_for_redis");
+            cluster.shutdown();
+            System.exit(0);
+        } else if(flag == 1) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology("test_wordCounter_for_redis", conf, buildTopology(redisHostPort));
+        } else {
+            System.out.println(USAGE);
+            System.exit(1);
+        }
+    }
+}