You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/06/10 17:01:33 UTC
[48/50] [abbrv] git commit: WIP
WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/28912f7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/28912f7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/28912f7d
Branch: refs/heads/ir
Commit: 28912f7df76c4f1fec156277bbf09bba05d53fec
Parents: 2febf6b
Author: Jordan Zimmerman <jo...@jordanzimmerman.com>
Authored: Thu Feb 28 13:25:45 2013 -0800
Committer: Jordan Zimmerman <jo...@jordanzimmerman.com>
Committed: Thu Feb 28 13:25:45 2013 -0800
----------------------------------------------------------------------
build.gradle | 11 ++
.../curator/x/sync/CuratorGlobalSync.java | 136 +++++++++++++++++++
.../curator/x/sync/InternalSyncSpec.java | 69 ++++++++++
.../netflix/curator/x/sync/NodeSyncQueue.java | 103 ++++++++++++++
.../com/netflix/curator/x/sync/RemoteSync.java | 37 +++++
.../com/netflix/curator/x/sync/SyncSpec.java | 23 ++++
.../com/netflix/curator/x/sync/SyncTypes.java | 9 ++
settings.gradle | 2 +-
8 files changed, 389 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d8bcce2..0efc622 100644
--- a/build.gradle
+++ b/build.gradle
@@ -140,3 +140,14 @@ project(':curator-examples')
compile project(':curator-x-discovery')
}
}
+
+project(':curator-x-global-sync') // module holding the com.netflix.curator.x.sync sources
+{
+ dependencies
+ {
+ compile project(':curator-client')
+ compile project(':curator-framework')
+ compile project(':curator-recipes')
+ testCompile project(':curator-test') // test scope only
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/CuratorGlobalSync.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/CuratorGlobalSync.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/CuratorGlobalSync.java
new file mode 100644
index 0000000..2d573e3
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/CuratorGlobalSync.java
@@ -0,0 +1,193 @@
+package com.netflix.curator.x.sync;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Closeables;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.cache.ChildData;
+import com.netflix.curator.framework.recipes.cache.NodeCache;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
+import com.netflix.curator.utils.ThreadUtils;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Watches paths of a local ensemble (one cache per {@link SyncSpec}) and queues every node
+ * reported by those caches so that its data can be pushed to the remote ensembles.
+ */
+public class CuratorGlobalSync implements Closeable
+{
+    private final List<InternalSyncSpec> specs;
+    private final NodeSyncQueue nodeSyncQueue;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    /**
+     * Creates an instance that runs on an internally created cached thread pool.
+     *
+     * @param localClient client of the ensemble to watch
+     * @param specs the paths to watch
+     * @param remoteClients clients of the ensembles to update
+     */
+    public CuratorGlobalSync(CuratorFramework localClient, Collection<SyncSpec> specs, CuratorFramework... remoteClients)
+    {
+        this(localClient, specs, makeExecutorService(), Arrays.asList(remoteClients));
+    }
+
+    /**
+     * Creates an instance that runs on an internally created cached thread pool.
+     *
+     * @param localClient client of the ensemble to watch
+     * @param specs the paths to watch
+     * @param remoteClients clients of the ensembles to update
+     */
+    public CuratorGlobalSync(CuratorFramework localClient, Collection<SyncSpec> specs, Collection<CuratorFramework> remoteClients)
+    {
+        this(localClient, specs, makeExecutorService(), remoteClients);
+    }
+
+    /**
+     * @param localClient client of the ensemble to watch
+     * @param specs the paths to watch
+     * @param service executor to run on; {@link #close()} shuts it down
+     * @param remoteClients clients of the ensembles to update
+     */
+    public CuratorGlobalSync(CuratorFramework localClient, Collection<SyncSpec> specs, ExecutorService service, CuratorFramework... remoteClients)
+    {
+        this(localClient, specs, service, Arrays.asList(remoteClients));
+    }
+
+    /**
+     * @param localClient client of the ensemble to watch
+     * @param specs the paths to watch
+     * @param service executor to run on; {@link #close()} shuts it down
+     * @param remoteClients clients of the ensembles to update; must not contain {@code localClient}
+     * @throws NullPointerException if any argument is null
+     * @throws IllegalArgumentException if {@code remoteClients} contains {@code localClient}
+     */
+    public CuratorGlobalSync(CuratorFramework localClient, Collection<SyncSpec> specs, ExecutorService service, Collection<CuratorFramework> remoteClients)
+    {
+        remoteClients = Preconditions.checkNotNull(remoteClients, "remoteClients cannot be null");
+        specs = Preconditions.checkNotNull(specs, "specs cannot be null");
+        localClient = Preconditions.checkNotNull(localClient, "localClient cannot be null");
+        service = Preconditions.checkNotNull(service, "service cannot be null");
+        Preconditions.checkArgument(!remoteClients.contains(localClient), "The remoteClients list cannot contain the localClient");
+
+        ImmutableList.Builder<InternalSyncSpec> builder = ImmutableList.builder();
+        for ( SyncSpec spec : specs )
+        {
+            builder.add(makeInternalSyncSpec(localClient, service, spec));
+        }
+        this.specs = builder.build();
+
+        nodeSyncQueue = new NodeSyncQueue(this, service, remoteClients);
+    }
+
+    /**
+     * Queues a node so that its data is pushed to the remote ensembles.
+     *
+     * @param nodeData the node to push; {@code null} is ignored
+     */
+    public void postNodeToSync(ChildData nodeData)
+    {
+        // the cache listeners forward whatever their cache reports, which may be null
+        // TODO what about deletes?
+        if ( nodeData != null )
+        {
+            nodeSyncQueue.add(nodeData);
+        }
+    }
+
+    /**
+     * Starts the queue processor and then every cache.
+     *
+     * @throws IllegalStateException if this instance was already started or closed
+     * @throws Exception any error raised while starting a cache
+     */
+    public void start() throws Exception
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started or closed");
+
+        nodeSyncQueue.start();
+        for ( InternalSyncSpec spec : specs )
+        {
+            spec.start();
+        }
+    }
+
+    /**
+     * Closes the caches (when they were started) and the queue. Only the first call has
+     * an effect, and the instance cannot be started afterwards.
+     */
+    @Override
+    public void close() throws IOException
+    {
+        State previous = state.getAndSet(State.CLOSED);
+        if ( previous == State.STARTED )
+        {
+            for ( InternalSyncSpec spec : specs )
+            {
+                Closeables.closeQuietly(spec);
+            }
+        }
+        if ( previous != State.CLOSED )
+        {
+            // closing the queue shuts the executor down, so do it exactly once
+            Closeables.closeQuietly(nodeSyncQueue);
+        }
+    }
+
+    /**
+     * Creates the cache wrapper that matches the type of the given spec.
+     */
+    private InternalSyncSpec makeInternalSyncSpec(CuratorFramework localClient, ExecutorService service, SyncSpec spec)
+    {
+        switch ( spec.getType() )
+        {
+            case SINGLE_NODE:
+            {
+                return new InternalSyncSpec(this, new NodeCache(localClient, spec.getPath(), false));
+            }
+
+            case SINGLE_NODE_COMPRESSED_DATA:
+            {
+                return new InternalSyncSpec(this, new NodeCache(localClient, spec.getPath(), true));
+            }
+
+            case CHILDREN_OF_NODE:
+            {
+                return new InternalSyncSpec(this, new PathChildrenCache(localClient, spec.getPath(), true, false, service));
+            }
+
+            case CHILDREN_OF_NODE_COMPRESSED_DATA:
+            {
+                return new InternalSyncSpec(this, new PathChildrenCache(localClient, spec.getPath(), true, true, service));
+            }
+
+            default:
+            {
+                throw new IllegalStateException("Unsupported sync type: " + spec.getType());
+            }
+        }
+    }
+
+    /**
+     * @return the cached thread pool used when the caller does not supply an executor
+     */
+    private static ExecutorService makeExecutorService()
+    {
+        return Executors.newCachedThreadPool(ThreadUtils.newThreadFactory("CuratorGlobalSync"));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/InternalSyncSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/InternalSyncSpec.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/InternalSyncSpec.java
new file mode 100644
index 0000000..b5bea03
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/InternalSyncSpec.java
@@ -0,0 +1,97 @@
+package com.netflix.curator.x.sync;
+
+import com.google.common.io.Closeables;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.cache.NodeCache;
+import com.netflix.curator.framework.recipes.cache.NodeCacheListener;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
+import java.io.Closeable;
+
+/**
+ * Wraps the single cache (children cache or node cache) created for one sync spec and
+ * forwards everything that cache reports to the owning {@link CuratorGlobalSync}.
+ */
+class InternalSyncSpec implements Closeable, PathChildrenCacheListener, NodeCacheListener
+{
+    private final CuratorGlobalSync sync;
+    private final PathChildrenCache pathCache;
+    private final NodeCache nodeCache;
+
+    /**
+     * Wraps a children cache and registers this object as its listener.
+     */
+    InternalSyncSpec(CuratorGlobalSync sync, PathChildrenCache pathCache)
+    {
+        this(sync, pathCache, null);
+        pathCache.getListenable().addListener(this);
+    }
+
+    /**
+     * Wraps a node cache and registers this object as its listener.
+     */
+    InternalSyncSpec(CuratorGlobalSync sync, NodeCache nodeCache)
+    {
+        this(sync, null, nodeCache);
+        nodeCache.getListenable().addListener(this);
+    }
+
+    /**
+     * Assigns the fields only; the cache that is not in use is passed as null.
+     */
+    private InternalSyncSpec(CuratorGlobalSync sync, PathChildrenCache pathCache, NodeCache nodeCache)
+    {
+        this.sync = sync;
+        this.pathCache = pathCache;
+        this.nodeCache = nodeCache;
+    }
+
+    /**
+     * Starts whichever cache this spec wraps.
+     *
+     * @throws Exception any error raised by the cache while starting
+     */
+    public void start() throws Exception
+    {
+        if ( pathCache != null )
+        {
+            pathCache.start();
+        }
+
+        if ( nodeCache != null )
+        {
+            nodeCache.start();
+        }
+    }
+
+    /**
+     * Closes both cache references quietly.
+     */
+    @Override
+    public void close()
+    {
+        Closeables.closeQuietly(pathCache);
+        Closeables.closeQuietly(nodeCache);
+    }
+
+    /**
+     * Children cache callback: forwards the data carried by the event.
+     */
+    @Override
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+    {
+        // TODO what about deletes?
+        sync.postNodeToSync(event.getData());
+    }
+
+    /**
+     * Node cache callback: forwards the cache's current data.
+     */
+    @Override
+    public void nodeChanged() throws Exception
+    {
+        // TODO what about deletes?
+        sync.postNodeToSync(nodeCache.getCurrentData());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/NodeSyncQueue.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/NodeSyncQueue.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/NodeSyncQueue.java
new file mode 100644
index 0000000..83b351d
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/NodeSyncQueue.java
@@ -0,0 +1,134 @@
+package com.netflix.curator.x.sync;
+
+import com.google.common.collect.ImmutableList;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.cache.ChildData;
+import org.apache.zookeeper.data.Stat;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Queue of nodes waiting to be pushed to the remote ensembles. One background task drains
+ * the queue and submits a {@link RemoteSync} per remote client for every queued node.
+ */
+class NodeSyncQueue implements Closeable
+{
+    private final CuratorGlobalSync sync;
+    private final ExecutorService service;
+    private final Collection<CuratorFramework> remoteClients;
+    private final BlockingQueue<ChildDataHolder> queue = new LinkedBlockingQueue<ChildDataHolder>(); // TODO capacity
+
+    /**
+     * Queue entry. Two entries are equal when they refer to the same path.
+     */
+    private static class ChildDataHolder
+    {
+        private final ChildData data;
+
+        private ChildDataHolder(ChildData data)
+        {
+            this.data = data;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if ( this == o )
+            {
+                return true;
+            }
+            if ( o == null || getClass() != o.getClass() )
+            {
+                return false;
+            }
+
+            ChildDataHolder that = (ChildDataHolder)o;
+            return data.getPath().equals(that.data.getPath());
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return data.getPath().hashCode();
+        }
+    }
+
+    /**
+     * @param sync the owning sync instance, handed to every {@link RemoteSync}
+     * @param service executor that runs the queue processor and the sync tasks
+     * @param remoteClients clients of the ensembles to update; copied defensively
+     */
+    NodeSyncQueue(CuratorGlobalSync sync, ExecutorService service, Collection<CuratorFramework> remoteClients)
+    {
+        this.sync = sync;
+        this.service = service;
+        this.remoteClients = ImmutableList.copyOf(remoteClients);
+    }
+
+    /**
+     * Submits the task that drains the queue until its thread is interrupted.
+     */
+    void start()
+    {
+        service.submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        while ( !Thread.currentThread().isInterrupted() )
+                        {
+                            process(queue.take());
+                        }
+                    }
+                    catch ( InterruptedException dummy )
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        );
+    }
+
+    /**
+     * Submits one {@link RemoteSync} per remote client for the given entry.
+     */
+    private void process(ChildDataHolder childDataHolder)
+    {
+        for ( final CuratorFramework remote : remoteClients )
+        {
+            service.submit(new RemoteSync(sync, remote, childDataHolder.data));
+        }
+    }
+
+    /**
+     * Shuts the executor down via {@code shutdownNow()}.
+     */
+    @Override
+    public void close()
+    {
+        service.shutdownNow();
+    }
+
+    /**
+     * Queues a node for syncing.
+     *
+     * @param nodeData the node to push; {@code null} is ignored
+     */
+    synchronized void add(ChildData nodeData)
+    {
+        // TODO what about deletes?
+        if ( nodeData == null )
+        {
+            // a holder around null would only fail later, when its path is read
+            return;
+        }
+        queue.add(new ChildDataHolder(nodeData));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/RemoteSync.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/RemoteSync.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/RemoteSync.java
new file mode 100644
index 0000000..304df96
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/RemoteSync.java
@@ -0,0 +1,60 @@
+package com.netflix.curator.x.sync;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.cache.ChildData;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Task that pushes the data of one node to a single remote ensemble. The remote node is
+ * only overwritten when it exists and its modification time is older than that of the
+ * given data.
+ */
+class RemoteSync implements Runnable
+{
+    private final CuratorGlobalSync sync;
+    private final CuratorFramework remoteClient;
+    private final ChildData data;
+
+    /**
+     * @param sync the owning sync instance
+     * @param remoteClient client of the ensemble to update
+     * @param data path, stat and payload of the node to push
+     */
+    RemoteSync(CuratorGlobalSync sync, CuratorFramework remoteClient, ChildData data)
+    {
+        this.sync = sync;
+        this.remoteClient = remoteClient;
+        this.data = data;
+    }
+
+    @Override
+    public void run()
+    {
+        try
+        {
+            // TODO - is version enough? should I use timestamp instead?
+            Stat stat = remoteClient.checkExists().forPath(data.getPath());
+            if ( stat == null )
+            {
+                // checkExists() yields null when the remote node does not exist
+                // TODO - no node? should it be created on the remote?
+                return;
+            }
+
+            if ( stat.getMtime() < data.getStat().getMtime() )
+            {
+                // versioned write: rejected if the remote node changed after the check
+                remoteClient.setData().withVersion(stat.getVersion()).forPath(data.getPath(), data.getData());
+            }
+        }
+        catch ( InterruptedException e )
+        {
+            // restore the flag so the interrupt sent by an executor shutdown is not lost
+            Thread.currentThread().interrupt();
+        }
+        catch ( Exception e )
+        {
+            // TODO - bad version? connection issues?
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncSpec.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncSpec.java
new file mode 100644
index 0000000..08693a6
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncSpec.java
@@ -0,0 +1,47 @@
+package com.netflix.curator.x.sync;
+
+/**
+ * One path to sync together with the way it is watched.
+ */
+public class SyncSpec
+{
+    private final String path;
+    private final SyncTypes type;
+
+    /**
+     * @param path the path to watch
+     * @param type how the path is watched
+     * @throws NullPointerException if {@code path} or {@code type} is null
+     */
+    public SyncSpec(String path, SyncTypes type)
+    {
+        // fail here rather than later, when the spec is turned into a cache
+        if ( path == null )
+        {
+            throw new NullPointerException("path cannot be null");
+        }
+        if ( type == null )
+        {
+            throw new NullPointerException("type cannot be null");
+        }
+
+        this.path = path;
+        this.type = type;
+    }
+
+    /**
+     * @return the path to watch
+     */
+    public String getPath()
+    {
+        return path;
+    }
+
+    /**
+     * @return how the path is watched
+     */
+    public SyncTypes getType()
+    {
+        return type;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncTypes.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncTypes.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncTypes.java
new file mode 100644
index 0000000..5a27b83
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncTypes.java
@@ -0,0 +1,9 @@
+package com.netflix.curator.x.sync;
+
+public enum SyncTypes // selects which cache CuratorGlobalSync creates for a SyncSpec
+{
+ SINGLE_NODE, // NodeCache on the spec's path, created with its boolean argument false
+ SINGLE_NODE_COMPRESSED_DATA, // NodeCache on the spec's path, boolean argument true (presumably "data is compressed" - confirm)
+ CHILDREN_OF_NODE, // PathChildrenCache on the spec's path, created with flags (true, false)
+ CHILDREN_OF_NODE_COMPRESSED_DATA // PathChildrenCache on the spec's path, created with flags (true, true)
+}
http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 4466dc7..b31488e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,2 +1,2 @@
rootProject.name='curator'
-include 'curator-client','curator-framework','curator-recipes','curator-test','curator-x-discovery','curator-x-discovery-server','curator-x-zkclient-bridge','curator-examples'
+include 'curator-client','curator-framework','curator-recipes','curator-test','curator-x-discovery','curator-x-discovery-server','curator-x-zkclient-bridge','curator-examples','curator-x-global-sync'