You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/05/27 15:57:36 UTC
[14/14] git commit: wip
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/abf5fdda
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/abf5fdda
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/abf5fdda
Branch: refs/heads/curator-rpc
Commit: abf5fddae35e62b93256c2e473df8b79b3b76969
Parents: 099df94
Author: randgalt <ra...@apache.org>
Authored: Mon May 26 20:29:20 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon May 26 20:29:20 2014 -0500
----------------------------------------------------------------------
.../org/apache/curator/x/rpc/CuratorEntry.java | 13 -----
.../curator/x/rpc/CuratorProjectionServer.java | 3 +-
.../org/apache/curator/x/rpc/RpcManager.java | 57 ++++++++++++++++----
3 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/abf5fdda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
index 61f97f3..f1b50a0 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
@@ -6,13 +6,11 @@ import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class CuratorEntry implements Closeable
{
private final CuratorFramework client;
- private final AtomicLong lastAccessEpoch = new AtomicLong(0);
private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
@@ -25,7 +23,6 @@ public class CuratorEntry implements Closeable
public CuratorEntry(CuratorFramework client)
{
this.client = client;
- updateLastAccess();
}
@Override
@@ -55,18 +52,8 @@ public class CuratorEntry implements Closeable
}
}
- public void updateLastAccess()
- {
- lastAccessEpoch.set(System.currentTimeMillis());
- }
-
public CuratorFramework getClient()
{
return (state.get() == State.OPEN) ? client : null;
}
-
- public long getLastAccessEpoch()
- {
- return lastAccessEpoch.get();
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/abf5fdda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 6f94902..75c304b 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -26,12 +26,13 @@ import com.facebook.swift.service.ThriftServiceProcessor;
import com.google.common.collect.Lists;
import org.apache.curator.x.rpc.idl.event.EventService;
import org.apache.curator.x.rpc.idl.projection.CuratorProjectionService;
+import java.util.concurrent.TimeUnit;
public class CuratorProjectionServer
{
public static void main(String[] args)
{
- RpcManager rpcManager = new RpcManager();
+ RpcManager rpcManager = new RpcManager(TimeUnit.SECONDS.toMillis(10)); // TODO
EventService eventService = new EventService(rpcManager, 5000); // TODO
CuratorProjectionService projectionService = new CuratorProjectionService(rpcManager);
ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), Lists.<ThriftEventHandler>newArrayList(), projectionService, eventService);
http://git-wip-us.apache.org/repos/asf/curator/blob/abf5fdda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
index dcfa2a0..e238aa6 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
@@ -1,27 +1,66 @@
package org.apache.curator.x.rpc;
-import com.google.common.collect.Maps;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import org.apache.curator.framework.CuratorFramework;
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
-public class RpcManager
+public class RpcManager implements Closeable
{
- private final Map<String, CuratorEntry> projections = Maps.newConcurrentMap();
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final Cache<String, CuratorEntry> cache;
+
+ public RpcManager(long expirationMs)
+ {
+ RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
+ {
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public void onRemoval(RemovalNotification<String, CuratorEntry> notification)
+ {
+ if ( notification != null )
+ {
+ log.debug(String.format("Entry being removed. id (%s), reason (%s)", notification.getKey(), notification.getCause()));
+
+ CuratorEntry entry = notification.getValue();
+ if ( entry != null )
+ {
+ entry.close();
+ }
+ }
+ }
+ };
+ cache = CacheBuilder
+ .newBuilder()
+ .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
+ .removalListener(listener)
+ .build();
+ }
+
+ @Override
+ public void close()
+ {
+ cache.invalidateAll();
+ cache.cleanUp();
+ }
public void add(String id, CuratorFramework client)
{
- projections.put(id, new CuratorEntry(client));
+ cache.put(id, new CuratorEntry(client));
}
public CuratorEntry get(String id)
{
- CuratorEntry curatorEntry = projections.get(id);
- curatorEntry.updateLastAccess();
- return curatorEntry;
+ return cache.getIfPresent(id);
}
public CuratorEntry remove(String id)
{
- return projections.remove(id);
+ return cache.asMap().remove(id);
}
}