You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/08/24 22:50:49 UTC

[1/4] storm git commit: Refactored TimeCacheMap to extend RotatingMap. This should allow current users of the TimeCacheMap to progressively migrate off the deprecated TimeCacheMap to the simpler RotatingMap. This also improves code reuse since many of t

Repository: storm
Updated Branches:
  refs/heads/master 460202eed -> ce6c1c389


Refactored TimeCacheMap to extend RotatingMap.
This should allow current users of the TimeCacheMap to progressively migrate off the deprecated TimeCacheMap to the simpler RotatingMap.
 This also improves code reuse since many of the methods were shared.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/356e7474
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/356e7474
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/356e7474

Branch: refs/heads/master
Commit: 356e7474c9ede3c45232b39e895d554349a55328
Parents: 9e80903
Author: ddebree <dd...@gmail.com>
Authored: Thu Aug 13 13:41:36 2015 +0200
Committer: ddebree <dd...@gmail.com>
Committed: Thu Aug 13 13:41:36 2015 +0200

----------------------------------------------------------------------
 .../jvm/backtype/storm/utils/RotatingMap.java   | 12 ++-
 .../jvm/backtype/storm/utils/TimeCacheMap.java  | 79 +++++---------------
 2 files changed, 26 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/356e7474/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
index aca8db6..2b8d66b 100644
--- a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
+++ b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
@@ -32,18 +32,22 @@ import java.util.Map.Entry;
  *
  * The advantage of this design is that the expiration thread only locks the object
  * for O(1) time, meaning the object is essentially always available for gets/puts.
+ *
+ * Note: This class is not thread-safe since it does not protect against changes to
+ * _buckets while it is being read
+ *
  */
 public class RotatingMap<K, V> {
     //this default ensures things expire at most 50% past the expiration time
     private static final int DEFAULT_NUM_BUCKETS = 3;
 
-    public static interface ExpiredCallback<K, V> {
-        public void expire(K key, V val);
+    public interface ExpiredCallback<K, V> {
+        void expire(K key, V val);
     }
 
-    private LinkedList<HashMap<K, V>> _buckets;
+    private final LinkedList<HashMap<K, V>> _buckets;
 
-    private ExpiredCallback _callback;
+    private final ExpiredCallback<K, V> _callback;
     
     public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback) {
         if(numBuckets<2) {

http://git-wip-us.apache.org/repos/asf/storm/blob/356e7474/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
index 36d1bae..ba9594c 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
@@ -17,11 +17,7 @@
  */
 package backtype.storm.utils;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Map;
-import java.util.Map.Entry;
 
 /**
  * Expires keys that have not been updated in the configured number of seconds.
@@ -35,48 +31,28 @@ import java.util.Map.Entry;
  */
 //deprecated in favor of non-threaded RotatingMap
 @Deprecated
-public class TimeCacheMap<K, V> {
+public class TimeCacheMap<K, V> extends RotatingMap<K, V> {
     //this default ensures things expire at most 50% past the expiration time
     private static final int DEFAULT_NUM_BUCKETS = 3;
 
-    public static interface ExpiredCallback<K, V> {
-        public void expire(K key, V val);
+    public interface ExpiredCallback<K, V> extends RotatingMap.ExpiredCallback<K, V> {
+        void expire(K key, V val);
     }
 
-    private LinkedList<HashMap<K, V>> _buckets;
-
     private final Object _lock = new Object();
-    private Thread _cleaner;
-    private ExpiredCallback _callback;
+    private final Thread _cleaner;
     
     public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
-        if(numBuckets<2) {
-            throw new IllegalArgumentException("numBuckets must be >= 2");
-        }
-        _buckets = new LinkedList<HashMap<K, V>>();
-        for(int i=0; i<numBuckets; i++) {
-            _buckets.add(new HashMap<K, V>());
-        }
-
+        super(numBuckets, callback);
 
-        _callback = callback;
         final long expirationMillis = expirationSecs * 1000L;
         final long sleepTime = expirationMillis / (numBuckets-1);
         _cleaner = new Thread(new Runnable() {
             public void run() {
                 try {
                     while(true) {
-                        Map<K, V> dead = null;
                         Time.sleep(sleepTime);
-                        synchronized(_lock) {
-                            dead = _buckets.removeLast();
-                            _buckets.addFirst(new HashMap<K, V>());
-                        }
-                        if(_callback!=null) {
-                            for(Entry<K, V> entry: dead.entrySet()) {
-                                _callback.expire(entry.getKey(), entry.getValue());
-                            }
-                        }
+                        rotate();
                     }
                 } catch (InterruptedException ex) {
 
@@ -99,59 +75,40 @@ public class TimeCacheMap<K, V> {
         this(expirationSecs, numBuckets, null);
     }
 
-
     public boolean containsKey(K key) {
         synchronized(_lock) {
-            for(HashMap<K, V> bucket: _buckets) {
-                if(bucket.containsKey(key)) {
-                    return true;
-                }
-            }
-            return false;
+            return super.containsKey(key);
         }
     }
 
     public V get(K key) {
         synchronized(_lock) {
-            for(HashMap<K, V> bucket: _buckets) {
-                if(bucket.containsKey(key)) {
-                    return bucket.get(key);
-                }
-            }
-            return null;
+            return super.get(key);
         }
     }
 
     public void put(K key, V value) {
         synchronized(_lock) {
-            Iterator<HashMap<K, V>> it = _buckets.iterator();
-            HashMap<K, V> bucket = it.next();
-            bucket.put(key, value);
-            while(it.hasNext()) {
-                bucket = it.next();
-                bucket.remove(key);
-            }
+            super.put(key, value);
         }
     }
     
     public Object remove(K key) {
         synchronized(_lock) {
-            for(HashMap<K, V> bucket: _buckets) {
-                if(bucket.containsKey(key)) {
-                    return bucket.remove(key);
-                }
-            }
-            return null;
+            return super.remove(key);
         }
     }
 
     public int size() {
         synchronized(_lock) {
-            int size = 0;
-            for(HashMap<K, V> bucket: _buckets) {
-                size+=bucket.size();
-            }
-            return size;
+            return super.size();
+        }
+    }
+
+    @Override
+    public Map<K, V> rotate() {
+        synchronized (_lock) {
+            return super.rotate();
         }
     }
 


[3/4] storm git commit: Merge branch 'TimeCacheMap_refactoring' of https://github.com/ddebree/storm

Posted by kn...@apache.org.
Merge branch 'TimeCacheMap_refactoring' of https://github.com/ddebree/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8d759b19
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8d759b19
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8d759b19

Branch: refs/heads/master
Commit: 8d759b197288f854e6bd4a6a09c607cf66df88c4
Parents: 460202e 0f283f6
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Mon Aug 24 15:04:32 2015 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Mon Aug 24 15:04:32 2015 -0500

----------------------------------------------------------------------
 .../jvm/backtype/storm/utils/RotatingMap.java   | 12 ++--
 .../jvm/backtype/storm/utils/TimeCacheMap.java  | 60 ++++----------------
 2 files changed, 20 insertions(+), 52 deletions(-)
----------------------------------------------------------------------



[4/4] storm git commit: Added STORM-990 to CHANGELOG.md

Posted by kn...@apache.org.
Added STORM-990 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce6c1c38
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce6c1c38
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce6c1c38

Branch: refs/heads/master
Commit: ce6c1c389aef838dcb5f145d4af05815d62ee016
Parents: 8d759b1
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Mon Aug 24 15:50:29 2015 -0500
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Mon Aug 24 15:50:29 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ce6c1c38/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2f41e7e..d043a5f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-990: Refactored TimeCacheMap to extend RotatingMap
  * STORM-829: Hadoop dependency confusion
  * STORM-166: Nimbus HA
  * STORM-976: Config storm.logback.conf.dir is specific to previous logging framework


[2/4] storm git commit: Changed to use RotatingMap as a member variable instead of a super-class.

Posted by kn...@apache.org.
Changed to use RotatingMap as a member variable instead of a super-class.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f283f66
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f283f66
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f283f66

Branch: refs/heads/master
Commit: 0f283f662e98a2163d20c4fd8bf719d1885edfce
Parents: 356e747
Author: ddebree <dd...@gmail.com>
Authored: Fri Aug 14 23:02:01 2015 +0200
Committer: ddebree <dd...@gmail.com>
Committed: Fri Aug 14 23:03:33 2015 +0200

----------------------------------------------------------------------
 .../jvm/backtype/storm/utils/TimeCacheMap.java  | 39 ++++++++++++--------
 1 file changed, 23 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0f283f66/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
index ba9594c..dd92696 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
@@ -18,6 +18,7 @@
 package backtype.storm.utils;
 
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Expires keys that have not been updated in the configured number of seconds.
@@ -31,28 +32,41 @@ import java.util.Map;
  */
 //deprecated in favor of non-threaded RotatingMap
 @Deprecated
-public class TimeCacheMap<K, V> extends RotatingMap<K, V> {
+public class TimeCacheMap<K, V> {
     //this default ensures things expire at most 50% past the expiration time
     private static final int DEFAULT_NUM_BUCKETS = 3;
 
-    public interface ExpiredCallback<K, V> extends RotatingMap.ExpiredCallback<K, V> {
+    public interface ExpiredCallback<K, V> {
         void expire(K key, V val);
     }
 
+    private final RotatingMap<K, V> _rotatingMap;
+
     private final Object _lock = new Object();
     private final Thread _cleaner;
+    private ExpiredCallback<K, V> _callback;
     
     public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
-        super(numBuckets, callback);
 
+        _rotatingMap = new RotatingMap<>(numBuckets);
+
+        _callback = callback;
         final long expirationMillis = expirationSecs * 1000L;
         final long sleepTime = expirationMillis / (numBuckets-1);
         _cleaner = new Thread(new Runnable() {
             public void run() {
                 try {
                     while(true) {
+                        Map<K, V> dead = null;
                         Time.sleep(sleepTime);
-                        rotate();
+                        synchronized(_lock) {
+                            dead = _rotatingMap.rotate();
+                        }
+                        if(_callback!=null) {
+                            for(Entry<K, V> entry: dead.entrySet()) {
+                                _callback.expire(entry.getKey(), entry.getValue());
+                            }
+                        }
                     }
                 } catch (InterruptedException ex) {
 
@@ -77,38 +91,31 @@ public class TimeCacheMap<K, V> extends RotatingMap<K, V> {
 
     public boolean containsKey(K key) {
         synchronized(_lock) {
-            return super.containsKey(key);
+            return _rotatingMap.containsKey(key);
         }
     }
 
     public V get(K key) {
         synchronized(_lock) {
-            return super.get(key);
+            return _rotatingMap.get(key);
         }
     }
 
     public void put(K key, V value) {
         synchronized(_lock) {
-            super.put(key, value);
+            _rotatingMap.put(key, value);
         }
     }
     
     public Object remove(K key) {
         synchronized(_lock) {
-            return super.remove(key);
+            return _rotatingMap.remove(key);
         }
     }
 
     public int size() {
         synchronized(_lock) {
-            return super.size();
-        }
-    }
-
-    @Override
-    public Map<K, V> rotate() {
-        synchronized (_lock) {
-            return super.rotate();
+            return _rotatingMap.size();
         }
     }