You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/06/02 17:00:31 UTC

[03/12] storm git commit: Extract abstract classes to reduce code duplication (Redis*MapState)

Extract abstract classes to reduce code duplication (Redis*MapState)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9c0329b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9c0329b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9c0329b

Branch: refs/heads/master
Commit: b9c0329bd182333f0af237e661e88958d7157520
Parents: 8fd1f4b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Apr 5 18:55:03 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Apr 5 18:55:03 2015 +0900

----------------------------------------------------------------------
 .../trident/state/AbstractRedisMapState.java    |  70 ++++++++
 .../storm/redis/trident/state/KeyFactory.java   |  18 ++
 .../storm/redis/trident/state/Options.java      |  13 ++
 .../trident/state/RedisClusterMapState.java     | 137 +++-----------
 .../redis/trident/state/RedisMapState.java      | 177 +++++--------------
 5 files changed, 173 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
new file mode 100644
index 0000000..a85a57a
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
@@ -0,0 +1,70 @@
+package org.apache.storm.redis.trident.state;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import storm.trident.state.*;
+import storm.trident.state.map.IBackingMap;
+
+import java.util.*;
+
+/**
+ * Shared base for Redis-backed Trident backing maps; subclasses supply the serializer, key factory and Redis I/O.
+ * @param <T> type of the values stored in the map
+ */
+public abstract class AbstractRedisMapState<T> implements IBackingMap<T> {
+	public static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
+			StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
+			StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
+			StateType.OPAQUE, new JSONOpaqueSerializer()
+	));
+	// Fixed charset for every byte[] <-> String conversion below; the JVM default differs between hosts.
+	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");
+
+	/** Looks up the Redis keys built from {@code keys} and deserializes each returned String (null stays null). */
+	@Override
+	public List<T> multiGet(List<List<Object>> keys) {
+		if (keys.isEmpty()) {
+			return Collections.emptyList();
+		}
+		return deserializeValues(keys, retrieveValuesFromRedis(buildKeys(keys)));
+	}
+
+	/** Serializes {@code vals.get(i)} under the Redis key built from {@code keys.get(i)} and writes the batch. */
+	@Override
+	public void multiPut(List<List<Object>> keys, List<T> vals) {
+		if (keys.isEmpty()) {
+			return;
+		}
+		Map<String, String> keyValues = new HashMap<String, String>();
+		for (int i = 0; i < keys.size(); i++) {
+			String val = new String(getSerializer().serialize(vals.get(i)), UTF_8);
+			keyValues.put(getKeyFactory().build(keys.get(i)), val);
+		}
+		updateStatesToRedis(keyValues);
+	}
+
+	/** Builds the Redis key for every entry of {@code keys}, in the same order. */
+	private List<String> buildKeys(List<List<Object>> keys) {
+		List<String> stringKeys = new ArrayList<String>(keys.size());
+		for (List<Object> key : keys) {
+			stringKeys.add(getKeyFactory().build(key));
+		}
+		return stringKeys;
+	}
+
+	/** Deserializes each non-null entry of {@code values}; {@code keys} only sizes the result list. */
+	@SuppressWarnings("unchecked") // Serializer is used as a raw type, so the cast to T cannot be checked
+	private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
+		List<T> result = new ArrayList<T>(keys.size());
+		for (String value : values) {
+			result.add(value == null ? null : (T) getSerializer().deserialize(value.getBytes(UTF_8)));
+		}
+		return result;
+	}
+
+	// Hooks implemented by the concrete states (RedisMapState, RedisClusterMapState).
+	protected abstract Serializer getSerializer();
+	protected abstract KeyFactory getKeyFactory();
+	protected abstract List<String> retrieveValuesFromRedis(List<String> keys);
+	protected abstract void updateStatesToRedis(Map<String, String> keyValues);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
new file mode 100644
index 0000000..0fac726
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/KeyFactory.java
@@ -0,0 +1,18 @@
+package org.apache.storm.redis.trident.state;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Builds the String key used in Redis from a Trident key (a list of objects). */
+public interface KeyFactory extends Serializable {
+	String build(List<Object> key);
+	/** Accepts only single-element keys and returns that element cast to String. */
+	class DefaultKeyFactory implements KeyFactory {
+		public String build(List<Object> key) {
+			if (key.size() == 1) {
+				return (String) key.get(0);
+			}
+			throw new RuntimeException("Default KeyFactory does not support compound keys");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
new file mode 100644
index 0000000..59e4ecb
--- /dev/null
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -0,0 +1,13 @@
+package org.apache.storm.redis.trident.state;
+
+import storm.trident.state.Serializer;
+
+import java.io.Serializable;
+
+public class Options<T> implements Serializable { // plain settings holder; the state factories read these fields directly
+	public int localCacheSize = 1000;
+	public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
+	public KeyFactory keyFactory = null; // public like its siblings so callers outside this package can set it; null -> KeyFactory.DefaultKeyFactory
+	public Serializer<T> serializer = null; // NOTE(review): null appears to make the factory pick a default serializer -- confirm
+	public String hkey = null; // null/empty: values live under plain Redis keys; otherwise as fields of this hash
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 1154376..ccf5b38 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -20,67 +20,16 @@ package org.apache.storm.redis.trident.state;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
-import storm.trident.state.JSONNonTransactionalSerializer;
-import storm.trident.state.JSONOpaqueSerializer;
-import storm.trident.state.JSONTransactionalSerializer;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.IBackingMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
 import java.util.List;
 import java.util.Map;
 
-public class RedisClusterMapState<T> implements IBackingMap<T> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisClusterMapState.class);
-
-    private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
-            StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
-            StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
-            StateType.OPAQUE, new JSONOpaqueSerializer()
-    ));
-
-    public static class DefaultKeyFactory implements KeyFactory {
-        public String build(List<Object> key) {
-            if (key.size() != 1)
-                throw new RuntimeException("Default KeyFactory does not support compound keys");
-            return (String) key.get(0);
-        }
-    };
-
-    public static class Options<T> implements Serializable {
-        public int localCacheSize = 1000;
-        public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
-        KeyFactory keyFactory = null;
-        public Serializer<T> serializer = null;
-        public String hkey = null;
-    }
-
-    public static interface KeyFactory extends Serializable {
-        String build(List<Object> key);
-    }
-
+public class RedisClusterMapState<T> extends AbstractRedisMapState<T> {
     /**
      * OpaqueTransactional for redis-cluster.
      * */
@@ -150,8 +99,6 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
         return new Factory(jedisClusterConfig, StateType.NON_TRANSACTIONAL, opts);
     }
 
-
-
     protected static class Factory implements StateFactory {
         public static final redis.clients.jedis.JedisPoolConfig DEFAULT_POOL_CONFIG = new redis.clients.jedis.JedisPoolConfig();
 
@@ -169,7 +116,7 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
 
             this.keyFactory = options.keyFactory;
             if (this.keyFactory == null) {
-                this.keyFactory = new DefaultKeyFactory();
+                this.keyFactory = new KeyFactory.DefaultKeyFactory();
             }
             this.serializer = options.serializer;
             if (this.serializer == null) {
@@ -220,74 +167,40 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
         this.keyFactory = keyFactory;
     }
 
-    public List<T> multiGet(List<List<Object>> keys) {
-        if (keys.size() == 0) {
-            return Collections.emptyList();
-        }
+    @Override
+    protected Serializer getSerializer() {
+        return serializer;
+    }
+
+    @Override
+    protected KeyFactory getKeyFactory() {
+        return keyFactory;
+    }
+
+    @Override
+    protected List<String> retrieveValuesFromRedis(List<String> keys) {
+        String[] stringKeys = keys.toArray(new String[keys.size()]);
         if (Strings.isNullOrEmpty(this.options.hkey)) {
-            String[] stringKeys = buildKeys(keys);
             List<String> values = Lists.newArrayList();
 
-            for (String stringKey : stringKeys) {
+            for (String stringKey : keys) {
                 String value = jedisCluster.get(stringKey);
                 values.add(value);
             }
 
-            return deserializeValues(keys, values);
+            return values;
         } else {
-            Map<String, String> keyValue = jedisCluster.hgetAll(this.options.hkey);
-            List<String> values = buildValuesFromMap(keys, keyValue);
-            return deserializeValues(keys, values);
-        }
-    }
-
-    private List<String> buildValuesFromMap(List<List<Object>> keys, Map<String, String> keyValue) {
-        List<String> values = new ArrayList<String>(keys.size());
-        for (List<Object> key : keys) {
-            String strKey = keyFactory.build(key);
-            String value = keyValue.get(strKey);
-            values.add(value);
-        }
-        return values;
-    }
-
-    private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
-        List<T> result = new ArrayList<T>(keys.size());
-        for (String value : values) {
-            if (value != null) {
-                result.add((T) serializer.deserialize(value.getBytes()));
-            } else {
-                result.add(null);
-            }
+            return jedisCluster.hmget(this.options.hkey, stringKeys);
         }
-        return result;
-    }
-
-    private String[] buildKeys(List<List<Object>> keys) {
-        String[] stringKeys = new String[keys.size()];
-        int index = 0;
-        for (List<Object> key : keys)
-            stringKeys[index++] = keyFactory.build(key);
-        return stringKeys;
     }
 
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        if (keys.size() == 0) {
-            return;
-        }
-
+    @Override
+    protected void updateStatesToRedis(Map<String, String> keyValues) {
         if (Strings.isNullOrEmpty(this.options.hkey)) {
-            for (int i = 0; i < keys.size(); i++) {
-                String val = new String(serializer.serialize(vals.get(i)));
-                String redisKey = keyFactory.build(keys.get(i));
-                jedisCluster.set(redisKey, val);
+            for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
+                jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
             }
         } else {
-            Map<String, String> keyValues = new HashMap<String, String>();
-            for (int i = 0; i < keys.size(); i++) {
-                String val = new String(serializer.serialize(vals.get(i)));
-                keyValues.put(keyFactory.build(keys.get(i)), val);
-            }
             jedisCluster.hmset(this.options.hkey, keyValues);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b9c0329b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 7bc5afb..1461203 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -20,67 +20,16 @@ package org.apache.storm.redis.trident.state;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.tuple.Values;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
-import storm.trident.state.JSONNonTransactionalSerializer;
-import storm.trident.state.JSONOpaqueSerializer;
-import storm.trident.state.JSONTransactionalSerializer;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.IBackingMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
+import storm.trident.state.*;
+import storm.trident.state.map.*;
+
 import java.util.List;
 import java.util.Map;
 
-public class RedisMapState<T> implements IBackingMap<T> {
-    private static final Logger logger = LoggerFactory.getLogger(RedisMapState.class);
-
-    private static final EnumMap<StateType, Serializer> DEFAULT_SERIALIZERS = Maps.newEnumMap(ImmutableMap.of(
-            StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer(),
-            StateType.TRANSACTIONAL, new JSONTransactionalSerializer(),
-            StateType.OPAQUE, new JSONOpaqueSerializer()
-    ));
-
-    public static class DefaultKeyFactory implements KeyFactory {
-        public String build(List<Object> key) {
-            if (key.size() != 1)
-                throw new RuntimeException("Default KeyFactory does not support compound keys");
-            return (String) key.get(0);
-        }
-    };
-
-    public static class Options<T> implements Serializable {
-        public int localCacheSize = 1000;
-        public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
-        KeyFactory keyFactory = null;
-        public Serializer<T> serializer = null;
-        public String hkey = null;
-    }
-
-    public static interface KeyFactory extends Serializable {
-        String build(List<Object> key);
-    }
-
+public class RedisMapState<T> extends AbstractRedisMapState<T> {
     /**
      * OpaqueTransactional for redis.
      * */
@@ -167,7 +116,7 @@ public class RedisMapState<T> implements IBackingMap<T> {
 
             this.keyFactory = options.keyFactory;
             if (this.keyFactory == null) {
-                this.keyFactory = new DefaultKeyFactory();
+                this.keyFactory = new KeyFactory.DefaultKeyFactory();
             }
             this.serializer = options.serializer;
             if (this.serializer == null) {
@@ -219,96 +168,64 @@ public class RedisMapState<T> implements IBackingMap<T> {
         this.keyFactory = keyFactory;
     }
 
-    public List<T> multiGet(List<List<Object>> keys) {
-        if (keys.size() == 0) {
-            return Collections.emptyList();
-        }
+    @Override
+    protected Serializer getSerializer() {
+        return serializer;
+    }
 
-        String[] stringKeys = buildKeys(keys);
-
-        if (Strings.isNullOrEmpty(this.options.hkey)) {
-            Jedis jedis = null;
-            try {
-                jedis = jedisPool.getResource();
-                List<String> values = jedis.mget(stringKeys);
-                return deserializeValues(keys, values);
-            } finally {
-                if (jedis != null) {
-                    jedisPool.returnResource(jedis);
-                }
-            }
-        } else {
-            Jedis jedis = null;
-            try {
-                jedis = jedisPool.getResource();
-                List<String> values = jedis.hmget(this.options.hkey, stringKeys);
-                return deserializeValues(keys, values);
-            } finally {
-                if (jedis != null) {
-                    jedisPool.returnResource(jedis);
-                }
-            }
-        }
+    @Override
+    protected KeyFactory getKeyFactory() {
+        return keyFactory;
     }
 
-    private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
-        List<T> result = new ArrayList<T>(keys.size());
-        for (String value : values) {
-            if (value != null) {
-                result.add((T) serializer.deserialize(value.getBytes()));
+    @Override
+    protected List<String> retrieveValuesFromRedis(List<String> keys) {
+        String[] stringKeys = keys.toArray(new String[keys.size()]);
+        Jedis jedis = null;
+        try {
+            jedis = jedisPool.getResource();
+
+            if (Strings.isNullOrEmpty(this.options.hkey)) {
+                return jedis.mget(stringKeys);
             } else {
-                result.add(null);
+                return jedis.hmget(this.options.hkey, stringKeys);
+            }
+        } finally {
+            if (jedis != null) {
+                jedis.close();
             }
         }
-        return result;
     }
 
-    private String[] buildKeys(List<List<Object>> keys) {
-        String[] stringKeys = new String[keys.size()];
-        int index = 0;
-        for (List<Object> key : keys)
-            stringKeys[index++] = keyFactory.build(key);
-        return stringKeys;
-    }
+    @Override
+    protected void updateStatesToRedis(Map<String, String> keyValues) {
+        Jedis jedis = null;
 
-    public void multiPut(List<List<Object>> keys, List<T> vals) {
-        if (keys.size() == 0) {
-            return;
-        }
+        try {
+            jedis = jedisPool.getResource();
 
-        if (Strings.isNullOrEmpty(this.options.hkey)) {
-            Jedis jedis = null;
-            try {
-                jedis = jedisPool.getResource();
-                String[] keyValue = buildKeyValuesList(keys, vals);
+            if (Strings.isNullOrEmpty(this.options.hkey)) {
+                String[] keyValue = buildKeyValuesList(keyValues);
                 jedis.mset(keyValue);
-            } finally {
-                if (jedis != null) {
-                    jedisPool.returnResource(jedis);
-                }
-            }
-        } else {
-            Jedis jedis = jedisPool.getResource();
-            try {
-                Map<String, String> keyValues = new HashMap<String, String>();
-                for (int i = 0; i < keys.size(); i++) {
-                    String val = new String(serializer.serialize(vals.get(i)));
-                    keyValues.put(keyFactory.build(keys.get(i)), val);
-                }
+            } else {
                 jedis.hmset(this.options.hkey, keyValues);
-
-            } finally {
-                jedisPool.returnResource(jedis);
+            }
+        } finally {
+            if (jedis != null) {
+                jedis.close();
             }
         }
     }
 
-    private String[] buildKeyValuesList(List<List<Object>> keys, List<T> vals) {
-        String[] keyValues = new String[keys.size() * 2];
-        for (int i = 0; i < keys.size(); i++) {
-            keyValues[i * 2] = keyFactory.build(keys.get(i));
-            keyValues[i * 2 + 1] = new String(serializer.serialize(vals.get(i)));
+    private String[] buildKeyValuesList(Map<String, String> keyValues) {
+        String[] keyValueLists = new String[keyValues.size() * 2];
+
+        int idx = 0;
+        for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
+            keyValueLists[idx++] = kvEntry.getKey();
+            keyValueLists[idx++] = kvEntry.getValue();
         }
-        return keyValues;
+
+        return keyValueLists;
     }
 }