You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/03 05:42:54 UTC

[1/9] storm git commit: Added expireIntervalSec to RedisMapState.Options

Repository: storm
Updated Branches:
  refs/heads/master a0c032358 -> a55bbbea8


Added expireIntervalSec to RedisMapState.Options

This matches the option available to users of RedisStateUpdater.
It is a bit heavy when setting top-level keys, since Redis has no
MSETEX command (a multi-key counterpart to SETEX) that would let us
set an expiry timeout together with the values.  The per-key EXPIRE
calls are pipelined to mitigate the overhead of issuing a separate
command for each key.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/410feef5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/410feef5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/410feef5

Branch: refs/heads/master
Commit: 410feef51cf7e85267741ca08cdd57a509d1e252
Parents: c36702e
Author: eric-mulvaney <er...@kontagent.com>
Authored: Tue Apr 7 17:25:03 2015 -0400
Committer: eric-mulvaney <er...@kontagent.com>
Committed: Tue Apr 7 17:38:51 2015 -0400

----------------------------------------------------------------------
 .../redis/trident/state/RedisMapState.java      | 31 +++++++++++---------
 1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/410feef5/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 0379826..84cd1f9 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.Pipeline;
 import storm.trident.state.JSONNonTransactionalSerializer;
 import storm.trident.state.JSONOpaqueSerializer;
 import storm.trident.state.JSONTransactionalSerializer;
@@ -75,6 +76,7 @@ public class RedisMapState<T> implements IBackingMap<T> {
         public KeyFactory keyFactory = null;
         public Serializer<T> serializer = null;
         public String hkey = null;
+        public int expireIntervalSec = 0;
     }
 
     public static interface KeyFactory extends Serializable {
@@ -276,30 +278,31 @@ public class RedisMapState<T> implements IBackingMap<T> {
             return;
         }
 
-        if (Strings.isNullOrEmpty(this.options.hkey)) {
-            Jedis jedis = null;
-            try {
-                jedis = jedisPool.getResource();
+        Jedis jedis = jedisPool.getResource();
+        try {
+            if (Strings.isNullOrEmpty(this.options.hkey)) {
                 String[] keyValue = buildKeyValuesList(keys, vals);
                 jedis.mset(keyValue);
-            } finally {
-                if (jedis != null) {
-                    jedisPool.returnResource(jedis);
+                if (this.options.expireIntervalSec > 0) {
+                    Pipeline pipe = jedis.pipelined();
+                    for(int i = 0; i < keyValue.length; i += 2) {
+                        pipe.expire(keyValue[i], this.options.expireIntervalSec);
+                    }
+                    pipe.sync();
                 }
-            }
-        } else {
-            Jedis jedis = jedisPool.getResource();
-            try {
+            } else {
                 Map<String, String> keyValues = new HashMap<String, String>();
                 for (int i = 0; i < keys.size(); i++) {
                     String val = new String(serializer.serialize(vals.get(i)));
                     keyValues.put(keyFactory.build(keys.get(i)), val);
                 }
                 jedis.hmset(this.options.hkey, keyValues);
-
-            } finally {
-                jedisPool.returnResource(jedis);
+                if (this.options.expireIntervalSec > 0) {
+                    jedis.expire(this.options.hkey, this.options.expireIntervalSec);
+                }
             }
+        } finally {
+            jedisPool.returnResource(jedis);
         }
     }
 


[2/9] storm git commit: Made the keyFactory option public.

Posted by pt...@apache.org.
Made the keyFactory option public.

The keyFactory can already be set from the public API in other ways.
Making the field public means we can set it along with other values
like hkey.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c36702e2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c36702e2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c36702e2

Branch: refs/heads/master
Commit: c36702e210080b2bc6d59d8f9c0b7307f5926869
Parents: a6935d4
Author: eric-mulvaney <er...@kontagent.com>
Authored: Tue Apr 7 17:24:20 2015 -0400
Committer: eric-mulvaney <er...@kontagent.com>
Committed: Tue Apr 7 17:38:51 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/storm/redis/trident/state/RedisMapState.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c36702e2/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 7bc5afb..0379826 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -72,7 +72,7 @@ public class RedisMapState<T> implements IBackingMap<T> {
     public static class Options<T> implements Serializable {
         public int localCacheSize = 1000;
         public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
-        KeyFactory keyFactory = null;
+        public KeyFactory keyFactory = null;
         public Serializer<T> serializer = null;
         public String hkey = null;
     }


[7/9] storm git commit: add STORM-761 to changelog

Posted by pt...@apache.org.
add STORM-761 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e7818af3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e7818af3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e7818af3

Branch: refs/heads/master
Commit: e7818af37ba2b62b222a4ddff948c0f3aff0c7ea
Parents: 11768ba
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Jun 2 16:04:31 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Jun 2 16:04:31 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e7818af3/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ad235d8..f9f7999 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,6 @@
 ## 0.11.0
 
 ## 0.10.0
- * STORM-753: Improve RedisStateQuerier to convert List<Values> from Redis value
  * STORM-835: Netty Client hold batch object until io operation complete
  * STORM-827: Allow AutoTGT to work with storm-hdfs too.
  * STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation.
@@ -26,7 +25,9 @@
  * STORM-765: Thrift serialization for local state
  * STORM-764: Have option to compress thrift heartbeat
  * STORM-762: uptime for worker heartbeats is lost when converted to thrift
+ * STORM-761: An option for new/updated Redis keys to expire in RedisMapState
  * STORM-757: Simulated time can leak out on errors
+ * STORM-753: Improve RedisStateQuerier to convert List<Values> from Redis value
  * STORM-752: [storm-redis] Clarify Redis*StateUpdater's expire is optional
  * STORM-750: Set Config serialVersionUID
  * STORM-749: Remove CSRF check from the REST API.


[4/9] storm git commit: Added expireIntervalSec to RedisClusterMapState.Options

Posted by pt...@apache.org.
Added expireIntervalSec to RedisClusterMapState.Options

As with RedisMapState.Options, this was added to match the setting
available in the Redis[Cluster]StateUpdater classes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/306c535f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/306c535f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/306c535f

Branch: refs/heads/master
Commit: 306c535fb3e7fb5522e9cd2e0f41ba4c9fa8f9ca
Parents: 2059f23
Author: eric-mulvaney <er...@kontagent.com>
Authored: Wed Apr 8 14:37:06 2015 -0400
Committer: eric-mulvaney <er...@kontagent.com>
Committed: Wed Apr 8 14:37:06 2015 -0400

----------------------------------------------------------------------
 .../storm/redis/trident/state/RedisClusterMapState.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/306c535f/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index e47330b..7bcbb6c 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -75,6 +75,7 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
         public KeyFactory keyFactory = null;
         public Serializer<T> serializer = null;
         public String hkey = null;
+        public int expireIntervalSec = 0;
     }
 
     public static interface KeyFactory extends Serializable {
@@ -276,11 +277,16 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
             return;
         }
 
+        final int expireIntervalSec = this.options.expireIntervalSec;
         if (Strings.isNullOrEmpty(this.options.hkey)) {
             for (int i = 0; i < keys.size(); i++) {
                 String val = new String(serializer.serialize(vals.get(i)));
                 String redisKey = keyFactory.build(keys.get(i));
-                jedisCluster.set(redisKey, val);
+                if (expireIntervalSec > 0) {
+                    jedisCluster.setex(redisKey, expireIntervalSec, val);
+                } else {
+                    jedisCluster.set(redisKey, val);
+                }
             }
         } else {
             Map<String, String> keyValues = new HashMap<String, String>();
@@ -289,6 +295,9 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
                 keyValues.put(keyFactory.build(keys.get(i)), val);
             }
             jedisCluster.hmset(this.options.hkey, keyValues);
+            if (expireIntervalSec > 0) {
+                jedisCluster.expire(this.options.hkey, expireIntervalSec);
+            }
         }
     }
 }


[9/9] storm git commit: update changelog for STORM-842

Posted by pt...@apache.org.
update changelog for STORM-842


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a55bbbea
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a55bbbea
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a55bbbea

Branch: refs/heads/master
Commit: a55bbbea85fa83c3e9ab664922580d464de565fe
Parents: d285d94
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Jun 2 17:16:53 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Jun 2 17:16:53 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a55bbbea/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f9f7999..6329872 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## 0.11.0
 
 ## 0.10.0
+ * STORM-842: Drop Support for Java 1.6
  * STORM-835: Netty Client hold batch object until io operation complete
  * STORM-827: Allow AutoTGT to work with storm-hdfs too.
  * STORM-821: Adding connection provider interface to decouple jdbc connector from a single connection pooling implementation.


[8/9] storm git commit: Merge branch 'java17'

Posted by pt...@apache.org.
Merge branch 'java17'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d285d94d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d285d94d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d285d94d

Branch: refs/heads/master
Commit: d285d94d418bede3376e743f882c157a83f68a7a
Parents: e7818af fc73600
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Jun 2 17:15:30 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Jun 2 17:15:30 2015 -0400

----------------------------------------------------------------------
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[3/9] storm git commit: Made keyFactory public in RedisClusterMapState.Options

Posted by pt...@apache.org.
Made keyFactory public in RedisClusterMapState.Options

This matches the same change to RedisMapState.Options and is made for the
same reasons: the keyFactory can already be set from the public API in other
ways, and making the field public means we can set it along with other values
like hkey.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2059f23d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2059f23d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2059f23d

Branch: refs/heads/master
Commit: 2059f23d8a30df9089f6a928b4a2bf51d608099f
Parents: 410feef
Author: eric-mulvaney <er...@kontagent.com>
Authored: Wed Apr 8 14:35:51 2015 -0400
Committer: eric-mulvaney <er...@kontagent.com>
Committed: Wed Apr 8 14:35:51 2015 -0400

----------------------------------------------------------------------
 .../org/apache/storm/redis/trident/state/RedisClusterMapState.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2059f23d/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 1154376..e47330b 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -72,7 +72,7 @@ public class RedisClusterMapState<T> implements IBackingMap<T> {
     public static class Options<T> implements Serializable {
         public int localCacheSize = 1000;
         public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
-        KeyFactory keyFactory = null;
+        public KeyFactory keyFactory = null;
         public Serializer<T> serializer = null;
         public String hkey = null;
     }


[6/9] storm git commit: Merge branch 'feature/expire-in-redis-map-state' of github.com:emulvaney/storm

Posted by pt...@apache.org.
Merge branch 'feature/expire-in-redis-map-state' of github.com:emulvaney/storm

Conflicts:
	external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
	external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11768ba2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11768ba2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11768ba2

Branch: refs/heads/master
Commit: 11768ba2025c89ec6cccc232f682c50a46865dc8
Parents: a0c0323 306c535
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Jun 2 16:02:20 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Jun 2 16:02:20 2015 -0400

----------------------------------------------------------------------
 .../org/apache/storm/redis/trident/state/Options.java    |  1 +
 .../storm/redis/trident/state/RedisClusterMapState.java  |  9 ++++++++-
 .../apache/storm/redis/trident/state/RedisMapState.java  | 11 +++++++++++
 3 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/11768ba2/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index dffb713,0000000..e1bf5fb
mode 100644,000000..100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@@ -1,33 -1,0 +1,34 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.redis.trident.state;
 +
 +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 +import storm.trident.state.Serializer;
 +
 +import java.io.Serializable;
 +
 +public class Options<T> implements Serializable {
 +	private static final RedisDataTypeDescription DEFAULT_REDIS_DATATYPE = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
 +
 +	public int localCacheSize = 1000;
 +	public String globalKey = "$REDIS-MAP-STATE-GLOBAL";
 +	public KeyFactory keyFactory = null;
 +	public Serializer<T> serializer = null;
 +	public RedisDataTypeDescription dataTypeDescription = DEFAULT_REDIS_DATATYPE;
++	public int expireIntervalSec = 0;
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/11768ba2/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index 230f7f0,7bcbb6c..cd871eb
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@@ -200,32 -234,70 +200,39 @@@ public class RedisClusterMapState<T> ex
                  values.add(value);
              }
  
 -            return deserializeValues(keys, values);
 -        } else {
 -            Map<String, String> keyValue = jedisCluster.hgetAll(this.options.hkey);
 -            List<String> values = buildValuesFromMap(keys, keyValue);
 -            return deserializeValues(keys, values);
 -        }
 -    }
 +            return values;
  
 -    private List<String> buildValuesFromMap(List<List<Object>> keys, Map<String, String> keyValue) {
 -        List<String> values = new ArrayList<String>(keys.size());
 -        for (List<Object> key : keys) {
 -            String strKey = keyFactory.build(key);
 -            String value = keyValue.get(strKey);
 -            values.add(value);
 -        }
 -        return values;
 -    }
 +        case HASH:
 +            return jedisCluster.hmget(description.getAdditionalKey(), stringKeys);
  
 -    private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
 -        List<T> result = new ArrayList<T>(keys.size());
 -        for (String value : values) {
 -            if (value != null) {
 -                result.add((T) serializer.deserialize(value.getBytes()));
 -            } else {
 -                result.add(null);
 -            }
 +        default:
 +            throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
          }
 -        return result;
      }
  
 -    private String[] buildKeys(List<List<Object>> keys) {
 -        String[] stringKeys = new String[keys.size()];
 -        int index = 0;
 -        for (List<Object> key : keys)
 -            stringKeys[index++] = keyFactory.build(key);
 -        return stringKeys;
 -    }
 -
 -    public void multiPut(List<List<Object>> keys, List<T> vals) {
 -        if (keys.size() == 0) {
 -            return;
 -        }
 -
 -        final int expireIntervalSec = this.options.expireIntervalSec;
 -        if (Strings.isNullOrEmpty(this.options.hkey)) {
 -            for (int i = 0; i < keys.size(); i++) {
 -                String val = new String(serializer.serialize(vals.get(i)));
 -                String redisKey = keyFactory.build(keys.get(i));
 -                if (expireIntervalSec > 0) {
 -                    jedisCluster.setex(redisKey, expireIntervalSec, val);
 +    @Override
 +    protected void updateStatesToRedis(Map<String, String> keyValues) {
 +        RedisDataTypeDescription description = this.options.dataTypeDescription;
 +        switch (description.getDataType()) {
 +        case STRING:
 +            for (Map.Entry<String, String> kvEntry : keyValues.entrySet()) {
-                 jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
++                if(this.options.expireIntervalSec > 0){
++                    jedisCluster.setex(kvEntry.getKey(), this.options.expireIntervalSec, kvEntry.getValue());
+                 } else {
 -                    jedisCluster.set(redisKey, val);
++                    jedisCluster.set(kvEntry.getKey(), kvEntry.getValue());
+                 }
              }
 -        } else {
 -            Map<String, String> keyValues = new HashMap<String, String>();
 -            for (int i = 0; i < keys.size(); i++) {
 -                String val = new String(serializer.serialize(vals.get(i)));
 -                keyValues.put(keyFactory.build(keys.get(i)), val);
 -            }
 -            jedisCluster.hmset(this.options.hkey, keyValues);
 -            if (expireIntervalSec > 0) {
 -                jedisCluster.expire(this.options.hkey, expireIntervalSec);
 +            break;
 +
 +        case HASH:
 +            jedisCluster.hmset(description.getAdditionalKey(), keyValues);
++            if (this.options.expireIntervalSec > 0) {
++                jedisCluster.expire(description.getAdditionalKey(), this.options.expireIntervalSec);
+             }
 +            break;
 +
 +        default:
 +            throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/storm/blob/11768ba2/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --cc external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 85cbff2,84cd1f9..f0ab941
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@@ -19,10 -19,18 +19,11 @@@ package org.apache.storm.redis.trident.
  
  import backtype.storm.task.IMetricsContext;
  import backtype.storm.tuple.Values;
 -import com.google.common.base.Strings;
 -import com.google.common.collect.ImmutableMap;
 -import com.google.common.collect.Maps;
  import org.apache.storm.redis.common.config.JedisPoolConfig;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
  import redis.clients.jedis.Jedis;
  import redis.clients.jedis.JedisPool;
+ import redis.clients.jedis.Pipeline;
 -import storm.trident.state.JSONNonTransactionalSerializer;
 -import storm.trident.state.JSONOpaqueSerializer;
 -import storm.trident.state.JSONTransactionalSerializer;
  import storm.trident.state.OpaqueValue;
  import storm.trident.state.Serializer;
  import storm.trident.state.State;
@@@ -178,69 -221,88 +179,79 @@@ public class RedisMapState<T> extends A
          this.keyFactory = keyFactory;
      }
  
 -    public List<T> multiGet(List<List<Object>> keys) {
 -        if (keys.size() == 0) {
 -            return Collections.emptyList();
 -        }
 +    @Override
 +    protected Serializer getSerializer() {
 +        return serializer;
 +    }
  
 -        String[] stringKeys = buildKeys(keys);
 -
 -        if (Strings.isNullOrEmpty(this.options.hkey)) {
 -            Jedis jedis = null;
 -            try {
 -                jedis = jedisPool.getResource();
 -                List<String> values = jedis.mget(stringKeys);
 -                return deserializeValues(keys, values);
 -            } finally {
 -                if (jedis != null) {
 -                    jedisPool.returnResource(jedis);
 -                }
 -            }
 -        } else {
 -            Jedis jedis = null;
 -            try {
 -                jedis = jedisPool.getResource();
 -                List<String> values = jedis.hmget(this.options.hkey, stringKeys);
 -                return deserializeValues(keys, values);
 -            } finally {
 -                if (jedis != null) {
 -                    jedisPool.returnResource(jedis);
 -                }
 -            }
 -        }
 +    @Override
 +    protected KeyFactory getKeyFactory() {
 +        return keyFactory;
      }
  
 -    private List<T> deserializeValues(List<List<Object>> keys, List<String> values) {
 -        List<T> result = new ArrayList<T>(keys.size());
 -        for (String value : values) {
 -            if (value != null) {
 -                result.add((T) serializer.deserialize(value.getBytes()));
 -            } else {
 -                result.add(null);
 +    @Override
 +    protected List<String> retrieveValuesFromRedis(List<String> keys) {
 +        String[] stringKeys = keys.toArray(new String[keys.size()]);
 +
 +        Jedis jedis = null;
 +        try {
 +            jedis = jedisPool.getResource();
 +
 +            RedisDataTypeDescription description = this.options.dataTypeDescription;
 +            switch (description.getDataType()) {
 +            case STRING:
 +                return jedis.mget(stringKeys);
 +
 +            case HASH:
 +                return jedis.hmget(description.getAdditionalKey(), stringKeys);
 +
 +            default:
 +                throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
              }
 -        }
 -        return result;
 -    }
  
 -    private String[] buildKeys(List<List<Object>> keys) {
 -        String[] stringKeys = new String[keys.size()];
 -        int index = 0;
 -        for (List<Object> key : keys)
 -            stringKeys[index++] = keyFactory.build(key);
 -        return stringKeys;
 +        } finally {
 +            if (jedis != null) {
 +                jedis.close();
 +            }
 +        }
      }
  
 -    public void multiPut(List<List<Object>> keys, List<T> vals) {
 -        if (keys.size() == 0) {
 -            return;
 -        }
 +    @Override
 +    protected void updateStatesToRedis(Map<String, String> keyValues) {
 +        Jedis jedis = null;
  
 -        Jedis jedis = jedisPool.getResource();
          try {
 -            if (Strings.isNullOrEmpty(this.options.hkey)) {
 -                String[] keyValue = buildKeyValuesList(keys, vals);
 +            jedis = jedisPool.getResource();
 +
 +            RedisDataTypeDescription description = this.options.dataTypeDescription;
 +            switch (description.getDataType()) {
 +            case STRING:
 +                String[] keyValue = buildKeyValuesList(keyValues);
                  jedis.mset(keyValue);
 -                if (this.options.expireIntervalSec > 0) {
++                if(this.options.expireIntervalSec > 0){
+                     Pipeline pipe = jedis.pipelined();
 -                    for(int i = 0; i < keyValue.length; i += 2) {
++                    for(int i = 0; i < keyValue.length; i += 2){
+                         pipe.expire(keyValue[i], this.options.expireIntervalSec);
+                     }
+                     pipe.sync();
+                 }
 -            } else {
 -                Map<String, String> keyValues = new HashMap<String, String>();
 -                for (int i = 0; i < keys.size(); i++) {
 -                    String val = new String(serializer.serialize(vals.get(i)));
 -                    keyValues.put(keyFactory.build(keys.get(i)), val);
 -                }
 -                jedis.hmset(this.options.hkey, keyValues);
 +                break;
 +
 +            case HASH:
 +                jedis.hmset(description.getAdditionalKey(), keyValues);
+                 if (this.options.expireIntervalSec > 0) {
 -                    jedis.expire(this.options.hkey, this.options.expireIntervalSec);
++                    jedis.expire(description.getAdditionalKey(), this.options.expireIntervalSec);
+                 }
 +                break;
 +
 +            default:
 +                throw new IllegalArgumentException("Cannot process such data type: " + description.getDataType());
              }
 +
          } finally {
 -            jedisPool.returnResource(jedis);
 +            if (jedis != null) {
 +                jedis.close();
 +            }
          }
      }
  


[5/9] storm git commit: STORM-842: Drop Support for Java 1.6

Posted by pt...@apache.org.
STORM-842: Drop Support for Java 1.6


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc736002
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc736002
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc736002

Branch: refs/heads/master
Commit: fc73600228c156a68327a342bf2c2da514620bbb
Parents: ad98824
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Jun 1 17:15:06 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Mon Jun 1 17:15:06 2015 -0400

----------------------------------------------------------------------
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fc736002/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bfb9069..4729554 100644
--- a/pom.xml
+++ b/pom.xml
@@ -658,8 +658,8 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <configuration>
-                    <source>1.6</source>
-                    <target>1.6</target>
+                    <source>1.7</source>
+                    <target>1.7</target>
                 </configuration>
             </plugin>
             <plugin>