You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/07 20:19:03 UTC

[05/50] [abbrv] git commit: node cache support

node cache support


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/985c9ed0
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/985c9ed0
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/985c9ed0

Branch: refs/heads/master
Commit: 985c9ed008a77d179066f9873716636fff4be959
Parents: 6f9a9ab
Author: randgalt <ra...@apache.org>
Authored: Thu May 29 16:17:46 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu May 29 16:17:46 2014 -0500

----------------------------------------------------------------------
 .../curator/x/rpc/connections/CuratorEntry.java |   12 +-
 .../x/rpc/idl/event/RpcCuratorEvent.java        |   16 +
 .../x/rpc/idl/event/RpcCuratorEventType.java    |    3 +-
 .../projection/CuratorProjectionService.java    |   55 +-
 .../rpc/idl/projection/NodeCacheProjection.java |   20 +
 curator-x-rpc/src/main/thrift/curator.thrift    |    8 +-
 .../curator/generated/CuratorEventType.java     |    5 +-
 .../curator/generated/CuratorService.java       | 3893 ++++++++++++++----
 .../curator/generated/NodeCacheProjection.java  |  393 ++
 .../org/apache/curator/x/rpc/TestClient.java    |    7 +
 10 files changed, 3509 insertions(+), 903 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/985c9ed0/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
index 1ecd795..2613ad0 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
@@ -98,12 +98,6 @@ public class CuratorEntry implements Closeable
         return UUID.randomUUID().toString();
     }
 
-    public <T> String addThing(String id, T thing, Closer<T> closer)
-    {
-        things.put(id, new Entry(thing, closer));
-        return id;
-    }
-
     public <T> T getThing(String id, Class<T> clazz)
     {
         Entry entry = (id != null) ? things.get(id) : null;
@@ -121,6 +115,12 @@ public class CuratorEntry implements Closeable
         return false;
     }
 
+    private <T> String addThing(String id, T thing, Closer<T> closer)
+    {
+        things.put(id, new Entry(thing, closer));
+        return id;
+    }
+
     private <T> T cast(Class<T> clazz, Entry entry)
     {
         if ( entry != null )

http://git-wip-us.apache.org/repos/asf/curator/blob/985c9ed0/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
index aacf076..400f8c9 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEvent.java
@@ -86,6 +86,22 @@ public class RpcCuratorEvent
         this.childrenCacheEvent = null;
     }
 
+    public RpcCuratorEvent(RpcCuratorEventType type, String path)
+    {
+        this.type = type;
+        this.resultCode = 0;
+        this.path = path;
+        this.context = null;
+        this.stat = null;
+        this.data = null;
+        this.name = null;
+        this.children = null;
+        this.aclList = null;
+        this.watchedEvent = null;
+        this.leaderEvent = null;
+        this.childrenCacheEvent = null;
+    }
+
     public RpcCuratorEvent(RpcPathChildrenCacheEvent childrenCacheEvent)
     {
         this.type = RpcCuratorEventType.PATH_CHILDREN_CACHE;

http://git-wip-us.apache.org/repos/asf/curator/blob/985c9ed0/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
index eee5929..0bda3d6 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/RpcCuratorEventType.java
@@ -41,5 +41,6 @@ public enum RpcCuratorEventType
     CONNECTION_LOST,
     CONNECTION_READ_ONLY,
     LEADER,
-    PATH_CHILDREN_CACHE
+    PATH_CHILDREN_CACHE,
+    NODE_CACHE
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/985c9ed0/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
index 8b41353..a870447 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
@@ -27,6 +27,8 @@ import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.*;
 import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
@@ -316,8 +318,7 @@ public class CuratorProjectionService
                 }
             }
         };
-        final String id = CuratorEntry.newId();
-        entry.addThing(id, leaderLatch, closer);
+        String id = entry.addThing(leaderLatch, closer);
 
         LeaderLatchListener listener = new LeaderLatchListener()
         {
@@ -393,8 +394,7 @@ public class CuratorProjectionService
                 }
             }
         };
-        final String id = CuratorEntry.newId();
-        entry.addThing(id, cache, closer);
+        String id = entry.addThing(cache, closer);
 
         PathChildrenCacheListener listener = new PathChildrenCacheListener()
         {
@@ -438,6 +438,53 @@ public class CuratorProjectionService
         return new RpcChildData(pathChildrenCache.getCurrentData(path));
     }
 
+    @ThriftMethod
+    public NodeCacheProjection startNodeCache(CuratorProjection projection, final String path, boolean dataIsCompressed, boolean buildInitial) throws Exception
+    {
+        final CuratorEntry entry = getEntry(projection);
+
+        final NodeCache cache = new NodeCache(entry.getClient(), path, dataIsCompressed);
+        cache.start(buildInitial);
+
+        Closer<NodeCache> closer = new Closer<NodeCache>()
+        {
+            @Override
+            public void close(NodeCache cache)
+            {
+                try
+                {
+                    cache.close();
+                }
+                catch ( IOException e )
+                {
+                    log.error("Could not close left-over NodeCache for path: " + path, e);
+                }
+            }
+        };
+        String id = entry.addThing(cache, closer);
+
+        NodeCacheListener listener = new NodeCacheListener()
+        {
+            @Override
+            public void nodeChanged() throws Exception
+            {
+                entry.addEvent(new RpcCuratorEvent(RpcCuratorEventType.NODE_CACHE, path));
+            }
+        };
+        cache.getListenable().addListener(listener);
+
+        return new NodeCacheProjection(new GenericProjection(id));
+    }
+
+    @ThriftMethod
+    public RpcChildData getNodeCacheData(CuratorProjection projection, NodeCacheProjection cacheProjection) throws Exception
+    {
+        final CuratorEntry entry = getEntry(projection);
+
+        NodeCache nodeCache = getThing(entry, cacheProjection.projection.id, NodeCache.class);
+        return new RpcChildData(nodeCache.getCurrentData());
+    }
+
     public void addEvent(CuratorProjection projection, RpcCuratorEvent event)
     {
         CuratorEntry entry = connectionManager.get(projection.id);

http://git-wip-us.apache.org/repos/asf/curator/blob/985c9ed0/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/NodeCacheProjection.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/NodeCacheProjection.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/NodeCacheProjection.java
new file mode 100644
index 0000000..43d1138
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/NodeCacheProjection.java
@@ -0,0 +1,20 @@
+package org.apache.curator.x.rpc.idl.projection;
+
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+
+@ThriftStruct
+public class NodeCacheProjection
+{
+    @ThriftField(1)
+    public GenericProjection projection;
+
+    public NodeCacheProjection()
+    {
+    }
+
+    public NodeCacheProjection(GenericProjection projection)
+    {
+        this.projection = projection;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/985c9ed0/curator-x-rpc/src/main/thrift/curator.thrift
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift
index 2b5e115..6ed104d 100644
--- a/curator-x-rpc/src/main/thrift/curator.thrift
+++ b/curator-x-rpc/src/main/thrift/curator.thrift
@@ -12,7 +12,7 @@ enum CreateMode {
 }
 
 enum CuratorEventType {
-  PING, CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED, CLOSING, CONNECTION_CONNECTED, CONNECTION_SUSPENDED, CONNECTION_RECONNECTED, CONNECTION_LOST, CONNECTION_READ_ONLY, LEADER, PATH_CHILDREN_CACHE
+  PING, CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN, SYNC, GET_ACL, SET_ACL, WATCHED, CLOSING, CONNECTION_CONNECTED, CONNECTION_SUSPENDED, CONNECTION_RECONNECTED, CONNECTION_LOST, CONNECTION_READ_ONLY, LEADER, PATH_CHILDREN_CACHE, NODE_CACHE
 }
 
 enum EventType {
@@ -58,6 +58,10 @@ struct LeaderProjection {
   1: GenericProjection projection;
 }
 
+struct NodeCacheProjection {
+  1: GenericProjection projection;
+}
+
 struct PathChildrenCacheProjection {
   1: GenericProjection projection;
 }
@@ -188,12 +192,14 @@ service CuratorService {
   OptionalChildrenList getChildren(1: CuratorProjection projection, 2: GetChildrenSpec spec);
   binary getData(1: CuratorProjection projection, 2: GetDataSpec spec);
   list<Participant> getLeaderParticipants(1: CuratorProjection projection, 2: LeaderProjection leaderProjection);
+  ChildData getNodeCacheData(1: CuratorProjection projection, 2: NodeCacheProjection cacheProjection);
   list<ChildData> getPathChildrenCacheData(1: CuratorProjection projection, 2: PathChildrenCacheProjection cacheProjection);
   ChildData getPathChildrenCacheDataForPath(1: CuratorProjection projection, 2: PathChildrenCacheProjection cacheProjection, 3: string path);
   bool isLeader(1: CuratorProjection projection, 2: LeaderProjection leaderProjection);
   CuratorProjection newCuratorProjection(1: string connectionName);
   Stat setData(1: CuratorProjection projection, 2: SetDataSpec spec);
   LeaderResult startLeaderSelector(1: CuratorProjection projection, 2: string path, 3: string participantId, 4: i32 waitForLeadershipMs);
+  NodeCacheProjection startNodeCache(1: CuratorProjection projection, 2: string path, 3: bool dataIsCompressed, 4: bool buildInitial);
   PathChildrenCacheProjection startPathChildrenCache(1: CuratorProjection projection, 2: string path, 3: bool cacheData, 4: bool dataIsCompressed, 5: PathChildrenCacheStartMode startMode);
 }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/985c9ed0/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
index 46c57ae..08013ec 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorEventType.java
@@ -30,7 +30,8 @@ public enum CuratorEventType implements org.apache.thrift.TEnum {
   CONNECTION_LOST(15),
   CONNECTION_READ_ONLY(16),
   LEADER(17),
-  PATH_CHILDREN_CACHE(18);
+  PATH_CHILDREN_CACHE(18),
+  NODE_CACHE(19);
 
   private final int value;
 
@@ -89,6 +90,8 @@ public enum CuratorEventType implements org.apache.thrift.TEnum {
         return LEADER;
       case 18:
         return PATH_CHILDREN_CACHE;
+      case 19:
+        return NODE_CACHE;
       default:
         return null;
     }