You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/06/02 17:00:30 UTC
[02/12] storm git commit: Redis*StateQuerier / Redis*StateUpdater now
support HASH type
Redis*StateQuerier / Redis*StateUpdater now support HASH type
* use Pipeline when available to gain performance
* extract abstract classes to reduce code duplication
** AbstractRedisStateQuerier, AbstractRedisStateUpdater
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8fd1f4b1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8fd1f4b1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8fd1f4b1
Branch: refs/heads/master
Commit: 8fd1f4b193b5d81b3036726268da39380e2b3b61
Parents: 7fec9a1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 12:21:57 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 12:21:57 2015 +0900
----------------------------------------------------------------------
.../state/AbstractRedisStateQuerier.java | 69 ++++++++++++++++++++
.../state/AbstractRedisStateUpdater.java | 67 +++++++++++++++++++
.../trident/state/RedisClusterStateQuerier.java | 53 ++++++---------
.../trident/state/RedisClusterStateUpdater.java | 66 ++++++++-----------
.../redis/trident/state/RedisStateQuerier.java | 54 +++++----------
.../redis/trident/state/RedisStateUpdater.java | 67 +++++++++----------
.../redis/trident/WordCountLookupMapper.java | 2 +-
.../redis/trident/WordCountStoreMapper.java | 2 +-
8 files changed, 231 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
new file mode 100644
index 0000000..24ecfc4
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public abstract class AbstractRedisStateQuerier<T extends State> extends BaseQueryFunction<T, List<Values>> {
+ private final RedisLookupMapper lookupMapper;
+ protected final RedisDataTypeDescription.RedisDataType dataType;
+ protected final String additionalKey;
+
+ public AbstractRedisStateQuerier(RedisLookupMapper lookupMapper) {
+ this.lookupMapper = lookupMapper;
+
+ RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+ }
+
+ @Override
+ public List<List<Values>> batchRetrieve(T state, List<TridentTuple> inputs) {
+ List<List<Values>> values = Lists.newArrayList();
+
+ List<String> keys = Lists.newArrayList();
+ for (TridentTuple input : inputs) {
+ keys.add(lookupMapper.getKeyFromTuple(input));
+ }
+
+ List<String> redisVals = retrieveValuesFromRedis(state, keys);
+ for (int i = 0 ; i < redisVals.size() ; i++) {
+ values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
+ }
+
+ return values;
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
+ for (Values value : values) {
+ collector.emit(value);
+ }
+ }
+
+ protected abstract List<String> retrieveValuesFromRedis(T redisClusterState, List<String> keys);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
new file mode 100644
index 0000000..2f95341
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractRedisStateUpdater<T extends State> extends BaseStateUpdater<T> {
+ private final RedisStoreMapper storeMapper;
+
+ protected final int expireIntervalSec;
+ protected final RedisDataTypeDescription.RedisDataType dataType;
+ protected final String additionalKey;
+
+ public AbstractRedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
+ this.storeMapper = storeMapper;
+ RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+ this.dataType = dataTypeDescription.getDataType();
+ this.additionalKey = dataTypeDescription.getAdditionalKey();
+
+ if (expireIntervalSec > 0) {
+ this.expireIntervalSec = expireIntervalSec;
+ } else {
+ this.expireIntervalSec = 0;
+ }
+ }
+
+ @Override
+ public void updateState(T state, List<TridentTuple> inputs,
+ TridentCollector collector) {
+ Map<String, String> keyToValue = new HashMap<String, String>();
+
+ for (TridentTuple input : inputs) {
+ String key = storeMapper.getKeyFromTuple(input);
+ String value = storeMapper.getValueFromTuple(input);
+
+ keyToValue.put(key, value);
+ }
+
+ updateStatesToRedis(state, keyToValue);
+ }
+
+ protected abstract void updateStatesToRedis(T state, Map<String, String> keyToValue);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index 4382fe3..66ff3f6 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -17,57 +17,42 @@
*/
package org.apache.storm.redis.trident.state;
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Lists;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.tuple.TridentTuple;
+import java.util.ArrayList;
import java.util.List;
-public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, List<Values>> {
- private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
-
- private final RedisLookupMapper lookupMapper;
-
+public class RedisClusterStateQuerier extends AbstractRedisStateQuerier<RedisClusterState> {
public RedisClusterStateQuerier(RedisLookupMapper lookupMapper) {
- this.lookupMapper = lookupMapper;
+ super(lookupMapper);
}
@Override
- public List<List<Values>> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
- List<List<Values>> ret = Lists.newArrayList();
-
+ protected List<String> retrieveValuesFromRedis(RedisClusterState redisClusterState, List<String> keys) {
JedisCluster jedisCluster = null;
try {
jedisCluster = redisClusterState.getJedisCluster();
-
- for (int i = 0 ; i < inputs.size() ; i++) {
- TridentTuple input = inputs.get(i);
-
- String key = lookupMapper.getKeyFromTuple(input);
- String value = jedisCluster.get(key);
- ret.add(lookupMapper.toTuple(input, value));
- logger.debug("redis get key[" + key + "] value [" + value + "]");
+ List<String> redisVals = new ArrayList<String>();
+
+ for (String key : keys) {
+ switch (dataType) {
+ case STRING:
+ redisVals.add(jedisCluster.get(key));
+ break;
+ case HASH:
+ redisVals.add(jedisCluster.hget(additionalKey, key));
+ break;
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
+ }
}
+
+ return redisVals;
} finally {
if (jedisCluster != null) {
redisClusterState.returnJedisCluster(jedisCluster);
}
}
-
- return ret;
- }
-
- @Override
- public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
- for (Values value : values) {
- collector.emit(value);
- }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 35fb48e..924b6b9 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -19,63 +19,51 @@ package org.apache.storm.redis.trident.state;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCluster;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
-import java.util.List;
-
-public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
- private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
-
- private final RedisStoreMapper storeMapper;
- private final int expireIntervalSec;
+import java.util.Map;
+public class RedisClusterStateUpdater extends AbstractRedisStateUpdater<RedisClusterState> {
public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
- this.storeMapper = storeMapper;
- assertDataType(storeMapper.getDataTypeDescription());
-
- if (expireIntervalSec > 0) {
- this.expireIntervalSec = expireIntervalSec;
- } else {
- this.expireIntervalSec = 0;
- }
+ super(storeMapper, expireIntervalSec);
}
@Override
- public void updateState(RedisClusterState redisClusterState, List<TridentTuple> inputs,
- TridentCollector collector) {
-
+ protected void updateStatesToRedis(RedisClusterState redisClusterState, Map<String, String> keyToValue) {
JedisCluster jedisCluster = null;
try {
jedisCluster = redisClusterState.getJedisCluster();
- for (TridentTuple input : inputs) {
- String key = storeMapper.getKeyFromTuple(input);
- String value = storeMapper.getValueFromTuple(input);
- logger.debug("update key[" + key + "] redisKey[" + key + "] value[" + value + "]");
+ for (Map.Entry<String, String> kvEntry : keyToValue.entrySet()) {
+ String key = kvEntry.getKey();
+ String value = kvEntry.getValue();
- if (this.expireIntervalSec > 0) {
- jedisCluster.setex(key, expireIntervalSec, value);
- } else {
- jedisCluster.set(key, value);
+ switch (dataType) {
+ case STRING:
+ if (this.expireIntervalSec > 0) {
+ jedisCluster.setex(key, expireIntervalSec, value);
+ } else {
+ jedisCluster.set(key, value);
+ }
+ break;
+ case HASH:
+ jedisCluster.hset(additionalKey, key, value);
+ break;
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
}
+
+ // send expire command for hash only once
+ // it expires key itself entirely, so use it with caution
+ if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
+ this.expireIntervalSec > 0) {
+ jedisCluster.expire(additionalKey, expireIntervalSec);
+ }
} finally {
if (jedisCluster != null) {
redisClusterState.returnJedisCluster(jedisCluster);
}
}
}
-
- private void assertDataType(RedisDataTypeDescription storeMapper) {
- if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
- throw new IllegalArgumentException("State should be STRING type");
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index a215741..ac102dd 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -17,62 +17,40 @@
*/
package org.apache.storm.redis.trident.state;
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Lists;
-import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import redis.clients.jedis.Jedis;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.tuple.TridentTuple;
-import java.util.ArrayList;
import java.util.List;
-public class RedisStateQuerier extends BaseQueryFunction<RedisState, List<Values>> {
- private final RedisLookupMapper lookupMapper;
-
+public class RedisStateQuerier extends AbstractRedisStateQuerier<RedisState> {
public RedisStateQuerier(RedisLookupMapper lookupMapper) {
- this.lookupMapper = lookupMapper;
- assertDataType(lookupMapper.getDataTypeDescription());
+ super(lookupMapper);
}
@Override
- public List<List<Values>> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
- List<List<Values>> values = new ArrayList<List<Values>>();
-
- List<String> keys = Lists.newArrayList();
- for (TridentTuple input : inputs) {
- keys.add(lookupMapper.getKeyFromTuple(input));
- }
-
+ protected List<String> retrieveValuesFromRedis(RedisState redisState, List<String> keys) {
Jedis jedis = null;
try {
jedis = redisState.getJedis();
- List<String> redisVals = jedis.mget(keys.toArray(new String[keys.size()]));
-
- for (int i = 0 ; i < redisVals.size() ; i++) {
- values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
+ List<String> redisVals;
+
+ String[] keysForRedis = keys.toArray(new String[keys.size()]);
+ switch (dataType) {
+ case STRING:
+ redisVals = jedis.mget(keysForRedis);
+ break;
+ case HASH:
+ redisVals = jedis.hmget(additionalKey, keysForRedis);
+ break;
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
- return values;
+ return redisVals;
} finally {
if (jedis != null) {
redisState.returnJedis(jedis);
}
}
}
-
- @Override
- public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
- for (Values value : values) {
- collector.emit(value);
- }
- }
-
- private void assertDataType(RedisDataTypeDescription lookupMapper) {
- if (lookupMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
- throw new IllegalArgumentException("State should be STRING type");
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 384a120..583fa32 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -19,51 +19,51 @@ package org.apache.storm.redis.trident.state;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
+import redis.clients.jedis.Pipeline;
-import java.util.List;
-
-public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
- private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
-
- private final RedisStoreMapper storeMapper;
- private final int expireIntervalSec;
+import java.util.Map;
+public class RedisStateUpdater extends AbstractRedisStateUpdater<RedisState> {
public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
- this.storeMapper = storeMapper;
- assertDataType(storeMapper.getDataTypeDescription());
-
- if (expireIntervalSec > 0) {
- this.expireIntervalSec = expireIntervalSec;
- } else {
- this.expireIntervalSec = 0;
- }
+ super(storeMapper, expireIntervalSec);
}
@Override
- public void updateState(RedisState redisState, List<TridentTuple> inputs,
- TridentCollector collector) {
+ protected void updateStatesToRedis(RedisState redisState, Map<String, String> keyToValue) {
Jedis jedis = null;
try {
jedis = redisState.getJedis();
- for (TridentTuple input : inputs) {
- String key = storeMapper.getKeyFromTuple(input);
- String value = storeMapper.getValueFromTuple(input);
+ Pipeline pipeline = jedis.pipelined();
- logger.debug("update key[" + key + "] redisKey[" + key+ "] value[" + value + "]");
+ for (Map.Entry<String, String> kvEntry : keyToValue.entrySet()) {
+ String key = kvEntry.getKey();
+ String value = kvEntry.getValue();
- if (this.expireIntervalSec > 0) {
- jedis.setex(key, expireIntervalSec, value);
- } else {
- jedis.set(key, value);
+ switch (dataType) {
+ case STRING:
+ if (this.expireIntervalSec > 0) {
+ pipeline.setex(key, expireIntervalSec, value);
+ } else {
+ pipeline.set(key, value);
+ }
+ break;
+ case HASH:
+ pipeline.hset(additionalKey, key, value);
+ break;
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + dataType);
}
}
+
+ // send expire command for hash only once
+ // it expires key itself entirely, so use it with caution
+ if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
+ this.expireIntervalSec > 0) {
+ pipeline.expire(additionalKey, expireIntervalSec);
+ }
+
+ pipeline.sync();
} finally {
if (jedis != null) {
redisState.returnJedis(jedis);
@@ -71,9 +71,4 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
}
}
- private void assertDataType(RedisDataTypeDescription storeMapper) {
- if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
- throw new IllegalArgumentException("State should be STRING type");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
index 891a1af..5c67c8c 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
@@ -25,7 +25,7 @@ public class WordCountLookupMapper implements RedisLookupMapper {
@Override
public RedisDataTypeDescription getDataTypeDescription() {
- return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+ return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "test");
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
index aa03ead..6521302 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
@@ -7,7 +7,7 @@ import org.apache.storm.redis.common.mapper.RedisStoreMapper;
public class WordCountStoreMapper implements RedisStoreMapper {
@Override
public RedisDataTypeDescription getDataTypeDescription() {
- return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+ return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "test");
}
@Override