You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/08/24 22:50:50 UTC
[2/4] storm git commit: Changed to use RotatingMap as a member
variable instead of a super-class.
Changed to use RotatingMap as a member variable instead of a super-class.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f283f66
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f283f66
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f283f66
Branch: refs/heads/master
Commit: 0f283f662e98a2163d20c4fd8bf719d1885edfce
Parents: 356e747
Author: ddebree <dd...@gmail.com>
Authored: Fri Aug 14 23:02:01 2015 +0200
Committer: ddebree <dd...@gmail.com>
Committed: Fri Aug 14 23:03:33 2015 +0200
----------------------------------------------------------------------
.../jvm/backtype/storm/utils/TimeCacheMap.java | 39 ++++++++++++--------
1 file changed, 23 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0f283f66/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
index ba9594c..dd92696 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
@@ -18,6 +18,7 @@
package backtype.storm.utils;
import java.util.Map;
+import java.util.Map.Entry;
/**
* Expires keys that have not been updated in the configured number of seconds.
@@ -31,28 +32,41 @@ import java.util.Map;
*/
//deprecated in favor of non-threaded RotatingMap
@Deprecated
-public class TimeCacheMap<K, V> extends RotatingMap<K, V> {
+public class TimeCacheMap<K, V> {
//this default ensures things expire at most 50% past the expiration time
private static final int DEFAULT_NUM_BUCKETS = 3;
- public interface ExpiredCallback<K, V> extends RotatingMap.ExpiredCallback<K, V> {
+ public interface ExpiredCallback<K, V> {
void expire(K key, V val);
}
+ private final RotatingMap<K, V> _rotatingMap;
+
private final Object _lock = new Object();
private final Thread _cleaner;
+ private ExpiredCallback<K, V> _callback;
public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
- super(numBuckets, callback);
+ _rotatingMap = new RotatingMap<>(numBuckets);
+
+ _callback = callback;
final long expirationMillis = expirationSecs * 1000L;
final long sleepTime = expirationMillis / (numBuckets-1);
_cleaner = new Thread(new Runnable() {
public void run() {
try {
while(true) {
+ Map<K, V> dead = null;
Time.sleep(sleepTime);
- rotate();
+ synchronized(_lock) {
+ dead = _rotatingMap.rotate();
+ }
+ if(_callback!=null) {
+ for(Entry<K, V> entry: dead.entrySet()) {
+ _callback.expire(entry.getKey(), entry.getValue());
+ }
+ }
}
} catch (InterruptedException ex) {
@@ -77,38 +91,31 @@ public class TimeCacheMap<K, V> extends RotatingMap<K, V> {
public boolean containsKey(K key) {
synchronized(_lock) {
- return super.containsKey(key);
+ return _rotatingMap.containsKey(key);
}
}
public V get(K key) {
synchronized(_lock) {
- return super.get(key);
+ return _rotatingMap.get(key);
}
}
public void put(K key, V value) {
synchronized(_lock) {
- super.put(key, value);
+ _rotatingMap.put(key, value);
}
}
public Object remove(K key) {
synchronized(_lock) {
- return super.remove(key);
+ return _rotatingMap.remove(key);
}
}
public int size() {
synchronized(_lock) {
- return super.size();
- }
- }
-
- @Override
- public Map<K, V> rotate() {
- synchronized (_lock) {
- return super.rotate();
+ return _rotatingMap.size();
}
}