You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/05/27 15:57:36 UTC

[14/14] git commit: wip

wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/abf5fdda
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/abf5fdda
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/abf5fdda

Branch: refs/heads/curator-rpc
Commit: abf5fddae35e62b93256c2e473df8b79b3b76969
Parents: 099df94
Author: randgalt <ra...@apache.org>
Authored: Mon May 26 20:29:20 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon May 26 20:29:20 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/x/rpc/CuratorEntry.java  | 13 -----
 .../curator/x/rpc/CuratorProjectionServer.java  |  3 +-
 .../org/apache/curator/x/rpc/RpcManager.java    | 57 ++++++++++++++++----
 3 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/abf5fdda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
index 61f97f3..f1b50a0 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
@@ -6,13 +6,11 @@ import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
 import java.io.Closeable;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class CuratorEntry implements Closeable
 {
     private final CuratorFramework client;
-    private final AtomicLong lastAccessEpoch = new AtomicLong(0);
     private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
     private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
 
@@ -25,7 +23,6 @@ public class CuratorEntry implements Closeable
     public CuratorEntry(CuratorFramework client)
     {
         this.client = client;
-        updateLastAccess();
     }
 
     @Override
@@ -55,18 +52,8 @@ public class CuratorEntry implements Closeable
         }
     }
 
-    public void updateLastAccess()
-    {
-        lastAccessEpoch.set(System.currentTimeMillis());
-    }
-
     public CuratorFramework getClient()
     {
         return (state.get() == State.OPEN) ? client : null;
     }
-
-    public long getLastAccessEpoch()
-    {
-        return lastAccessEpoch.get();
-    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/abf5fdda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 6f94902..75c304b 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -26,12 +26,13 @@ import com.facebook.swift.service.ThriftServiceProcessor;
 import com.google.common.collect.Lists;
 import org.apache.curator.x.rpc.idl.event.EventService;
 import org.apache.curator.x.rpc.idl.projection.CuratorProjectionService;
+import java.util.concurrent.TimeUnit;
 
 public class CuratorProjectionServer
 {
     public static void main(String[] args)
     {
-        RpcManager rpcManager = new RpcManager();
+        RpcManager rpcManager = new RpcManager(TimeUnit.SECONDS.toMillis(10));   // TODO
         EventService eventService = new EventService(rpcManager, 5000); // TODO
         CuratorProjectionService projectionService = new CuratorProjectionService(rpcManager);
         ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), Lists.<ThriftEventHandler>newArrayList(), projectionService, eventService);

http://git-wip-us.apache.org/repos/asf/curator/blob/abf5fdda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
index dcfa2a0..e238aa6 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
@@ -1,27 +1,66 @@
 package org.apache.curator.x.rpc;
 
-import com.google.common.collect.Maps;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import org.apache.curator.framework.CuratorFramework;
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
 
-public class RpcManager
+public class RpcManager implements Closeable
 {
-    private final Map<String, CuratorEntry> projections = Maps.newConcurrentMap();
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Cache<String, CuratorEntry> cache;
+
+    public RpcManager(long expirationMs)
+    {
+        RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
+        {
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public void onRemoval(RemovalNotification<String, CuratorEntry> notification)
+            {
+                if ( notification != null )
+                {
+                    log.debug(String.format("Entry being removed. id (%s), reason (%s)", notification.getKey(), notification.getCause()));
+
+                    CuratorEntry entry = notification.getValue();
+                    if ( entry != null )
+                    {
+                        entry.close();
+                    }
+                }
+            }
+        };
+        cache = CacheBuilder
+            .newBuilder()
+            .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
+            .removalListener(listener)
+            .build();
+    }
+
+    @Override
+    public void close()
+    {
+        cache.invalidateAll();
+        cache.cleanUp();
+    }
 
     public void add(String id, CuratorFramework client)
     {
-        projections.put(id, new CuratorEntry(client));
+        cache.put(id, new CuratorEntry(client));
     }
 
     public CuratorEntry get(String id)
     {
-        CuratorEntry curatorEntry = projections.get(id);
-        curatorEntry.updateLastAccess();
-        return curatorEntry;
+        return cache.getIfPresent(id);
     }
 
     public CuratorEntry remove(String id)
     {
-        return projections.remove(id);
+        return cache.asMap().remove(id);
     }
 }