You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/08/24 22:50:50 UTC

[2/4] storm git commit: Changed to use RotatingMap as a member variable instead of a super-class.

Changed to use RotatingMap as a member variable instead of a super-class.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f283f66
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f283f66
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f283f66

Branch: refs/heads/master
Commit: 0f283f662e98a2163d20c4fd8bf719d1885edfce
Parents: 356e747
Author: ddebree <dd...@gmail.com>
Authored: Fri Aug 14 23:02:01 2015 +0200
Committer: ddebree <dd...@gmail.com>
Committed: Fri Aug 14 23:03:33 2015 +0200

----------------------------------------------------------------------
 .../jvm/backtype/storm/utils/TimeCacheMap.java  | 39 ++++++++++++--------
 1 file changed, 23 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0f283f66/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
index ba9594c..dd92696 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
@@ -18,6 +18,7 @@
 package backtype.storm.utils;
 
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Expires keys that have not been updated in the configured number of seconds.
@@ -31,28 +32,41 @@ import java.util.Map;
  */
 //deprecated in favor of non-threaded RotatingMap
 @Deprecated
-public class TimeCacheMap<K, V> extends RotatingMap<K, V> {
+public class TimeCacheMap<K, V> {
     //this default ensures things expire at most 50% past the expiration time
     private static final int DEFAULT_NUM_BUCKETS = 3;
 
-    public interface ExpiredCallback<K, V> extends RotatingMap.ExpiredCallback<K, V> {
+    public interface ExpiredCallback<K, V> {
         void expire(K key, V val);
     }
 
+    private final RotatingMap<K, V> _rotatingMap;
+
     private final Object _lock = new Object();
     private final Thread _cleaner;
+    private ExpiredCallback<K, V> _callback;
     
     public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
-        super(numBuckets, callback);
 
+        _rotatingMap = new RotatingMap<>(numBuckets);
+
+        _callback = callback;
         final long expirationMillis = expirationSecs * 1000L;
         final long sleepTime = expirationMillis / (numBuckets-1);
         _cleaner = new Thread(new Runnable() {
             public void run() {
                 try {
                     while(true) {
+                        Map<K, V> dead = null;
                         Time.sleep(sleepTime);
-                        rotate();
+                        synchronized(_lock) {
+                            dead = _rotatingMap.rotate();
+                        }
+                        if(_callback!=null) {
+                            for(Entry<K, V> entry: dead.entrySet()) {
+                                _callback.expire(entry.getKey(), entry.getValue());
+                            }
+                        }
                     }
                 } catch (InterruptedException ex) {
 
@@ -77,38 +91,31 @@ public class TimeCacheMap<K, V> extends RotatingMap<K, V> {
 
     public boolean containsKey(K key) {
         synchronized(_lock) {
-            return super.containsKey(key);
+            return _rotatingMap.containsKey(key);
         }
     }
 
     public V get(K key) {
         synchronized(_lock) {
-            return super.get(key);
+            return _rotatingMap.get(key);
         }
     }
 
     public void put(K key, V value) {
         synchronized(_lock) {
-            super.put(key, value);
+            _rotatingMap.put(key, value);
         }
     }
     
     public Object remove(K key) {
         synchronized(_lock) {
-            return super.remove(key);
+            return _rotatingMap.remove(key);
         }
     }
 
     public int size() {
         synchronized(_lock) {
-            return super.size();
-        }
-    }
-
-    @Override
-    public Map<K, V> rotate() {
-        synchronized (_lock) {
-            return super.rotate();
+            return _rotatingMap.size();
         }
     }