You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/05/29 03:35:11 UTC

[1/2] git commit: wip

Repository: curator
Updated Branches:
  refs/heads/curator-rpc 598ad996c -> 530010d90


wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e5ecbcd9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e5ecbcd9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e5ecbcd9

Branch: refs/heads/curator-rpc
Commit: e5ecbcd96fd84611a11a34074d63ae05a77df0d9
Parents: 598ad99
Author: randgalt <ra...@apache.org>
Authored: Wed May 28 13:11:49 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed May 28 13:11:49 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/curator/x/rpc/Closer.java   |   6 -
 .../org/apache/curator/x/rpc/CuratorEntry.java  | 127 -------------------
 .../curator/x/rpc/CuratorProjectionServer.java  |  12 +-
 .../org/apache/curator/x/rpc/RpcManager.java    |  66 ----------
 .../x/rpc/configuration/Configuration.java      |   3 +-
 .../configuration/ConnectionConfiguration.java  |  27 +++-
 .../curator/x/rpc/connections/Closer.java       |   6 +
 .../x/rpc/connections/ConnectionManager.java    | 115 +++++++++++++++++
 .../curator/x/rpc/connections/CuratorEntry.java | 127 +++++++++++++++++++
 .../curator/x/rpc/idl/event/EventService.java   |  12 +-
 .../projection/CuratorProjectionService.java    |  20 +--
 11 files changed, 299 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java
deleted file mode 100644
index a389776..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.curator.x.rpc;
-
-public interface Closer<T>
-{
-    public void close(T thing);
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
deleted file mode 100644
index 52f025d..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.curator.x.rpc;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class CuratorEntry implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
-    private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
-    private final Map<String, Entry> things = Maps.newConcurrentMap();
-
-    private static class Entry
-    {
-        final Object thing;
-        final Closer closer;
-
-        private Entry(Object thing, Closer closer)
-        {
-            this.thing = thing;
-            this.closer = closer;
-        }
-    }
-
-    private enum State
-    {
-        OPEN,
-        CLOSED
-    }
-
-    public CuratorEntry(CuratorFramework client)
-    {
-        this.client = client;
-    }
-
-    @Override
-    public void close()
-    {
-        if ( state.compareAndSet(State.OPEN, State.CLOSED) )
-        {
-            for ( Map.Entry<String, Entry> mapEntry : things.entrySet() )
-            {
-                Entry entry = mapEntry.getValue();
-                if ( entry.closer != null )
-                {
-                    log.debug(String.format("Closing left over thing. Type: %s - Id: %s", entry.thing.getClass(), mapEntry.getKey()));
-                    //noinspection unchecked
-                    entry.closer.close(entry.thing);    // lack of generics is safe because addThing() is type-safe
-                }
-            }
-            things.clear();
-
-            client.close();
-            events.clear();
-        }
-    }
-
-    public RpcCuratorEvent pollForEvent(long maxWaitMs) throws InterruptedException
-    {
-        if ( state.get() == State.OPEN )
-        {
-            return events.poll(maxWaitMs, TimeUnit.MILLISECONDS);
-        }
-        return null;
-    }
-
-    public void addEvent(RpcCuratorEvent event)
-    {
-        if ( state.get() == State.OPEN )
-        {
-            events.offer(event);
-        }
-    }
-
-    public CuratorFramework getClient()
-    {
-        return (state.get() == State.OPEN) ? client : null;
-    }
-
-    public <T> String addThing(T thing, Closer<T> closer)
-    {
-        return addThing(UUID.randomUUID().toString(), thing, closer);
-    }
-
-    public <T> String addThing(String id, T thing, Closer<T> closer)
-    {
-        things.put(id, new Entry(thing, closer));
-        return id;
-    }
-
-    public <T> T getThing(String id, Class<T> clazz)
-    {
-        Entry entry = (id != null) ? things.get(id) : null;
-        return cast(clazz, entry);
-    }
-
-    public boolean closeThing(String id)
-    {
-        Entry entry = (id != null) ? things.remove(id) : null;
-        if ( entry != null )
-        {
-            //noinspection unchecked
-            entry.closer.close(entry.thing);
-        }
-        return false;
-    }
-
-    private <T> T cast(Class<T> clazz, Entry entry)
-    {
-        if ( entry != null )
-        {
-            return clazz.cast(entry.thing);
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 2dcb99e..e8ebfc5 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -29,6 +29,7 @@ import com.google.common.io.Files;
 import com.google.common.io.Resources;
 import org.apache.curator.x.rpc.configuration.Configuration;
 import org.apache.curator.x.rpc.configuration.ConfigurationBuilder;
+import org.apache.curator.x.rpc.connections.ConnectionManager;
 import org.apache.curator.x.rpc.idl.event.EventService;
 import org.apache.curator.x.rpc.idl.projection.CuratorProjectionService;
 import org.slf4j.Logger;
@@ -42,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class CuratorProjectionServer
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final RpcManager rpcManager;
+    private final ConnectionManager connectionManager;
     private final ThriftServer server;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final Configuration configuration;
@@ -94,9 +95,9 @@ public class CuratorProjectionServer
     public CuratorProjectionServer(Configuration configuration)
     {
         this.configuration = configuration;
-        rpcManager = new RpcManager(configuration.getProjectionExpiration().toMillis());
-        EventService eventService = new EventService(rpcManager, configuration.getPingTime().toMillis());
-        CuratorProjectionService projectionService = new CuratorProjectionService(rpcManager);
+        connectionManager = new ConnectionManager(configuration.getConnections(), configuration.getProjectionExpiration().toMillis());
+        EventService eventService = new EventService(connectionManager, configuration.getPingTime().toMillis());
+        CuratorProjectionService projectionService = new CuratorProjectionService(connectionManager);
         ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), Lists.<ThriftEventHandler>newArrayList(), projectionService, eventService);
         server = new ThriftServer(processor, configuration.getThrift());
     }
@@ -106,6 +107,7 @@ public class CuratorProjectionServer
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
 
         configuration.getLogging().configure(new MetricRegistry(), "curator-rpc");
+        connectionManager.start();
         server.start();
 
         log.info("Server listening on port: " + configuration.getThrift().getPort());
@@ -117,8 +119,8 @@ public class CuratorProjectionServer
         {
             log.info("Stopping...");
 
-            rpcManager.close();
             server.close();
+            connectionManager.close();
             configuration.getLogging().stop();
 
             log.info("Stopped");

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
deleted file mode 100644
index e238aa6..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.curator.x.rpc;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
-
-public class RpcManager implements Closeable
-{
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final Cache<String, CuratorEntry> cache;
-
-    public RpcManager(long expirationMs)
-    {
-        RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
-        {
-            @SuppressWarnings("NullableProblems")
-            @Override
-            public void onRemoval(RemovalNotification<String, CuratorEntry> notification)
-            {
-                if ( notification != null )
-                {
-                    log.debug(String.format("Entry being removed. id (%s), reason (%s)", notification.getKey(), notification.getCause()));
-
-                    CuratorEntry entry = notification.getValue();
-                    if ( entry != null )
-                    {
-                        entry.close();
-                    }
-                }
-            }
-        };
-        cache = CacheBuilder
-            .newBuilder()
-            .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
-            .removalListener(listener)
-            .build();
-    }
-
-    @Override
-    public void close()
-    {
-        cache.invalidateAll();
-        cache.cleanUp();
-    }
-
-    public void add(String id, CuratorFramework client)
-    {
-        cache.put(id, new CuratorEntry(client));
-    }
-
-    public CuratorEntry get(String id)
-    {
-        return cache.getIfPresent(id);
-    }
-
-    public CuratorEntry remove(String id)
-    {
-        return cache.asMap().remove(id);
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
index e567cac..6d567b7 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
@@ -1,6 +1,7 @@
 package org.apache.curator.x.rpc.configuration;
 
 import com.facebook.swift.service.ThriftServerConfig;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.airlift.units.Duration;
 import io.dropwizard.logging.LoggingFactory;
@@ -57,7 +58,7 @@ public class Configuration
 
     public List<ConnectionConfiguration> getConnections()
     {
-        return connections;
+        return ImmutableList.copyOf(connections);
     }
 
     public void setConnections(List<ConnectionConfiguration> connections)

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
index 3e2f9a2..51f12fa 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
@@ -1,13 +1,16 @@
 package org.apache.curator.x.rpc.configuration;
 
+import com.google.common.base.Preconditions;
 import io.airlift.units.Duration;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
 import javax.validation.constraints.NotNull;
 import java.util.concurrent.TimeUnit;
 
 public class ConnectionConfiguration
 {
     @NotNull private String name;
-    private String connectionString = "localhost:2181";
+    private String connectionString = null;
     private Duration sessionLength = new Duration(1, TimeUnit.MINUTES);
     private Duration connectionTimeout = new Duration(15, TimeUnit.SECONDS);
     private AuthorizationConfiguration authorization = null;
@@ -83,4 +86,26 @@ public class ConnectionConfiguration
     {
         this.retry = retry;
     }
+
+    public CuratorFramework build()
+    {
+        Preconditions.checkState((connectionString != null) && (connectionString.length() > 0), "You must specify a connection string for connection: " + name);
+        Preconditions.checkNotNull(retry, "retry cannot be null");
+
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+        builder = builder
+            .connectString(connectionString)
+            .sessionTimeoutMs((int)sessionLength.toMillis())
+            .connectionTimeoutMs((int)connectionTimeout.toMillis())
+            .retryPolicy(retry.build());
+        if ( authorization != null )
+        {
+            builder = builder.authorization(authorization.getScheme(), authorization.getAuth().getBytes());
+        }
+        if ( namespace != null )
+        {
+            builder = builder.namespace(namespace);
+        }
+        return builder.build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java
new file mode 100644
index 0000000..060c7d7
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java
@@ -0,0 +1,6 @@
+package org.apache.curator.x.rpc.connections;
+
+public interface Closer<T>
+{
+    public void close(T thing);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
new file mode 100644
index 0000000..b101c6d
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
@@ -0,0 +1,115 @@
+package org.apache.curator.x.rpc.connections;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.x.rpc.configuration.ConnectionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ConnectionManager implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Cache<String, CuratorEntry> cache;
+    private final Map<String, CuratorFramework> clients;
+    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+
+    private enum State
+    {
+        LATENT,
+        STARTED,
+        CLOSED
+    }
+
+    public ConnectionManager(List<ConnectionConfiguration> connections, long expirationMs)
+    {
+        RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
+        {
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public void onRemoval(RemovalNotification<String, CuratorEntry> notification)
+            {
+                if ( notification != null )
+                {
+                    log.debug(String.format("Entry being removed. id (%s), reason (%s)", notification.getKey(), notification.getCause()));
+
+                    CuratorEntry entry = notification.getValue();
+                    if ( entry != null )
+                    {
+                        entry.close();
+                    }
+                }
+            }
+        };
+        cache = CacheBuilder
+            .newBuilder()
+            .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
+            .removalListener(listener)
+            .build();
+
+        clients = buildClients(connections);
+    }
+
+    public void start()
+    {
+        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+        for ( CuratorFramework client : clients.values() )
+        {
+            client.start();
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+        {
+            cache.invalidateAll();
+            cache.cleanUp();
+
+            for ( CuratorFramework client : clients.values() )
+            {
+                client.close();
+            }
+        }
+    }
+
+    public void add(String id, CuratorFramework client)
+    {
+        Preconditions.checkState(state.get() == State.STARTED, "Not started");
+        cache.put(id, new CuratorEntry(client));
+    }
+
+    public CuratorEntry get(String id)
+    {
+        Preconditions.checkState(state.get() == State.STARTED, "Not started");
+        return cache.getIfPresent(id);
+    }
+
+    public CuratorEntry remove(String id)
+    {
+        Preconditions.checkState(state.get() == State.STARTED, "Not started");
+        return cache.asMap().remove(id);
+    }
+
+    private Map<String, CuratorFramework> buildClients(List<ConnectionConfiguration> connections)
+    {
+        Preconditions.checkArgument(connections.size() > 0, "You must have at least one connection configured");
+
+        ImmutableMap.Builder<String, CuratorFramework> builder = ImmutableMap.builder();
+        for ( ConnectionConfiguration configuration : connections )
+        {
+            builder.put(configuration.getName(), configuration.build());
+        }
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
new file mode 100644
index 0000000..0c4bb8b
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
@@ -0,0 +1,127 @@
+package org.apache.curator.x.rpc.connections;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CuratorEntry implements Closeable
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFramework client;
+    private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
+    private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
+    private final Map<String, Entry> things = Maps.newConcurrentMap();
+
+    private static class Entry
+    {
+        final Object thing;
+        final Closer closer;
+
+        private Entry(Object thing, Closer closer)
+        {
+            this.thing = thing;
+            this.closer = closer;
+        }
+    }
+
+    private enum State
+    {
+        OPEN,
+        CLOSED
+    }
+
+    public CuratorEntry(CuratorFramework client)
+    {
+        this.client = client;
+    }
+
+    @Override
+    public void close()
+    {
+        if ( state.compareAndSet(State.OPEN, State.CLOSED) )
+        {
+            for ( Map.Entry<String, Entry> mapEntry : things.entrySet() )
+            {
+                Entry entry = mapEntry.getValue();
+                if ( entry.closer != null )
+                {
+                    log.debug(String.format("Closing left over thing. Type: %s - Id: %s", entry.thing.getClass(), mapEntry.getKey()));
+                    //noinspection unchecked
+                    entry.closer.close(entry.thing);    // lack of generics is safe because addThing() is type-safe
+                }
+            }
+            things.clear();
+
+            client.close();
+            events.clear();
+        }
+    }
+
+    public RpcCuratorEvent pollForEvent(long maxWaitMs) throws InterruptedException
+    {
+        if ( state.get() == State.OPEN )
+        {
+            return events.poll(maxWaitMs, TimeUnit.MILLISECONDS);
+        }
+        return null;
+    }
+
+    public void addEvent(RpcCuratorEvent event)
+    {
+        if ( state.get() == State.OPEN )
+        {
+            events.offer(event);
+        }
+    }
+
+    public CuratorFramework getClient()
+    {
+        return (state.get() == State.OPEN) ? client : null;
+    }
+
+    public <T> String addThing(T thing, Closer<T> closer)
+    {
+        return addThing(UUID.randomUUID().toString(), thing, closer);
+    }
+
+    public <T> String addThing(String id, T thing, Closer<T> closer)
+    {
+        things.put(id, new Entry(thing, closer));
+        return id;
+    }
+
+    public <T> T getThing(String id, Class<T> clazz)
+    {
+        Entry entry = (id != null) ? things.get(id) : null;
+        return cast(clazz, entry);
+    }
+
+    public boolean closeThing(String id)
+    {
+        Entry entry = (id != null) ? things.remove(id) : null;
+        if ( entry != null )
+        {
+            //noinspection unchecked
+            entry.closer.close(entry.thing);
+        }
+        return false;
+    }
+
+    private <T> T cast(Class<T> clazz, Entry entry)
+    {
+        if ( entry != null )
+        {
+            return clazz.cast(entry.thing);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
index 13598c8..933f3af 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
@@ -2,26 +2,26 @@ package org.apache.curator.x.rpc.idl.event;
 
 import com.facebook.swift.service.ThriftMethod;
 import com.facebook.swift.service.ThriftService;
-import org.apache.curator.x.rpc.CuratorEntry;
-import org.apache.curator.x.rpc.RpcManager;
+import org.apache.curator.x.rpc.connections.CuratorEntry;
+import org.apache.curator.x.rpc.connections.ConnectionManager;
 import org.apache.curator.x.rpc.idl.projection.CuratorProjection;
 
 @ThriftService("EventService")
 public class EventService
 {
-    private final RpcManager rpcManager;
+    private final ConnectionManager connectionManager;
     private final long pingTimeMs;
 
-    public EventService(RpcManager rpcManager, long pingTimeMs)
+    public EventService(ConnectionManager connectionManager, long pingTimeMs)
     {
-        this.rpcManager = rpcManager;
+        this.connectionManager = connectionManager;
         this.pingTimeMs = pingTimeMs;
     }
 
     @ThriftMethod
     public RpcCuratorEvent getNextEvent(CuratorProjection projection) throws InterruptedException
     {
-        CuratorEntry entry = rpcManager.get(projection.id);
+        CuratorEntry entry = connectionManager.get(projection.id);
         if ( entry == null )
         {
             // TODO

http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
index 708dc32..b76730d 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
@@ -34,9 +34,9 @@ import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.x.rpc.Closer;
-import org.apache.curator.x.rpc.CuratorEntry;
-import org.apache.curator.x.rpc.RpcManager;
+import org.apache.curator.x.rpc.connections.Closer;
+import org.apache.curator.x.rpc.connections.CuratorEntry;
+import org.apache.curator.x.rpc.connections.ConnectionManager;
 import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,11 +47,11 @@ import java.util.concurrent.TimeUnit;
 public class CuratorProjectionService
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final RpcManager rpcManager;
+    private final ConnectionManager connectionManager;
 
-    public CuratorProjectionService(RpcManager rpcManager)
+    public CuratorProjectionService(ConnectionManager connectionManager)
     {
-        this.rpcManager = rpcManager;
+        this.connectionManager = connectionManager;
     }
 
     @ThriftMethod
@@ -60,7 +60,7 @@ public class CuratorProjectionService
         CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1));
         String id = UUID.randomUUID().toString();
         client.start();
-        rpcManager.add(id, client);
+        connectionManager.add(id, client);
         final CuratorProjection projection = new CuratorProjection(id);
 
         ConnectionStateListener listener = new ConnectionStateListener()
@@ -79,7 +79,7 @@ public class CuratorProjectionService
     @ThriftMethod
     public void closeCuratorProjection(CuratorProjection projection)
     {
-        CuratorEntry entry = rpcManager.remove(projection.id);
+        CuratorEntry entry = connectionManager.remove(projection.id);
         if ( entry != null )
         {
             entry.close();
@@ -166,7 +166,7 @@ public class CuratorProjectionService
 
     private void addEvent(CuratorProjection projection, RpcCuratorEvent event)
     {
-        CuratorEntry entry = rpcManager.get(projection.id);
+        CuratorEntry entry = connectionManager.get(projection.id);
         if ( entry != null )
         {
             entry.addEvent(event);
@@ -202,7 +202,7 @@ public class CuratorProjectionService
 
     private CuratorEntry getEntry(CuratorProjection projection) throws Exception
     {
-        CuratorEntry entry = rpcManager.get(projection.id);
+        CuratorEntry entry = connectionManager.get(projection.id);
         if ( entry == null )
         {
             throw new Exception("No client found with id: " + projection.id);


[2/2] git commit: Completed move to new configuration. Connection specs are now fully specified in the server via configuration. Clients create projections by name only.

Posted by ra...@apache.org.
Completed move to new configuration. Connection specs are now fully specified in the server via configuration. Clients create projections by name only.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/530010d9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/530010d9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/530010d9

Branch: refs/heads/curator-rpc
Commit: 530010d902cb8a2ff09ffe66048e9f11a0650cb1
Parents: e5ecbcd
Author: randgalt <ra...@apache.org>
Authored: Wed May 28 20:35:03 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed May 28 20:35:03 2014 -0500

----------------------------------------------------------------------
 .../curator/x/rpc/CuratorProjectionServer.java  |   2 +-
 .../x/rpc/connections/ConnectionManager.java    |  48 ++--
 .../projection/CuratorProjectionService.java    |   9 +-
 .../idl/projection/CuratorProjectionSpec.java   |  26 --
 curator-x-rpc/src/main/thrift/curator.thrift    |   5 +-
 .../generated/CuratorProjectionSpec.java        | 280 -------------------
 .../curator/generated/CuratorService.java       | 139 +++++----
 .../org/apache/curator/x/rpc/TestClient.java    |   3 +-
 .../org/apache/curator/x/rpc/TestServer.java    |  13 +
 .../src/test/resources/configuration/test.json  |  14 +
 10 files changed, 130 insertions(+), 409 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index e8ebfc5..0ab490b 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -71,7 +71,7 @@ public class CuratorProjectionServer
         }
         else
         {
-            System.out.println("First argument is not a file. Treating the command line as a json object");
+            System.out.println("First argument is not a file. Treating the command line as a json/yaml object");
             configurationSource = args[0];
         }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
index b101c6d..21fe762 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
@@ -7,12 +7,14 @@ import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableMap;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.x.rpc.configuration.ConnectionConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -20,8 +22,11 @@ public class ConnectionManager implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final Cache<String, CuratorEntry> cache;
-    private final Map<String, CuratorFramework> clients;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+    private final Map<String, ConnectionConfiguration> connections;
+    private final ScheduledExecutorService service = ThreadUtils.newSingleThreadScheduledExecutor("ConnectionManager");
+
+    private static final int FORCED_CLEANUP_SECONDS = 30;
 
     private enum State
     {
@@ -32,6 +37,8 @@ public class ConnectionManager implements Closeable
 
     public ConnectionManager(List<ConnectionConfiguration> connections, long expirationMs)
     {
+        this.connections = buildConnectionsMap(connections);
+
         RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
         {
             @SuppressWarnings("NullableProblems")
@@ -50,22 +57,22 @@ public class ConnectionManager implements Closeable
                 }
             }
         };
-        cache = CacheBuilder
-            .newBuilder()
-            .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
-            .removalListener(listener)
-            .build();
-
-        clients = buildClients(connections);
+        cache = CacheBuilder.newBuilder().expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS).removalListener(listener).build();
     }
 
     public void start()
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
-        for ( CuratorFramework client : clients.values() )
+
+        Runnable cleanup = new Runnable()
         {
-            client.start();
-        }
+            @Override
+            public void run()
+            {
+                cache.cleanUp();
+            }
+        };
+        service.scheduleWithFixedDelay(cleanup, FORCED_CLEANUP_SECONDS, 30, TimeUnit.SECONDS);
     }
 
     @Override
@@ -73,16 +80,19 @@ public class ConnectionManager implements Closeable
     {
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
+            service.shutdownNow();
             cache.invalidateAll();
             cache.cleanUp();
-
-            for ( CuratorFramework client : clients.values() )
-            {
-                client.close();
-            }
         }
     }
 
+    public CuratorFramework newConnection(String name)
+    {
+        ConnectionConfiguration configuration = connections.get(name);
+        Preconditions.checkNotNull(configuration, "No connection configuration with that name was found: " + name);
+        return configuration.build();
+    }
+
     public void add(String id, CuratorFramework client)
     {
         Preconditions.checkState(state.get() == State.STARTED, "Not started");
@@ -101,14 +111,14 @@ public class ConnectionManager implements Closeable
         return cache.asMap().remove(id);
     }
 
-    private Map<String, CuratorFramework> buildClients(List<ConnectionConfiguration> connections)
+    private Map<String, ConnectionConfiguration> buildConnectionsMap(List<ConnectionConfiguration> connections)
     {
         Preconditions.checkArgument(connections.size() > 0, "You must have at least one connection configured");
 
-        ImmutableMap.Builder<String, CuratorFramework> builder = ImmutableMap.builder();
+        ImmutableMap.Builder<String, ConnectionConfiguration> builder = ImmutableMap.builder();
         for ( ConnectionConfiguration configuration : connections )
         {
-            builder.put(configuration.getName(), configuration.build());
+            builder.put(configuration.getName(), configuration);
         }
         return builder.build();
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
index b76730d..880f290 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
@@ -22,7 +22,6 @@ package org.apache.curator.x.rpc.idl.projection;
 import com.facebook.swift.service.ThriftMethod;
 import com.facebook.swift.service.ThriftService;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.Backgroundable;
 import org.apache.curator.framework.api.Compressible;
@@ -33,10 +32,9 @@ import org.apache.curator.framework.api.PathAndBytesable;
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.x.rpc.connections.Closer;
-import org.apache.curator.x.rpc.connections.CuratorEntry;
 import org.apache.curator.x.rpc.connections.ConnectionManager;
+import org.apache.curator.x.rpc.connections.CuratorEntry;
 import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,9 +53,10 @@ public class CuratorProjectionService
     }
 
     @ThriftMethod
-    public CuratorProjection newCuratorProjection(CuratorProjectionSpec spec)   // TODO
+    public CuratorProjection newCuratorProjection(String connectionName)
     {
-        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1));
+        CuratorFramework client = connectionManager.newConnection(connectionName);
+
         String id = UUID.randomUUID().toString();
         client.start();
         connectionManager.add(id, client);

http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java
deleted file mode 100644
index b79990c..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.rpc.idl.projection;
-
-import com.facebook.swift.codec.ThriftStruct;
-
-@ThriftStruct
-public class CuratorProjectionSpec
-{
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/thrift/curator.thrift
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift
index b03cc66..491753b 100644
--- a/curator-x-rpc/src/main/thrift/curator.thrift
+++ b/curator-x-rpc/src/main/thrift/curator.thrift
@@ -33,9 +33,6 @@ struct CuratorProjection {
   1: string id;
 }
 
-struct CuratorProjectionSpec {
-}
-
 struct GenericProjection {
   1: string id;
 }
@@ -88,7 +85,7 @@ service CuratorService {
   void closeCuratorProjection(1: CuratorProjection projection);
   bool closeGenericProjection(1: CuratorProjection curatorProjection, 2: GenericProjection genericProjection);
   string create(1: CuratorProjection projection, 2: CreateSpec createSpec);
-  CuratorProjection newCuratorProjection(1: CuratorProjectionSpec spec);
+  CuratorProjection newCuratorProjection(1: string connectionName);
 }
 
 service EventService {

http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorProjectionSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorProjectionSpec.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorProjectionSpec.java
deleted file mode 100644
index 1171cc9..0000000
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorProjectionSpec.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- *  @generated
- */
-package org.apache.curator.generated;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CuratorProjectionSpec implements org.apache.thrift.TBase<CuratorProjectionSpec, CuratorProjectionSpec._Fields>, java.io.Serializable, Cloneable, Comparable<CuratorProjectionSpec> {
-  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CuratorProjectionSpec");
-
-
-  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
-  static {
-    schemes.put(StandardScheme.class, new CuratorProjectionSpecStandardSchemeFactory());
-    schemes.put(TupleScheme.class, new CuratorProjectionSpecTupleSchemeFactory());
-  }
-
-
-  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
-  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-;
-
-    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
-
-    static {
-      for (_Fields field : EnumSet.allOf(_Fields.class)) {
-        byName.put(field.getFieldName(), field);
-      }
-    }
-
-    /**
-     * Find the _Fields constant that matches fieldId, or null if its not found.
-     */
-    public static _Fields findByThriftId(int fieldId) {
-      switch(fieldId) {
-        default:
-          return null;
-      }
-    }
-
-    /**
-     * Find the _Fields constant that matches fieldId, throwing an exception
-     * if it is not found.
-     */
-    public static _Fields findByThriftIdOrThrow(int fieldId) {
-      _Fields fields = findByThriftId(fieldId);
-      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
-      return fields;
-    }
-
-    /**
-     * Find the _Fields constant that matches name, or null if its not found.
-     */
-    public static _Fields findByName(String name) {
-      return byName.get(name);
-    }
-
-    private final short _thriftId;
-    private final String _fieldName;
-
-    _Fields(short thriftId, String fieldName) {
-      _thriftId = thriftId;
-      _fieldName = fieldName;
-    }
-
-    public short getThriftFieldId() {
-      return _thriftId;
-    }
-
-    public String getFieldName() {
-      return _fieldName;
-    }
-  }
-  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
-  static {
-    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
-    metaDataMap = Collections.unmodifiableMap(tmpMap);
-    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CuratorProjectionSpec.class, metaDataMap);
-  }
-
-  public CuratorProjectionSpec() {
-  }
-
-  /**
-   * Performs a deep copy on <i>other</i>.
-   */
-  public CuratorProjectionSpec(CuratorProjectionSpec other) {
-  }
-
-  public CuratorProjectionSpec deepCopy() {
-    return new CuratorProjectionSpec(this);
-  }
-
-  @Override
-  public void clear() {
-  }
-
-  public void setFieldValue(_Fields field, Object value) {
-    switch (field) {
-    }
-  }
-
-  public Object getFieldValue(_Fields field) {
-    switch (field) {
-    }
-    throw new IllegalStateException();
-  }
-
-  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
-  public boolean isSet(_Fields field) {
-    if (field == null) {
-      throw new IllegalArgumentException();
-    }
-
-    switch (field) {
-    }
-    throw new IllegalStateException();
-  }
-
-  @Override
-  public boolean equals(Object that) {
-    if (that == null)
-      return false;
-    if (that instanceof CuratorProjectionSpec)
-      return this.equals((CuratorProjectionSpec)that);
-    return false;
-  }
-
-  public boolean equals(CuratorProjectionSpec that) {
-    if (that == null)
-      return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    return 0;
-  }
-
-  @Override
-  public int compareTo(CuratorProjectionSpec other) {
-    if (!getClass().equals(other.getClass())) {
-      return getClass().getName().compareTo(other.getClass().getName());
-    }
-
-    int lastComparison = 0;
-
-    return 0;
-  }
-
-  public _Fields fieldForId(int fieldId) {
-    return _Fields.findByThriftId(fieldId);
-  }
-
-  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
-    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
-  }
-
-  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
-    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder("CuratorProjectionSpec(");
-    boolean first = true;
-
-    sb.append(")");
-    return sb.toString();
-  }
-
-  public void validate() throws org.apache.thrift.TException {
-    // check for required fields
-    // check for sub-struct validity
-  }
-
-  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
-    try {
-      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
-    } catch (org.apache.thrift.TException te) {
-      throw new java.io.IOException(te);
-    }
-  }
-
-  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
-    try {
-      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
-    } catch (org.apache.thrift.TException te) {
-      throw new java.io.IOException(te);
-    }
-  }
-
-  private static class CuratorProjectionSpecStandardSchemeFactory implements SchemeFactory {
-    public CuratorProjectionSpecStandardScheme getScheme() {
-      return new CuratorProjectionSpecStandardScheme();
-    }
-  }
-
-  private static class CuratorProjectionSpecStandardScheme extends StandardScheme<CuratorProjectionSpec> {
-
-    public void read(org.apache.thrift.protocol.TProtocol iprot, CuratorProjectionSpec struct) throws org.apache.thrift.TException {
-      org.apache.thrift.protocol.TField schemeField;
-      iprot.readStructBegin();
-      while (true)
-      {
-        schemeField = iprot.readFieldBegin();
-        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
-          break;
-        }
-        switch (schemeField.id) {
-          default:
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-        }
-        iprot.readFieldEnd();
-      }
-      iprot.readStructEnd();
-
-      // check for required fields of primitive type, which can't be checked in the validate method
-      struct.validate();
-    }
-
-    public void write(org.apache.thrift.protocol.TProtocol oprot, CuratorProjectionSpec struct) throws org.apache.thrift.TException {
-      struct.validate();
-
-      oprot.writeStructBegin(STRUCT_DESC);
-      oprot.writeFieldStop();
-      oprot.writeStructEnd();
-    }
-
-  }
-
-  private static class CuratorProjectionSpecTupleSchemeFactory implements SchemeFactory {
-    public CuratorProjectionSpecTupleScheme getScheme() {
-      return new CuratorProjectionSpecTupleScheme();
-    }
-  }
-
-  private static class CuratorProjectionSpecTupleScheme extends TupleScheme<CuratorProjectionSpec> {
-
-    @Override
-    public void write(org.apache.thrift.protocol.TProtocol prot, CuratorProjectionSpec struct) throws org.apache.thrift.TException {
-      TTupleProtocol oprot = (TTupleProtocol) prot;
-    }
-
-    @Override
-    public void read(org.apache.thrift.protocol.TProtocol prot, CuratorProjectionSpec struct) throws org.apache.thrift.TException {
-      TTupleProtocol iprot = (TTupleProtocol) prot;
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java
index 0fd1905..8eae496 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java
@@ -44,7 +44,7 @@ public class CuratorService {
 
     public String create(CuratorProjection projection, CreateSpec createSpec) throws org.apache.thrift.TException;
 
-    public CuratorProjection newCuratorProjection(CuratorProjectionSpec spec) throws org.apache.thrift.TException;
+    public CuratorProjection newCuratorProjection(String connectionName) throws org.apache.thrift.TException;
 
   }
 
@@ -58,7 +58,7 @@ public class CuratorService {
 
     public void create(CuratorProjection projection, CreateSpec createSpec, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
-    public void newCuratorProjection(CuratorProjectionSpec spec, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+    public void newCuratorProjection(String connectionName, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
   }
 
@@ -175,16 +175,16 @@ public class CuratorService {
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "create failed: unknown result");
     }
 
-    public CuratorProjection newCuratorProjection(CuratorProjectionSpec spec) throws org.apache.thrift.TException
+    public CuratorProjection newCuratorProjection(String connectionName) throws org.apache.thrift.TException
     {
-      send_newCuratorProjection(spec);
+      send_newCuratorProjection(connectionName);
       return recv_newCuratorProjection();
     }
 
-    public void send_newCuratorProjection(CuratorProjectionSpec spec) throws org.apache.thrift.TException
+    public void send_newCuratorProjection(String connectionName) throws org.apache.thrift.TException
     {
       newCuratorProjection_args args = new newCuratorProjection_args();
-      args.setSpec(spec);
+      args.setConnectionName(connectionName);
       sendBase("newCuratorProjection", args);
     }
 
@@ -356,24 +356,24 @@ public class CuratorService {
       }
     }
 
-    public void newCuratorProjection(CuratorProjectionSpec spec, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+    public void newCuratorProjection(String connectionName, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      newCuratorProjection_call method_call = new newCuratorProjection_call(spec, resultHandler, this, ___protocolFactory, ___transport);
+      newCuratorProjection_call method_call = new newCuratorProjection_call(connectionName, resultHandler, this, ___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
 
     public static class newCuratorProjection_call extends org.apache.thrift.async.TAsyncMethodCall {
-      private CuratorProjectionSpec spec;
-      public newCuratorProjection_call(CuratorProjectionSpec spec, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+      private String connectionName;
+      public newCuratorProjection_call(String connectionName, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
         super(client, protocolFactory, transport, resultHandler, false);
-        this.spec = spec;
+        this.connectionName = connectionName;
       }
 
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
         prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("newCuratorProjection", org.apache.thrift.protocol.TMessageType.CALL, 0));
         newCuratorProjection_args args = new newCuratorProjection_args();
-        args.setSpec(spec);
+        args.setConnectionName(connectionName);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -505,7 +505,7 @@ public class CuratorService {
 
       public newCuratorProjection_result getResult(I iface, newCuratorProjection_args args) throws org.apache.thrift.TException {
         newCuratorProjection_result result = new newCuratorProjection_result();
-        result.success = iface.newCuratorProjection(args.spec);
+        result.success = iface.newCuratorProjection(args.connectionName);
         return result;
       }
     }
@@ -782,7 +782,7 @@ public class CuratorService {
       }
 
       public void start(I iface, newCuratorProjection_args args, org.apache.thrift.async.AsyncMethodCallback<CuratorProjection> resultHandler) throws TException {
-        iface.newCuratorProjection(args.spec,resultHandler);
+        iface.newCuratorProjection(args.connectionName,resultHandler);
       }
     }
 
@@ -3948,7 +3948,7 @@ public class CuratorService {
   public static class newCuratorProjection_args implements org.apache.thrift.TBase<newCuratorProjection_args, newCuratorProjection_args._Fields>, java.io.Serializable, Cloneable, Comparable<newCuratorProjection_args>   {
     private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("newCuratorProjection_args");
 
-    private static final org.apache.thrift.protocol.TField SPEC_FIELD_DESC = new org.apache.thrift.protocol.TField("spec", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+    private static final org.apache.thrift.protocol.TField CONNECTION_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("connectionName", org.apache.thrift.protocol.TType.STRING, (short)1);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -3956,11 +3956,11 @@ public class CuratorService {
       schemes.put(TupleScheme.class, new newCuratorProjection_argsTupleSchemeFactory());
     }
 
-    public CuratorProjectionSpec spec; // required
+    public String connectionName; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-      SPEC((short)1, "spec");
+      CONNECTION_NAME((short)1, "connectionName");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -3975,8 +3975,8 @@ public class CuratorService {
        */
       public static _Fields findByThriftId(int fieldId) {
         switch(fieldId) {
-          case 1: // SPEC
-            return SPEC;
+          case 1: // CONNECTION_NAME
+            return CONNECTION_NAME;
           default:
             return null;
         }
@@ -4020,8 +4020,8 @@ public class CuratorService {
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
       Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
-      tmpMap.put(_Fields.SPEC, new org.apache.thrift.meta_data.FieldMetaData("spec", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CuratorProjectionSpec.class)));
+      tmpMap.put(_Fields.CONNECTION_NAME, new org.apache.thrift.meta_data.FieldMetaData("connectionName", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(newCuratorProjection_args.class, metaDataMap);
     }
@@ -4030,18 +4030,18 @@ public class CuratorService {
     }
 
     public newCuratorProjection_args(
-      CuratorProjectionSpec spec)
+      String connectionName)
     {
       this();
-      this.spec = spec;
+      this.connectionName = connectionName;
     }
 
     /**
      * Performs a deep copy on <i>other</i>.
      */
     public newCuratorProjection_args(newCuratorProjection_args other) {
-      if (other.isSetSpec()) {
-        this.spec = new CuratorProjectionSpec(other.spec);
+      if (other.isSetConnectionName()) {
+        this.connectionName = other.connectionName;
       }
     }
 
@@ -4051,40 +4051,40 @@ public class CuratorService {
 
     @Override
     public void clear() {
-      this.spec = null;
+      this.connectionName = null;
     }
 
-    public CuratorProjectionSpec getSpec() {
-      return this.spec;
+    public String getConnectionName() {
+      return this.connectionName;
     }
 
-    public newCuratorProjection_args setSpec(CuratorProjectionSpec spec) {
-      this.spec = spec;
+    public newCuratorProjection_args setConnectionName(String connectionName) {
+      this.connectionName = connectionName;
       return this;
     }
 
-    public void unsetSpec() {
-      this.spec = null;
+    public void unsetConnectionName() {
+      this.connectionName = null;
     }
 
-    /** Returns true if field spec is set (has been assigned a value) and false otherwise */
-    public boolean isSetSpec() {
-      return this.spec != null;
+    /** Returns true if field connectionName is set (has been assigned a value) and false otherwise */
+    public boolean isSetConnectionName() {
+      return this.connectionName != null;
     }
 
-    public void setSpecIsSet(boolean value) {
+    public void setConnectionNameIsSet(boolean value) {
       if (!value) {
-        this.spec = null;
+        this.connectionName = null;
       }
     }
 
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
-      case SPEC:
+      case CONNECTION_NAME:
         if (value == null) {
-          unsetSpec();
+          unsetConnectionName();
         } else {
-          setSpec((CuratorProjectionSpec)value);
+          setConnectionName((String)value);
         }
         break;
 
@@ -4093,8 +4093,8 @@ public class CuratorService {
 
     public Object getFieldValue(_Fields field) {
       switch (field) {
-      case SPEC:
-        return getSpec();
+      case CONNECTION_NAME:
+        return getConnectionName();
 
       }
       throw new IllegalStateException();
@@ -4107,8 +4107,8 @@ public class CuratorService {
       }
 
       switch (field) {
-      case SPEC:
-        return isSetSpec();
+      case CONNECTION_NAME:
+        return isSetConnectionName();
       }
       throw new IllegalStateException();
     }
@@ -4126,12 +4126,12 @@ public class CuratorService {
       if (that == null)
         return false;
 
-      boolean this_present_spec = true && this.isSetSpec();
-      boolean that_present_spec = true && that.isSetSpec();
-      if (this_present_spec || that_present_spec) {
-        if (!(this_present_spec && that_present_spec))
+      boolean this_present_connectionName = true && this.isSetConnectionName();
+      boolean that_present_connectionName = true && that.isSetConnectionName();
+      if (this_present_connectionName || that_present_connectionName) {
+        if (!(this_present_connectionName && that_present_connectionName))
           return false;
-        if (!this.spec.equals(that.spec))
+        if (!this.connectionName.equals(that.connectionName))
           return false;
       }
 
@@ -4151,12 +4151,12 @@ public class CuratorService {
 
       int lastComparison = 0;
 
-      lastComparison = Boolean.valueOf(isSetSpec()).compareTo(other.isSetSpec());
+      lastComparison = Boolean.valueOf(isSetConnectionName()).compareTo(other.isSetConnectionName());
       if (lastComparison != 0) {
         return lastComparison;
       }
-      if (isSetSpec()) {
-        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.spec, other.spec);
+      if (isSetConnectionName()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.connectionName, other.connectionName);
         if (lastComparison != 0) {
           return lastComparison;
         }
@@ -4181,11 +4181,11 @@ public class CuratorService {
       StringBuilder sb = new StringBuilder("newCuratorProjection_args(");
       boolean first = true;
 
-      sb.append("spec:");
-      if (this.spec == null) {
+      sb.append("connectionName:");
+      if (this.connectionName == null) {
         sb.append("null");
       } else {
-        sb.append(this.spec);
+        sb.append(this.connectionName);
       }
       first = false;
       sb.append(")");
@@ -4195,9 +4195,6 @@ public class CuratorService {
     public void validate() throws org.apache.thrift.TException {
       // check for required fields
       // check for sub-struct validity
-      if (spec != null) {
-        spec.validate();
-      }
     }
 
     private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -4234,11 +4231,10 @@ public class CuratorService {
             break;
           }
           switch (schemeField.id) {
-            case 1: // SPEC
-              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
-                struct.spec = new CuratorProjectionSpec();
-                struct.spec.read(iprot);
-                struct.setSpecIsSet(true);
+            case 1: // CONNECTION_NAME
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.connectionName = iprot.readString();
+                struct.setConnectionNameIsSet(true);
               } else { 
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
@@ -4258,9 +4254,9 @@ public class CuratorService {
         struct.validate();
 
         oprot.writeStructBegin(STRUCT_DESC);
-        if (struct.spec != null) {
-          oprot.writeFieldBegin(SPEC_FIELD_DESC);
-          struct.spec.write(oprot);
+        if (struct.connectionName != null) {
+          oprot.writeFieldBegin(CONNECTION_NAME_FIELD_DESC);
+          oprot.writeString(struct.connectionName);
           oprot.writeFieldEnd();
         }
         oprot.writeFieldStop();
@@ -4281,12 +4277,12 @@ public class CuratorService {
       public void write(org.apache.thrift.protocol.TProtocol prot, newCuratorProjection_args struct) throws org.apache.thrift.TException {
         TTupleProtocol oprot = (TTupleProtocol) prot;
         BitSet optionals = new BitSet();
-        if (struct.isSetSpec()) {
+        if (struct.isSetConnectionName()) {
           optionals.set(0);
         }
         oprot.writeBitSet(optionals, 1);
-        if (struct.isSetSpec()) {
-          struct.spec.write(oprot);
+        if (struct.isSetConnectionName()) {
+          oprot.writeString(struct.connectionName);
         }
       }
 
@@ -4295,9 +4291,8 @@ public class CuratorService {
         TTupleProtocol iprot = (TTupleProtocol) prot;
         BitSet incoming = iprot.readBitSet(1);
         if (incoming.get(0)) {
-          struct.spec = new CuratorProjectionSpec();
-          struct.spec.read(iprot);
-          struct.setSpecIsSet(true);
+          struct.connectionName = iprot.readString();
+          struct.setConnectionNameIsSet(true);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
index f65138f..b48c066 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
@@ -22,7 +22,6 @@ import org.apache.curator.generated.CreateSpec;
 import org.apache.curator.generated.CuratorEvent;
 import org.apache.curator.generated.CuratorEventType;
 import org.apache.curator.generated.CuratorProjection;
-import org.apache.curator.generated.CuratorProjectionSpec;
 import org.apache.curator.generated.CuratorService;
 import org.apache.curator.generated.EventService;
 import org.apache.curator.generated.GenericProjection;
@@ -49,7 +48,7 @@ public class TestClient
         TProtocol eventProtocol = new TBinaryProtocol(eventTransport);
         final EventService.Client serviceClient = new EventService.Client(eventProtocol);
 
-        final CuratorProjection curatorProjection = client.newCuratorProjection(new CuratorProjectionSpec());
+        final CuratorProjection curatorProjection = client.newCuratorProjection("test");
 
         Executors.newSingleThreadExecutor().submit
         (

http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java
new file mode 100644
index 0000000..9472dd8
--- /dev/null
+++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java
@@ -0,0 +1,26 @@
+package org.apache.curator.x.rpc;
+
+import com.google.common.io.Resources;
+import java.nio.charset.Charset;
+
+/**
+ * Manual test harness that starts a {@link CuratorProjectionServer} using the
+ * {@code configuration/test.json} classpath resource.
+ */
+public class TestServer
+{
+    /**
+     * Reads the test configuration resource as text and passes it to
+     * {@link CuratorProjectionServer#main(String[])} as its single argument.
+     *
+     * @param args ignored
+     * @throws Exception any failure raised while loading the resource or by the server's main method
+     */
+    public static void main(String[] args) throws Exception
+    {
+        // Decode explicitly as UTF-8: relying on the platform default charset would make the
+        // configuration text depend on the JVM's file.encoding/locale settings.
+        String configurationSource = Resources.toString(Resources.getResource("configuration/test.json"), Charset.forName("UTF-8"));
+        CuratorProjectionServer.main(new String[]{configurationSource});
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/resources/configuration/test.json
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/resources/configuration/test.json b/curator-x-rpc/src/test/resources/configuration/test.json
new file mode 100644
index 0000000..c9d6cb8
--- /dev/null
+++ b/curator-x-rpc/src/test/resources/configuration/test.json
@@ -0,0 +1,14 @@
+{
+    "projectionExpiration": "15s",
+
+    "thrift": {
+        "port": 8899
+    },
+
+    "connections": [
+        {
+            "name": "test",
+            "connectionString": "localhost:2181"
+        }
+    ]
+}
\ No newline at end of file