You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/02 22:07:57 UTC

[02/16] storm git commit: Redis*StateQuerier / Redis*StateUpdater now support HASH type

Redis*StateQuerier / Redis*StateUpdater now support HASH type

* use Pipeline when available to gain performance
* extract abstract classes to reduce code duplication
 ** AbstractRedisStateQuerier, AbstractRedisStateUpdater


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8fd1f4b1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8fd1f4b1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8fd1f4b1

Branch: refs/heads/0.10.x-branch
Commit: 8fd1f4b193b5d81b3036726268da39380e2b3b61
Parents: 7fec9a1
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 12:21:57 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 12:21:57 2015 +0900

----------------------------------------------------------------------
 .../state/AbstractRedisStateQuerier.java        | 69 ++++++++++++++++++++
 .../state/AbstractRedisStateUpdater.java        | 67 +++++++++++++++++++
 .../trident/state/RedisClusterStateQuerier.java | 53 ++++++---------
 .../trident/state/RedisClusterStateUpdater.java | 66 ++++++++-----------
 .../redis/trident/state/RedisStateQuerier.java  | 54 +++++----------
 .../redis/trident/state/RedisStateUpdater.java  | 67 +++++++++----------
 .../redis/trident/WordCountLookupMapper.java    |  2 +-
 .../redis/trident/WordCountStoreMapper.java     |  2 +-
 8 files changed, 231 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
new file mode 100644
index 0000000..24ecfc4
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public abstract class AbstractRedisStateQuerier<T extends State> extends BaseQueryFunction<T, List<Values>> { // shared batch-lookup flow; subclasses supply the Redis read
+	private final RedisLookupMapper lookupMapper; // maps tuples to keys and Redis values back to output Values
+	protected final RedisDataTypeDescription.RedisDataType dataType; // taken once from the mapper's description; subclasses switch on it
+	protected final String additionalKey; // taken from the mapper's description; subclasses pass it as the hash name for HASH lookups
+
+	public AbstractRedisStateQuerier(RedisLookupMapper lookupMapper) { // NOTE(review): lookupMapper and its description are dereferenced without null checks
+		this.lookupMapper = lookupMapper;
+
+		RedisDataTypeDescription dataTypeDescription = lookupMapper.getDataTypeDescription();
+		this.dataType = dataTypeDescription.getDataType(); // not validated here; unsupported types are rejected later by the subclasses
+		this.additionalKey = dataTypeDescription.getAdditionalKey();
+	}
+
+	@Override
+	public List<List<Values>> batchRetrieve(T state, List<TridentTuple> inputs) { // returns one List<Values> per retrieved Redis value
+		List<List<Values>> values = Lists.newArrayList();
+
+		List<String> keys = Lists.newArrayList(); // keys are collected in the same order as inputs
+		for (TridentTuple input : inputs) {
+			keys.add(lookupMapper.getKeyFromTuple(input));
+		}
+
+		List<String> redisVals = retrieveValuesFromRedis(state, keys); // single call for the whole batch; the subclass decides how to read
+		for (int i = 0 ; i < redisVals.size() ; i++) { // NOTE(review): bounded by redisVals.size() but indexes inputs - relies on one result per key, in key order
+			values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i))); // pairs the i-th input with the i-th Redis value
+		}
+
+		return values;
+	}
+
+	@Override
+	public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) { // tuple is not read here
+		for (Values value : values) {
+			collector.emit(value); // each element of values is emitted separately
+		}
+	}
+
+	protected abstract List<String> retrieveValuesFromRedis(T redisClusterState, List<String> keys); // batchRetrieve matches results to inputs by index; the parameter is typed T despite its name
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
new file mode 100644
index 0000000..2f95341
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractRedisStateUpdater<T extends State> extends BaseStateUpdater<T> { // collects a batch into key/value pairs; subclasses write them to Redis
+	private final RedisStoreMapper storeMapper; // extracts the key and the value from each tuple
+
+	protected final int expireIntervalSec; // never negative: the constructor stores 0 for any non-positive argument
+	protected final RedisDataTypeDescription.RedisDataType dataType; // taken once from the mapper's description; subclasses switch on it
+	protected final String additionalKey; // taken from the mapper's description; subclasses pass it as the hash name for HASH writes
+
+	public AbstractRedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) { // NOTE(review): storeMapper and its description are dereferenced without null checks
+		this.storeMapper = storeMapper;
+		RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
+		this.dataType = dataTypeDescription.getDataType(); // not validated here; unsupported types are rejected later by the subclasses
+		this.additionalKey = dataTypeDescription.getAdditionalKey();
+
+		if (expireIntervalSec > 0) {
+			this.expireIntervalSec = expireIntervalSec;
+		} else {
+			this.expireIntervalSec = 0; // zero and negative inputs are both normalized to 0; subclasses only expire when the value is > 0
+		}
+	}
+
+	@Override
+	public void updateState(T state, List<TridentTuple> inputs, // collector is not used by this method
+			TridentCollector collector) {
+		Map<String, String> keyToValue = new HashMap<String, String>(); // HashMap: a repeated key keeps only the value of the last tuple seen
+
+		for (TridentTuple input : inputs) {
+			String key = storeMapper.getKeyFromTuple(input);
+			String value = storeMapper.getValueFromTuple(input);
+
+			keyToValue.put(key, value); // overwrites any earlier value stored for the same key in this batch
+		}
+
+		updateStatesToRedis(state, keyToValue); // invoked even when inputs is empty (keyToValue is then empty)
+	}
+
+	protected abstract void updateStatesToRedis(T state, Map<String, String> keyToValue); // subclass hook that performs the actual Redis writes
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
index 4382fe3..66ff3f6 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
@@ -17,57 +17,42 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Lists;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.tuple.TridentTuple;
 
+import java.util.ArrayList;
 import java.util.List;
 
-public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, List<Values>> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
-
-    private final RedisLookupMapper lookupMapper;
-
+public class RedisClusterStateQuerier extends AbstractRedisStateQuerier<RedisClusterState> { // JedisCluster variant: reads each key with its own call
     public RedisClusterStateQuerier(RedisLookupMapper lookupMapper) {
-        this.lookupMapper = lookupMapper;
+        super(lookupMapper); // the superclass stores the mapper and reads dataType / additionalKey from it
     }
 
     @Override
-    public List<List<Values>> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
-        List<List<Values>> ret = Lists.newArrayList();
-
+    protected List<String> retrieveValuesFromRedis(RedisClusterState redisClusterState, List<String> keys) { // returns one value per key, in key order
         JedisCluster jedisCluster = null;
         try {
             jedisCluster = redisClusterState.getJedisCluster();
-
-            for (int i = 0 ; i < inputs.size() ; i++) {
-                TridentTuple input = inputs.get(i);
-
-                String key = lookupMapper.getKeyFromTuple(input);
-                String value = jedisCluster.get(key);
-                ret.add(lookupMapper.toTuple(input, value));
-                logger.debug("redis get key[" + key + "] value [" + value + "]");
+            List<String> redisVals = new ArrayList<String>(); // appended to in key order so results line up with keys
+
+            for (String key : keys) {
+                switch (dataType) {
+                case STRING:
+                    redisVals.add(jedisCluster.get(key)); // key is used directly as the Redis key
+                    break;
+                case HASH:
+                    redisVals.add(jedisCluster.hget(additionalKey, key)); // additionalKey names the hash; key is the field inside it
+                    break;
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType); // only reached when keys is non-empty; the constructor does not validate the type
+                }
             }
+
+            return redisVals; // the finally block below still hands the cluster handle back
         } finally {
             if (jedisCluster != null) {
                 redisClusterState.returnJedisCluster(jedisCluster);
             }
         }
-
-        return ret;
-    }
-
-    @Override
-    public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
-        for (Values value : values) {
-            collector.emit(value);
-        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
index 35fb48e..924b6b9 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
@@ -19,63 +19,51 @@ package org.apache.storm.redis.trident.state;
 
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
 
-import java.util.List;
-
-public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
-
-    private final RedisStoreMapper storeMapper;
-    private final int expireIntervalSec;
+import java.util.Map;
 
+public class RedisClusterStateUpdater extends AbstractRedisStateUpdater<RedisClusterState> { // JedisCluster variant: writes each entry with its own call
     public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
-        this.storeMapper = storeMapper;
-        assertDataType(storeMapper.getDataTypeDescription());
-
-        if (expireIntervalSec > 0) {
-            this.expireIntervalSec = expireIntervalSec;
-        } else {
-            this.expireIntervalSec = 0;
-        }
+        super(storeMapper, expireIntervalSec); // the superclass stores the mapper and normalizes non-positive intervals to 0
     }
 
     @Override
-    public void updateState(RedisClusterState redisClusterState, List<TridentTuple> inputs,
-                            TridentCollector collector) {
-
+    protected void updateStatesToRedis(RedisClusterState redisClusterState, Map<String, String> keyToValue) { // writes every entry of keyToValue according to dataType
         JedisCluster jedisCluster = null;
         try {
             jedisCluster = redisClusterState.getJedisCluster();
-            for (TridentTuple input : inputs) {
-                String key = storeMapper.getKeyFromTuple(input);
-                String value = storeMapper.getValueFromTuple(input);
 
-                logger.debug("update key[" + key + "] redisKey[" + key + "] value[" + value + "]");
+            for (Map.Entry<String, String> kvEntry : keyToValue.entrySet()) {
+                String key = kvEntry.getKey();
+                String value = kvEntry.getValue();
 
-                if (this.expireIntervalSec > 0) {
-                    jedisCluster.setex(key, expireIntervalSec, value);
-                } else {
-                    jedisCluster.set(key, value);
+                switch (dataType) {
+                case STRING:
+                    if (this.expireIntervalSec > 0) {
+                        jedisCluster.setex(key, expireIntervalSec, value); // value and expiry are sent in one setex call
+                    } else {
+                        jedisCluster.set(key, value); // no expiry requested
+                    }
+                    break;
+                case HASH:
+                    jedisCluster.hset(additionalKey, key, value); // additionalKey names the hash; key is the field inside it
+                    break;
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType); // only reached when keyToValue is non-empty; the constructor does not validate the type
                 }
             }
+
+            // For HASH, send a single expire after all fields have been written.
+            // The expiry applies to the whole hash stored at additionalKey, not to individual fields, so use it with caution.
+            if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
+                    this.expireIntervalSec > 0) {
+                jedisCluster.expire(additionalKey, expireIntervalSec); // NOTE(review): also issued when keyToValue is empty
+            }
         } finally {
             if (jedisCluster != null) {
                 redisClusterState.returnJedisCluster(jedisCluster);
             }
         }
     }
-
-    private void assertDataType(RedisDataTypeDescription storeMapper) {
-        if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
-            throw new IllegalArgumentException("State should be STRING type");
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
index a215741..ac102dd 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java
@@ -17,62 +17,40 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import backtype.storm.tuple.Values;
-import com.google.common.collect.Lists;
-import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import redis.clients.jedis.Jedis;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.tuple.TridentTuple;
 
-import java.util.ArrayList;
 import java.util.List;
 
-public class RedisStateQuerier extends BaseQueryFunction<RedisState, List<Values>> {
-    private final RedisLookupMapper lookupMapper;
-
+public class RedisStateQuerier extends AbstractRedisStateQuerier<RedisState> { // single-node Jedis variant: reads the whole batch with one multi-key call
     public RedisStateQuerier(RedisLookupMapper lookupMapper) {
-        this.lookupMapper = lookupMapper;
-        assertDataType(lookupMapper.getDataTypeDescription());
+        super(lookupMapper); // the superclass stores the mapper; the former STRING-only assertion is removed by this change
     }
 
     @Override
-    public List<List<Values>> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
-        List<List<Values>> values = new ArrayList<List<Values>>();
-
-        List<String> keys = Lists.newArrayList();
-        for (TridentTuple input : inputs) {
-            keys.add(lookupMapper.getKeyFromTuple(input));
-        }
-
+    protected List<String> retrieveValuesFromRedis(RedisState redisState, List<String> keys) { // returns whatever mget / hmget yields for the given keys
         Jedis jedis = null;
         try {
             jedis = redisState.getJedis();
-            List<String> redisVals = jedis.mget(keys.toArray(new String[keys.size()]));
-
-            for (int i = 0 ; i < redisVals.size() ; i++) {
-                values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
+            List<String> redisVals;
+
+            String[] keysForRedis = keys.toArray(new String[keys.size()]); // NOTE(review): empty when keys is empty - confirm mget / hmget tolerate an empty array
+            switch (dataType) {
+            case STRING:
+                redisVals = jedis.mget(keysForRedis); // each element is used directly as a Redis key
+                break;
+            case HASH:
+                redisVals = jedis.hmget(additionalKey, keysForRedis); // additionalKey names the hash; the elements are fields inside it
+                break;
+            default:
+                throw new IllegalArgumentException("Cannot process such data type: " + dataType); // thrown at query time; the constructor does not validate the type
             }
 
-            return values;
+            return redisVals; // the finally block below still hands the connection back
         } finally {
             if (jedis != null) {
                 redisState.returnJedis(jedis);
             }
         }
     }
-
-    @Override
-    public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
-        for (Values value : values) {
-            collector.emit(value);
-        }
-    }
-
-    private void assertDataType(RedisDataTypeDescription lookupMapper) {
-        if (lookupMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
-            throw new IllegalArgumentException("State should be STRING type");
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
index 384a120..583fa32 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java
@@ -19,51 +19,51 @@ package org.apache.storm.redis.trident.state;
 
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.apache.storm.redis.common.mapper.TupleMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
+import redis.clients.jedis.Pipeline;
 
-import java.util.List;
-
-public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
-
-    private final RedisStoreMapper storeMapper;
-    private final int expireIntervalSec;
+import java.util.Map;
 
+public class RedisStateUpdater extends AbstractRedisStateUpdater<RedisState> {
     public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
-        this.storeMapper = storeMapper;
-        assertDataType(storeMapper.getDataTypeDescription());
-
-        if (expireIntervalSec > 0) {
-            this.expireIntervalSec = expireIntervalSec;
-        } else {
-            this.expireIntervalSec = 0;
-        }
+        super(storeMapper, expireIntervalSec);
     }
 
     @Override
-    public void updateState(RedisState redisState, List<TridentTuple> inputs,
-                            TridentCollector collector) {
+    protected void updateStatesToRedis(RedisState redisState, Map<String, String> keyToValue) {
         Jedis jedis = null;
         try {
             jedis = redisState.getJedis();
-            for (TridentTuple input : inputs) {
-                String key = storeMapper.getKeyFromTuple(input);
-                String value = storeMapper.getValueFromTuple(input);
+            Pipeline pipeline = jedis.pipelined();
 
-                logger.debug("update key[" + key + "] redisKey[" + key+ "] value[" + value + "]");
+            for (Map.Entry<String, String> kvEntry : keyToValue.entrySet()) {
+                String key = kvEntry.getKey();
+                String value = kvEntry.getValue();
 
-                if (this.expireIntervalSec > 0) {
-                    jedis.setex(key, expireIntervalSec, value);
-                } else {
-                    jedis.set(key, value);
+                switch (dataType) {
+                case STRING:
+                    if (this.expireIntervalSec > 0) {
+                        pipeline.setex(key, expireIntervalSec, value);
+                    } else {
+                        pipeline.set(key, value);
+                    }
+                    break;
+                case HASH:
+                    pipeline.hset(additionalKey, key, value);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
                 }
             }
+
+            // send expire command for hash only once
+            // it expires key itself entirely, so use it with caution
+            if (dataType == RedisDataTypeDescription.RedisDataType.HASH &&
+                    this.expireIntervalSec > 0) {
+                pipeline.expire(additionalKey, expireIntervalSec);
+            }
+
+            pipeline.sync();
         } finally {
             if (jedis != null) {
                 redisState.returnJedis(jedis);
@@ -71,9 +71,4 @@ public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
         }
     }
 
-    private void assertDataType(RedisDataTypeDescription storeMapper) {
-        if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
-            throw new IllegalArgumentException("State should be STRING type");
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
index 891a1af..5c67c8c 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
@@ -25,7 +25,7 @@ public class WordCountLookupMapper implements RedisLookupMapper {
 
     @Override
     public RedisDataTypeDescription getDataTypeDescription() {
-        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "test");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8fd1f4b1/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
index aa03ead..6521302 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
@@ -7,7 +7,7 @@ import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 public class WordCountStoreMapper implements RedisStoreMapper {
     @Override
     public RedisDataTypeDescription getDataTypeDescription() {
-        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
+        return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "test");
     }
 
     @Override