You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/06/02 17:00:35 UTC

[07/12] storm git commit: Merge branch 'master' into STORM-753

Merge branch 'master' into STORM-753

Conflicts:
	external/storm-redis/README.md
	external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
	external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
	external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
	external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
	external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/037a9754
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/037a9754
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/037a9754

Branch: refs/heads/master
Commit: 037a9754fdee063d2238a9c41e6ffdb79759d33d
Parents: 98704ec 0c2b3a4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat May 30 22:34:19 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat May 30 22:34:19 2015 +0900

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .travis.yml                                     |   9 +
 CHANGELOG.md                                    |  48 +-
 LICENSE                                         |  29 +
 README.markdown                                 |   7 +-
 SECURITY.md                                     |  50 ++
 STORM-UI-REST-API.md                            |  44 +-
 bin/storm                                       |   2 +-
 bin/storm.cmd                                   |   4 +-
 bin/storm.py                                    |  24 +-
 conf/defaults.yaml                              |   3 +-
 dev-tools/test-ns.py                            |  19 +-
 .../print-errors-from-clojure-test-reports.py   |  58 ++
 dev-tools/travis/travis-build.sh                |  50 ++
 docs/documentation/Metrics.md                   |   2 +-
 docs/documentation/Multilang-protocol.md        |  63 +-
 .../documentation/Setting-up-a-Storm-cluster.md |   7 +-
 docs/documentation/Trident-API-Overview.md      |   2 +-
 examples/storm-starter/README.markdown          |   8 +-
 .../storm-starter/multilang/resources/storm.js  | 373 ---------
 .../storm-starter/multilang/resources/storm.py  | 260 -------
 .../storm-starter/multilang/resources/storm.rb  | 236 ------
 examples/storm-starter/pom.xml                  |  67 +-
 external/storm-eventhubs/README.md              |  41 +
 external/storm-eventhubs/pom.xml                | 122 +++
 .../storm/eventhubs/bolt/EventHubBolt.java      |  81 ++
 .../client/ConnectionStringBuilder.java         | 116 +++
 .../storm/eventhubs/client/Constants.java       |  32 +
 .../storm/eventhubs/client/EventHubClient.java  |  92 +++
 .../eventhubs/client/EventHubConsumerGroup.java |  72 ++
 .../eventhubs/client/EventHubException.java     |  37 +
 .../eventhubs/client/EventHubReceiver.java      | 139 ++++
 .../eventhubs/client/EventHubSendClient.java    |  70 ++
 .../storm/eventhubs/client/EventHubSender.java  |  95 +++
 .../storm/eventhubs/client/SelectorFilter.java  |  38 +
 .../eventhubs/client/SelectorFilterWriter.java  |  64 ++
 .../eventhubs/samples/AtMostOnceEventCount.java |  54 ++
 .../storm/eventhubs/samples/EventCount.java     | 155 ++++
 .../storm/eventhubs/samples/EventHubLoop.java   |  51 ++
 .../samples/OpaqueTridentEventCount.java        |  53 ++
 .../samples/TransactionalTridentEventCount.java |  81 ++
 .../eventhubs/samples/bolt/GlobalCountBolt.java |  83 ++
 .../samples/bolt/PartialCountBolt.java          |  63 ++
 .../apache/storm/eventhubs/spout/EventData.java |  48 ++
 .../storm/eventhubs/spout/EventDataScheme.java  |  55 ++
 .../eventhubs/spout/EventHubReceiverFilter.java |  56 ++
 .../eventhubs/spout/EventHubReceiverImpl.java   | 150 ++++
 .../storm/eventhubs/spout/EventHubSpout.java    | 258 +++++++
 .../eventhubs/spout/EventHubSpoutConfig.java    | 165 ++++
 .../eventhubs/spout/EventHubSpoutException.java |  37 +
 .../storm/eventhubs/spout/FieldConstants.java   |  25 +
 .../storm/eventhubs/spout/IEventDataScheme.java |  30 +
 .../eventhubs/spout/IEventHubReceiver.java      |  35 +
 .../spout/IEventHubReceiverFactory.java         |  30 +
 .../spout/IEventHubReceiverFilter.java          |  35 +
 .../eventhubs/spout/IPartitionCoordinator.java  |  27 +
 .../eventhubs/spout/IPartitionManager.java      |  37 +
 .../spout/IPartitionManagerFactory.java         |  33 +
 .../storm/eventhubs/spout/IStateStore.java      |  31 +
 .../apache/storm/eventhubs/spout/MessageId.java |  56 ++
 .../storm/eventhubs/spout/PartitionManager.java | 101 +++
 .../eventhubs/spout/SimplePartitionManager.java | 136 ++++
 .../spout/StaticPartitionCoordinator.java       |  85 +++
 .../eventhubs/spout/ZookeeperStateStore.java    |  95 +++
 .../storm/eventhubs/trident/Coordinator.java    |  60 ++
 .../trident/ITridentPartitionManager.java       |  35 +
 .../ITridentPartitionManagerFactory.java        |  26 +
 .../trident/OpaqueTridentEventHubEmitter.java   |  69 ++
 .../trident/OpaqueTridentEventHubSpout.java     |  64 ++
 .../storm/eventhubs/trident/Partition.java      |  39 +
 .../storm/eventhubs/trident/Partitions.java     |  41 +
 .../TransactionalTridentEventHubEmitter.java    | 167 ++++
 .../TransactionalTridentEventHubSpout.java      |  66 ++
 .../trident/TridentPartitionManager.java        |  91 +++
 .../src/main/resources/config.properties        |  27 +
 .../eventhubs/spout/EventHubReceiverMock.java   | 105 +++
 .../spout/EventHubSpoutCallerMock.java          |  96 +++
 .../spout/PartitionManagerCallerMock.java       | 105 +++
 .../spout/SpoutOutputCollectorMock.java         |  61 ++
 .../storm/eventhubs/spout/StateStoreMock.java   |  54 ++
 .../storm/eventhubs/spout/TestEventData.java    |  47 ++
 .../eventhubs/spout/TestEventHubSpout.java      |  70 ++
 .../eventhubs/spout/TestPartitionManager.java   | 117 +++
 .../TestTransactionalTridentEmitter.java        |  93 +++
 .../eventhubs/trident/TridentCollectorMock.java |  52 ++
 .../mapper/SimpleTridentHBaseMapMapper.java     |  50 ++
 .../trident/mapper/TridentHBaseMapMapper.java   |  40 +
 .../hbase/trident/state/HBaseMapState.java      |  45 +-
 .../hdfs/common/security/HdfsSecurityUtil.java  |   5 +-
 external/storm-hive/pom.xml                     |  21 +
 external/storm-jdbc/README.md                   |  72 +-
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  17 +-
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |   5 +-
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |   5 +-
 .../org/apache/storm/jdbc/common/Column.java    |   7 +-
 .../storm/jdbc/common/ConnectionProvider.java   |  26 +
 .../jdbc/common/HikariCPConnectionProvider.java |  46 ++
 .../apache/storm/jdbc/common/JdbcClient.java    |  19 +-
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |   6 +-
 .../storm/jdbc/trident/state/JdbcState.java     |  13 +-
 .../storm/jdbc/common/JdbcClientTest.java       |   5 +-
 .../jdbc/topology/AbstractUserTopology.java     |  17 +-
 .../jdbc/topology/UserPersistanceTopology.java  |  18 +-
 .../UserPersistanceTridentTopology.java         |   2 +-
 external/storm-kafka/README.md                  |  52 +-
 .../jvm/storm/kafka/DynamicBrokersReader.java   |  26 +
 .../src/jvm/storm/kafka/KafkaConfig.java        |   2 +-
 .../src/jvm/storm/kafka/KafkaUtils.java         |   5 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |   4 +-
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |   1 +
 .../kafka/trident/TridentKafkaEmitter.java      |   4 +-
 .../storm/kafka/DynamicBrokersReaderTest.java   |  13 +
 .../src/test/storm/kafka/KafkaUtilsTest.java    |   6 +-
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |  27 +
 external/storm-redis/README.md                  |   4 +-
 external/storm-redis/pom.xml                    |   2 +-
 .../redis/common/container/JedisContainer.java  |   7 +-
 .../state/AbstractRedisStateUpdater.java        |   6 +-
 .../trident/state/RedisClusterStateUpdater.java |   9 +-
 .../redis/trident/state/RedisStateUpdater.java  |   9 +-
 .../redis/trident/WordCountTridentRedis.java    |   2 +-
 .../trident/WordCountTridentRedisCluster.java   |   2 +-
 pom.xml                                         |   8 +-
 storm-core/pom.xml                              |  40 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |   5 +-
 storm-core/src/clj/backtype/storm/converter.clj |  10 +-
 .../src/clj/backtype/storm/daemon/common.clj    |  15 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |  21 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   9 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |   1 +
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   9 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  58 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  35 +-
 .../src/clj/backtype/storm/local_state.clj      |  99 +++
 .../src/clj/backtype/storm/messaging/loader.clj |  13 +-
 storm-core/src/clj/backtype/storm/testing.clj   |  25 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  42 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |  51 +-
 storm-core/src/clj/backtype/storm/util.clj      |  37 +-
 storm-core/src/dev/resources/storm.js           | 373 ---------
 storm-core/src/dev/resources/storm.py           | 260 -------
 storm-core/src/dev/resources/storm.rb           | 236 ------
 storm-core/src/jvm/backtype/storm/Config.java   | 104 ++-
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |   8 +-
 .../storm/generated/ClusterWorkerHeartbeat.java | 102 ++-
 .../storm/generated/LSApprovedWorkers.java      | 458 +++++++++++
 .../generated/LSSupervisorAssignments.java      | 471 ++++++++++++
 .../storm/generated/LSSupervisorId.java         | 406 ++++++++++
 .../storm/generated/LSWorkerHeartbeat.java      | 755 +++++++++++++++++++
 .../storm/generated/LocalAssignment.java        | 561 ++++++++++++++
 .../storm/generated/LocalStateData.java         | 471 ++++++++++++
 .../jvm/backtype/storm/generated/Nimbus.java    |  12 +-
 .../storm/generated/SupervisorInfo.java         | 116 ++-
 .../storm/generated/SupervisorSummary.java      | 117 ++-
 .../storm/generated/ThriftSerializedObject.java | 516 +++++++++++++
 .../storm/generated/TopologySummary.java        |   2 +-
 .../backtype/storm/messaging/netty/Client.java  |   4 +-
 .../security/auth/SaslTransportPlugin.java      |  17 +-
 .../GzipBridgeThriftSerializationDelegate.java  |  64 ++
 .../GzipThriftSerializationDelegate.java        |  57 ++
 .../jvm/backtype/storm/spout/ShellSpout.java    |   6 +
 .../src/jvm/backtype/storm/task/IBolt.java      |   4 +-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |   5 +-
 .../backtype/storm/task/TopologyContext.java    | 104 ++-
 .../storm/utils/ExtendedThreadPoolExecutor.java |  67 ++
 .../jvm/backtype/storm/utils/LocalState.java    | 163 +++-
 .../backtype/storm/utils/TransferDrainer.java   |  62 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  58 +-
 storm-core/src/multilang/js/storm.js            | 366 ---------
 storm-core/src/multilang/py/storm.py            | 260 -------
 storm-core/src/multilang/rb/storm.rb            | 236 ------
 storm-core/src/py/__init__.py                   |   2 +
 storm-core/src/py/storm/DistributedRPC.py       |   2 +
 .../src/py/storm/DistributedRPCInvocations.py   |   2 +
 storm-core/src/py/storm/Nimbus.py               |  10 +
 storm-core/src/py/storm/__init__.py             |   2 +
 storm-core/src/py/storm/constants.py            |   2 +
 storm-core/src/py/storm/ttypes.py               | 645 +++++++++++++++-
 storm-core/src/storm.thrift                     |  43 ++
 .../src/ui/public/css/jsonFormatter.min.css     |   1 +
 storm-core/src/ui/public/css/style.css          |  15 +-
 storm-core/src/ui/public/index.html             |   3 +
 .../src/ui/public/js/jsonFormatter.min.js       |   2 +
 storm-core/src/ui/public/js/script.js           |   5 +-
 .../public/templates/anti-forgery-template.html |  19 -
 .../templates/component-page-template.html      |   8 +-
 .../public/templates/index-page-template.html   |   6 +
 .../templates/topology-page-template.html       |   4 +-
 storm-core/src/ui/public/topology.html          |  11 +-
 .../test/clj/backtype/storm/cluster_test.clj    |   4 +-
 .../clj/backtype/storm/local_state_test.clj     |  40 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   2 +-
 .../clj/backtype/storm/transactional_test.clj   |   6 +-
 ...ipBridgeThriftSerializationDelegateTest.java |  71 ++
 .../storm/utils/DisruptorQueueTest.java         |  38 +-
 storm-core/test/resources/logback-test.xml      |  26 +
 storm-dist/binary/src/main/assembly/binary.xml  |  41 +-
 storm-multilang/javascript/pom.xml              |  32 +
 .../src/main/resources/resources/storm.js       | 373 +++++++++
 storm-multilang/python/pom.xml                  |  32 +
 .../src/main/resources/resources/storm.py       | 260 +++++++
 storm-multilang/ruby/pom.xml                    |  32 +
 .../ruby/src/main/resources/resources/storm.rb  | 236 ++++++
 203 files changed, 12236 insertions(+), 3173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/README.md
----------------------------------------------------------------------
diff --cc external/storm-redis/README.md
index aa7a874,f165c09..86cd937
--- a/external/storm-redis/README.md
+++ b/external/storm-redis/README.md
@@@ -193,13 -192,8 +193,13 @@@ RedisStat
  
          stream.partitionPersist(factory,
                                  fields,
-                                 new RedisStateUpdater(storeMapper, 86400000),
+                                 new RedisStateUpdater("test_", tupleMapper).withExpire(86400000),
                                  new Fields());
 +
 +        TridentState state = topology.newStaticState(factory);
 +        stream = stream.stateQuery(state, new Fields("word"),
 +                                new RedisStateQuerier(lookupMapper),
 +                                new Fields("columnName","columnValue"));
  ```
  
  RedisClusterState
@@@ -220,13 -213,8 +220,13 @@@
  
          stream.partitionPersist(factory,
                                  fields,
-                                 new RedisClusterStateUpdater(storeMapper, 86400000),
+                                 new RedisClusterStateUpdater("test_", tupleMapper).withExpire(86400000),
                                  new Fields());
 +
 +        TridentState state = topology.newStaticState(factory);
 +        stream = stream.stateQuery(state, new Fields("word"),
 +                                new RedisClusterStateQuerier(lookupMapper),
 +                                new Fields("columnName","columnValue"));
  ```
  
  ## License

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
index 2f95341,0000000..87bb8fa
mode 100644,000000..100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
@@@ -1,67 -1,0 +1,69 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.redis.trident.state;
 +
 +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 +import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 +import storm.trident.operation.TridentCollector;
 +import storm.trident.state.BaseStateUpdater;
 +import storm.trident.state.State;
 +import storm.trident.tuple.TridentTuple;
 +
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
 +public abstract class AbstractRedisStateUpdater<T extends State> extends BaseStateUpdater<T> {
 +	private final RedisStoreMapper storeMapper;
 +
- 	protected final int expireIntervalSec;
++	protected int expireIntervalSec = 0;
 +	protected final RedisDataTypeDescription.RedisDataType dataType;
 +	protected final String additionalKey;
 +
- 	public AbstractRedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
++	public AbstractRedisStateUpdater(RedisStoreMapper storeMapper) {
 +		this.storeMapper = storeMapper;
 +		RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
 +		this.dataType = dataTypeDescription.getDataType();
 +		this.additionalKey = dataTypeDescription.getAdditionalKey();
++	}
 +
++	public void setExpireInterval(int expireIntervalSec) {
 +		if (expireIntervalSec > 0) {
 +			this.expireIntervalSec = expireIntervalSec;
 +		} else {
 +			this.expireIntervalSec = 0;
 +		}
 +	}
 +
 +	@Override
 +	public void updateState(T state, List<TridentTuple> inputs,
 +			TridentCollector collector) {
 +		Map<String, String> keyToValue = new HashMap<String, String>();
 +
 +		for (TridentTuple input : inputs) {
 +			String key = storeMapper.getKeyFromTuple(input);
 +			String value = storeMapper.getValueFromTuple(input);
 +
 +			keyToValue.put(key, value);
 +		}
 +
 +		updateStatesToRedis(state, keyToValue);
 +	}
 +
 +	protected abstract void updateStatesToRedis(T state, Map<String, String> keyToValue);
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 924b6b9,e00cfb6..17c5bfc
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@@ -17,15 -17,36 +17,20 @@@
   */
  package org.apache.storm.redis.trident.state;
  
 -import org.apache.storm.redis.common.mapper.TupleMapper;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 +import org.apache.storm.redis.common.mapper.RedisStoreMapper;
  import redis.clients.jedis.JedisCluster;
 -import storm.trident.operation.TridentCollector;
 -import storm.trident.state.BaseStateUpdater;
 -import storm.trident.tuple.TridentTuple;
  
 -import java.util.List;
 +import java.util.Map;
  
 -public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
 -    private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
 -
 -    private final String redisKeyPrefix;
 -    private final TupleMapper tupleMapper;
 -    private int expireIntervalSec = 0;
 -
 -    public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
 -        this.redisKeyPrefix = redisKeyPrefix;
 -        this.tupleMapper = tupleMapper;
 +public class RedisClusterStateUpdater extends AbstractRedisStateUpdater<RedisClusterState> {
-     public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
-         super(storeMapper, expireIntervalSec);
++    public RedisClusterStateUpdater(RedisStoreMapper storeMapper) {
++        super(storeMapper);
+     }
+ 
+     public RedisClusterStateUpdater withExpire(int expireIntervalSec) {
 -        if (expireIntervalSec > 0) {
 -            this.expireIntervalSec = expireIntervalSec;
 -        } else {
 -            this.expireIntervalSec = 0;
 -        }
 -
++        setExpireInterval(expireIntervalSec);
+         return this;
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 583fa32,2939d3d..babcb1d
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@@ -17,16 -17,36 +17,21 @@@
   */
  package org.apache.storm.redis.trident.state;
  
 -import org.apache.storm.redis.common.mapper.TupleMapper;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 +import org.apache.storm.redis.common.mapper.RedisStoreMapper;
  import redis.clients.jedis.Jedis;
 -import storm.trident.operation.TridentCollector;
 -import storm.trident.state.BaseStateUpdater;
 -import storm.trident.tuple.TridentTuple;
 +import redis.clients.jedis.Pipeline;
  
 -import java.util.List;
 +import java.util.Map;
  
 -public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
 -    private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
 -
 -    private final String redisKeyPrefix;
 -    private final TupleMapper tupleMapper;
 -    private int expireIntervalSec = 0;
 -
 -    public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper) {
 -        this.redisKeyPrefix = redisKeyPrefix;
 -        this.tupleMapper = tupleMapper;
 +public class RedisStateUpdater extends AbstractRedisStateUpdater<RedisState> {
-     public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
-         super(storeMapper, expireIntervalSec);
++    public RedisStateUpdater(RedisStoreMapper storeMapper) {
++        super(storeMapper);
+     }
+ 
+     public RedisStateUpdater withExpire(int expireIntervalSec) {
 -        if (expireIntervalSec > 0) {
 -            this.expireIntervalSec = expireIntervalSec;
 -        } else {
 -            this.expireIntervalSec = 0;
 -        }
 -
++        setExpireInterval(expireIntervalSec);
+         return this;
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index 79bab5a,eb13399..4a4aae0
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@@ -58,7 -55,7 +58,7 @@@ public class WordCountTridentRedis 
  
          stream.partitionPersist(factory,
                                  fields,
-                                 new RedisStateUpdater(storeMapper, 86400000),
 -                                new RedisStateUpdater("test_", tupleMapper).withExpire(86400000),
++                                new RedisStateUpdater(storeMapper).withExpire(86400000),
                                  new Fields());
  
          TridentState state = topology.newStaticState(factory);

http://git-wip-us.apache.org/repos/asf/storm/blob/037a9754/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index 280b273,8562e77..765b339
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@@ -66,7 -63,7 +66,7 @@@ public class WordCountTridentRedisClust
  
          stream.partitionPersist(factory,
                                  fields,
-                                 new RedisClusterStateUpdater(storeMapper, 86400000),
 -                                new RedisClusterStateUpdater("test_", tupleMapper).withExpire(86400000),
++                                new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
                                  new Fields());
  
          TridentState state = topology.newStaticState(factory);