You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2013/06/10 17:01:33 UTC

[48/50] [abbrv] git commit: WIP

WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-curator/commit/28912f7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-curator/tree/28912f7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-curator/diff/28912f7d

Branch: refs/heads/ir
Commit: 28912f7df76c4f1fec156277bbf09bba05d53fec
Parents: 2febf6b
Author: Jordan Zimmerman <jo...@jordanzimmerman.com>
Authored: Thu Feb 28 13:25:45 2013 -0800
Committer: Jordan Zimmerman <jo...@jordanzimmerman.com>
Committed: Thu Feb 28 13:25:45 2013 -0800

----------------------------------------------------------------------
 build.gradle                                    |  11 ++
 .../curator/x/sync/CuratorGlobalSync.java       | 136 +++++++++++++++++++
 .../curator/x/sync/InternalSyncSpec.java        |  69 ++++++++++
 .../netflix/curator/x/sync/NodeSyncQueue.java   | 103 ++++++++++++++
 .../com/netflix/curator/x/sync/RemoteSync.java  |  37 +++++
 .../com/netflix/curator/x/sync/SyncSpec.java    |  23 ++++
 .../com/netflix/curator/x/sync/SyncTypes.java   |   9 ++
 settings.gradle                                 |   2 +-
 8 files changed, 389 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index d8bcce2..0efc622 100644
--- a/build.gradle
+++ b/build.gradle
@@ -140,3 +140,14 @@ project(':curator-examples')
         compile project(':curator-x-discovery')
     }
 }
+
+project(':curator-x-global-sync')
+{
+    dependencies
+    {
+        compile project(':curator-client')
+        compile project(':curator-framework')
+        compile project(':curator-recipes')
+        testCompile project(':curator-test')
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/CuratorGlobalSync.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/CuratorGlobalSync.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/CuratorGlobalSync.java
new file mode 100644
index 0000000..2d573e3
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/CuratorGlobalSync.java
@@ -0,0 +1,136 @@
+package com.netflix.curator.x.sync;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Closeables;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.cache.ChildData;
+import com.netflix.curator.framework.recipes.cache.NodeCache;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
+import com.netflix.curator.utils.ThreadUtils;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CuratorGlobalSync implements Closeable
+{
+    private final List<InternalSyncSpec> specs;
+    private final NodeSyncQueue nodeSyncQueue;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    public CuratorGlobalSync(CuratorFramework localClient, Collection<SyncSpec> specs, CuratorFramework... remoteClients)
+    {
+        this(localClient, specs, makeExecutorService(), Arrays.asList(remoteClients));
+    }
+
+    public CuratorGlobalSync(CuratorFramework localClient, Collection<SyncSpec> specs, Collection<CuratorFramework> remoteClients)
+    {
+        this(localClient, specs, makeExecutorService(), remoteClients);
+    }
+
+    public CuratorGlobalSync(CuratorFramework localClient, Collection<SyncSpec> specs, ExecutorService service, CuratorFramework... remoteClients)
+    {
+        this(localClient, specs, service, Arrays.asList(remoteClients));
+    }
+
+    public CuratorGlobalSync(CuratorFramework localClient, Collection<SyncSpec> specs, ExecutorService service, Collection<CuratorFramework> remoteClients)
+    {
+        remoteClients = Preconditions.checkNotNull(remoteClients, "remoteClients cannot be null");
+        specs = Preconditions.checkNotNull(specs, "specs cannot be null");
+        localClient = Preconditions.checkNotNull(localClient, "localClient cannot be null");
+        Preconditions.checkArgument(!remoteClients.contains(localClient), "The remoteClients list cannot contain the localClient");
+
+        ImmutableList.Builder<InternalSyncSpec> builder = ImmutableList.builder();
+        for ( SyncSpec spec : specs )
+        {
+            InternalSyncSpec internalSyncSpec = makeInternalSyncSpec(localClient, service, spec);
+            builder.add(internalSyncSpec);
+        }
+        this.specs = builder.build();
+
+        nodeSyncQueue = new NodeSyncQueue(this, service, remoteClients);
+    }
+
+    public void postNodeToSync(ChildData nodeData)
+    {
+        nodeSyncQueue.add(nodeData);
+    }
+
+    public void start() throws Exception
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+
+        nodeSyncQueue.start();
+        for ( InternalSyncSpec spec : specs )
+        {
+            spec.start();
+        }
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            for ( InternalSyncSpec spec : specs )
+            {
+                Closeables.closeQuietly(spec);
+            }
+        }
+        Closeables.closeQuietly(nodeSyncQueue);
+    }
+
+    private InternalSyncSpec makeInternalSyncSpec(CuratorFramework localClient, ExecutorService service, SyncSpec spec)
+    {
+        InternalSyncSpec internalSyncSpec;
+        switch ( spec.getType() )
+        {
+            case SINGLE_NODE:
+            {
+                internalSyncSpec = new InternalSyncSpec(this, new NodeCache(localClient, spec.getPath(), false));
+                break;
+            }
+
+            case SINGLE_NODE_COMPRESSED_DATA:
+            {
+                internalSyncSpec = new InternalSyncSpec(this, new NodeCache(localClient, spec.getPath(), true));
+                break;
+            }
+
+            case CHILDREN_OF_NODE:
+            {
+                internalSyncSpec = new InternalSyncSpec(this, new PathChildrenCache(localClient, spec.getPath(), true, false, service));
+                break;
+            }
+
+            case CHILDREN_OF_NODE_COMPRESSED_DATA:
+            {
+                internalSyncSpec = new InternalSyncSpec(this, new PathChildrenCache(localClient, spec.getPath(), true, true, service));
+                break;
+            }
+
+            default:
+            {
+                throw new RuntimeException();   // will never get here
+            }
+        }
+        return internalSyncSpec;
+    }
+
+    private static ExecutorService makeExecutorService()
+    {
+        return Executors.newCachedThreadPool(ThreadUtils.newThreadFactory("CuratorGlobalSync"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/InternalSyncSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/InternalSyncSpec.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/InternalSyncSpec.java
new file mode 100644
index 0000000..b5bea03
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/InternalSyncSpec.java
@@ -0,0 +1,69 @@
+package com.netflix.curator.x.sync;
+
+import com.google.common.io.Closeables;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.cache.NodeCache;
+import com.netflix.curator.framework.recipes.cache.NodeCacheListener;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCache;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import com.netflix.curator.framework.recipes.cache.PathChildrenCacheListener;
+import java.io.Closeable;
+
+class InternalSyncSpec implements Closeable, PathChildrenCacheListener, NodeCacheListener
+{
+    private final PathChildrenCache pathCache;
+    private final NodeCache nodeCache;
+    private final CuratorGlobalSync sync;
+
+    InternalSyncSpec(CuratorGlobalSync sync, PathChildrenCache pathCache)
+    {
+        this.sync = sync;
+        this.pathCache = pathCache;
+        nodeCache = null;
+
+        pathCache.getListenable().addListener(this);
+    }
+
+    InternalSyncSpec(CuratorGlobalSync sync, NodeCache nodeCache)
+    {
+        this.sync = sync;
+        pathCache = null;
+        this.nodeCache = nodeCache;
+
+        nodeCache.getListenable().addListener(this);
+    }
+
+    public void start() throws Exception
+    {
+        if ( pathCache != null )
+        {
+            pathCache.start();
+        }
+
+        if ( nodeCache != null )
+        {
+            nodeCache.start();
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        Closeables.closeQuietly(pathCache);
+        Closeables.closeQuietly(nodeCache);
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+    {
+        // TODO what about deletes?
+        sync.postNodeToSync(event.getData());
+    }
+
+    @Override
+    public void nodeChanged() throws Exception
+    {
+        // TODO what about deletes?
+        sync.postNodeToSync(nodeCache.getCurrentData());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/NodeSyncQueue.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/NodeSyncQueue.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/NodeSyncQueue.java
new file mode 100644
index 0000000..83b351d
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/NodeSyncQueue.java
@@ -0,0 +1,103 @@
+package com.netflix.curator.x.sync;
+
+import com.google.common.collect.ImmutableList;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.cache.ChildData;
+import org.apache.zookeeper.data.Stat;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+
+class NodeSyncQueue implements Closeable
+{
+    private final CuratorGlobalSync sync;
+    private final ExecutorService service;
+    private final Collection<CuratorFramework> remoteClients;
+    private final BlockingQueue<ChildDataHolder> queue = new LinkedBlockingQueue<ChildDataHolder>();    // TODO capacity
+
+    private static class ChildDataHolder
+    {
+        private final ChildData data;
+
+        private ChildDataHolder(ChildData data)
+        {
+            this.data = data;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if ( this == o )
+            {
+                return true;
+            }
+            if ( o == null || getClass() != o.getClass() )
+            {
+                return false;
+            }
+
+            ChildDataHolder that = (ChildDataHolder)o;
+            return data.getPath().equals(that.data.getPath());
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return data.getPath().hashCode();
+        }
+    }
+
+    NodeSyncQueue(CuratorGlobalSync sync, ExecutorService service, Collection<CuratorFramework> remoteClients)
+    {
+        this.sync = sync;
+        this.service = service;
+        this.remoteClients = ImmutableList.copyOf(remoteClients);
+    }
+
+    void start()
+    {
+        service.submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        while ( !Thread.currentThread().isInterrupted() )
+                        {
+                            process(queue.take());
+                        }
+                    }
+                    catch ( InterruptedException dummy )
+                    {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        );
+    }
+
+    private void process(ChildDataHolder childDataHolder)
+    {
+        for ( final CuratorFramework remote : remoteClients )
+        {
+            service.submit(new RemoteSync(sync, remote, childDataHolder.data));
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        service.shutdownNow();
+    }
+
+    synchronized void add(ChildData nodeData)
+    {
+        // TODO what about deletes?
+        queue.add(new ChildDataHolder(nodeData));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/RemoteSync.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/RemoteSync.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/RemoteSync.java
new file mode 100644
index 0000000..304df96
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/RemoteSync.java
@@ -0,0 +1,37 @@
+package com.netflix.curator.x.sync;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.cache.ChildData;
+import org.apache.zookeeper.data.Stat;
+
+class RemoteSync implements Runnable
+{
+    private final CuratorGlobalSync sync;
+    private final CuratorFramework remoteClient;
+    private final ChildData data;
+
+    RemoteSync(CuratorGlobalSync sync, CuratorFramework remoteClient, ChildData data)
+    {
+        this.sync = sync;
+        this.remoteClient = remoteClient;
+        this.data = data;
+    }
+
+    @Override
+    public void run()
+    {
+        try
+        {
+            // TODO - is version enough? should I use timestamp instead?
+            Stat stat = remoteClient.checkExists().forPath(data.getPath());
+            if ( stat.getMtime() < data.getStat().getMtime() )
+            {
+                remoteClient.setData().withVersion(stat.getVersion()).forPath(data.getPath(), data.getData());
+            }
+        }
+        catch ( Exception e )
+        {
+            // TODO - no node? bad version? connection issues?
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncSpec.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncSpec.java
new file mode 100644
index 0000000..08693a6
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncSpec.java
@@ -0,0 +1,23 @@
+package com.netflix.curator.x.sync;
+
+public class SyncSpec
+{
+    private final String path;
+    private final SyncTypes type;
+
+    public SyncSpec(String path, SyncTypes type)
+    {
+        this.path = path;
+        this.type = type;
+    }
+
+    public String getPath()
+    {
+        return path;
+    }
+
+    public SyncTypes getType()
+    {
+        return type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncTypes.java
----------------------------------------------------------------------
diff --git a/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncTypes.java b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncTypes.java
new file mode 100644
index 0000000..5a27b83
--- /dev/null
+++ b/curator-x-global-sync/src/main/java/com/netflix/curator/x/sync/SyncTypes.java
@@ -0,0 +1,9 @@
+package com.netflix.curator.x.sync;
+
+public enum SyncTypes
+{
+    SINGLE_NODE,
+    SINGLE_NODE_COMPRESSED_DATA,
+    CHILDREN_OF_NODE,
+    CHILDREN_OF_NODE_COMPRESSED_DATA
+}

http://git-wip-us.apache.org/repos/asf/incubator-curator/blob/28912f7d/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 4466dc7..b31488e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,2 +1,2 @@
 rootProject.name='curator'
-include 'curator-client','curator-framework','curator-recipes','curator-test','curator-x-discovery','curator-x-discovery-server','curator-x-zkclient-bridge','curator-examples'
+include 'curator-client','curator-framework','curator-recipes','curator-test','curator-x-discovery','curator-x-discovery-server','curator-x-zkclient-bridge','curator-examples','curator-x-global-sync'