You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/05/29 03:35:11 UTC
[1/2] git commit: wip
Repository: curator
Updated Branches:
refs/heads/curator-rpc 598ad996c -> 530010d90
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e5ecbcd9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e5ecbcd9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e5ecbcd9
Branch: refs/heads/curator-rpc
Commit: e5ecbcd96fd84611a11a34074d63ae05a77df0d9
Parents: 598ad99
Author: randgalt <ra...@apache.org>
Authored: Wed May 28 13:11:49 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed May 28 13:11:49 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/curator/x/rpc/Closer.java | 6 -
.../org/apache/curator/x/rpc/CuratorEntry.java | 127 -------------------
.../curator/x/rpc/CuratorProjectionServer.java | 12 +-
.../org/apache/curator/x/rpc/RpcManager.java | 66 ----------
.../x/rpc/configuration/Configuration.java | 3 +-
.../configuration/ConnectionConfiguration.java | 27 +++-
.../curator/x/rpc/connections/Closer.java | 6 +
.../x/rpc/connections/ConnectionManager.java | 115 +++++++++++++++++
.../curator/x/rpc/connections/CuratorEntry.java | 127 +++++++++++++++++++
.../curator/x/rpc/idl/event/EventService.java | 12 +-
.../projection/CuratorProjectionService.java | 20 +--
11 files changed, 299 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java
deleted file mode 100644
index a389776..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/Closer.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.curator.x.rpc;
-
-public interface Closer<T>
-{
- public void close(T thing);
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
deleted file mode 100644
index 52f025d..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorEntry.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.curator.x.rpc;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class CuratorEntry implements Closeable
-{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
- private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
- private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
- private final Map<String, Entry> things = Maps.newConcurrentMap();
-
- private static class Entry
- {
- final Object thing;
- final Closer closer;
-
- private Entry(Object thing, Closer closer)
- {
- this.thing = thing;
- this.closer = closer;
- }
- }
-
- private enum State
- {
- OPEN,
- CLOSED
- }
-
- public CuratorEntry(CuratorFramework client)
- {
- this.client = client;
- }
-
- @Override
- public void close()
- {
- if ( state.compareAndSet(State.OPEN, State.CLOSED) )
- {
- for ( Map.Entry<String, Entry> mapEntry : things.entrySet() )
- {
- Entry entry = mapEntry.getValue();
- if ( entry.closer != null )
- {
- log.debug(String.format("Closing left over thing. Type: %s - Id: %s", entry.thing.getClass(), mapEntry.getKey()));
- //noinspection unchecked
- entry.closer.close(entry.thing); // lack of generics is safe because addThing() is type-safe
- }
- }
- things.clear();
-
- client.close();
- events.clear();
- }
- }
-
- public RpcCuratorEvent pollForEvent(long maxWaitMs) throws InterruptedException
- {
- if ( state.get() == State.OPEN )
- {
- return events.poll(maxWaitMs, TimeUnit.MILLISECONDS);
- }
- return null;
- }
-
- public void addEvent(RpcCuratorEvent event)
- {
- if ( state.get() == State.OPEN )
- {
- events.offer(event);
- }
- }
-
- public CuratorFramework getClient()
- {
- return (state.get() == State.OPEN) ? client : null;
- }
-
- public <T> String addThing(T thing, Closer<T> closer)
- {
- return addThing(UUID.randomUUID().toString(), thing, closer);
- }
-
- public <T> String addThing(String id, T thing, Closer<T> closer)
- {
- things.put(id, new Entry(thing, closer));
- return id;
- }
-
- public <T> T getThing(String id, Class<T> clazz)
- {
- Entry entry = (id != null) ? things.get(id) : null;
- return cast(clazz, entry);
- }
-
- public boolean closeThing(String id)
- {
- Entry entry = (id != null) ? things.remove(id) : null;
- if ( entry != null )
- {
- //noinspection unchecked
- entry.closer.close(entry.thing);
- }
- return false;
- }
-
- private <T> T cast(Class<T> clazz, Entry entry)
- {
- if ( entry != null )
- {
- return clazz.cast(entry.thing);
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 2dcb99e..e8ebfc5 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -29,6 +29,7 @@ import com.google.common.io.Files;
import com.google.common.io.Resources;
import org.apache.curator.x.rpc.configuration.Configuration;
import org.apache.curator.x.rpc.configuration.ConfigurationBuilder;
+import org.apache.curator.x.rpc.connections.ConnectionManager;
import org.apache.curator.x.rpc.idl.event.EventService;
import org.apache.curator.x.rpc.idl.projection.CuratorProjectionService;
import org.slf4j.Logger;
@@ -42,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class CuratorProjectionServer
{
private final Logger log = LoggerFactory.getLogger(getClass());
- private final RpcManager rpcManager;
+ private final ConnectionManager connectionManager;
private final ThriftServer server;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final Configuration configuration;
@@ -94,9 +95,9 @@ public class CuratorProjectionServer
public CuratorProjectionServer(Configuration configuration)
{
this.configuration = configuration;
- rpcManager = new RpcManager(configuration.getProjectionExpiration().toMillis());
- EventService eventService = new EventService(rpcManager, configuration.getPingTime().toMillis());
- CuratorProjectionService projectionService = new CuratorProjectionService(rpcManager);
+ connectionManager = new ConnectionManager(configuration.getConnections(), configuration.getProjectionExpiration().toMillis());
+ EventService eventService = new EventService(connectionManager, configuration.getPingTime().toMillis());
+ CuratorProjectionService projectionService = new CuratorProjectionService(connectionManager);
ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), Lists.<ThriftEventHandler>newArrayList(), projectionService, eventService);
server = new ThriftServer(processor, configuration.getThrift());
}
@@ -106,6 +107,7 @@ public class CuratorProjectionServer
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
configuration.getLogging().configure(new MetricRegistry(), "curator-rpc");
+ connectionManager.start();
server.start();
log.info("Server listening on port: " + configuration.getThrift().getPort());
@@ -117,8 +119,8 @@ public class CuratorProjectionServer
{
log.info("Stopping...");
- rpcManager.close();
server.close();
+ connectionManager.close();
configuration.getLogging().stop();
log.info("Stopped");
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
deleted file mode 100644
index e238aa6..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/RpcManager.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.curator.x.rpc;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
-
-public class RpcManager implements Closeable
-{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final Cache<String, CuratorEntry> cache;
-
- public RpcManager(long expirationMs)
- {
- RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
- {
- @SuppressWarnings("NullableProblems")
- @Override
- public void onRemoval(RemovalNotification<String, CuratorEntry> notification)
- {
- if ( notification != null )
- {
- log.debug(String.format("Entry being removed. id (%s), reason (%s)", notification.getKey(), notification.getCause()));
-
- CuratorEntry entry = notification.getValue();
- if ( entry != null )
- {
- entry.close();
- }
- }
- }
- };
- cache = CacheBuilder
- .newBuilder()
- .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
- .removalListener(listener)
- .build();
- }
-
- @Override
- public void close()
- {
- cache.invalidateAll();
- cache.cleanUp();
- }
-
- public void add(String id, CuratorFramework client)
- {
- cache.put(id, new CuratorEntry(client));
- }
-
- public CuratorEntry get(String id)
- {
- return cache.getIfPresent(id);
- }
-
- public CuratorEntry remove(String id)
- {
- return cache.asMap().remove(id);
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
index e567cac..6d567b7 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/Configuration.java
@@ -1,6 +1,7 @@
package org.apache.curator.x.rpc.configuration;
import com.facebook.swift.service.ThriftServerConfig;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.airlift.units.Duration;
import io.dropwizard.logging.LoggingFactory;
@@ -57,7 +58,7 @@ public class Configuration
public List<ConnectionConfiguration> getConnections()
{
- return connections;
+ return ImmutableList.copyOf(connections);
}
public void setConnections(List<ConnectionConfiguration> connections)
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
index 3e2f9a2..51f12fa 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/configuration/ConnectionConfiguration.java
@@ -1,13 +1,16 @@
package org.apache.curator.x.rpc.configuration;
+import com.google.common.base.Preconditions;
import io.airlift.units.Duration;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
import javax.validation.constraints.NotNull;
import java.util.concurrent.TimeUnit;
public class ConnectionConfiguration
{
@NotNull private String name;
- private String connectionString = "localhost:2181";
+ private String connectionString = null;
private Duration sessionLength = new Duration(1, TimeUnit.MINUTES);
private Duration connectionTimeout = new Duration(15, TimeUnit.SECONDS);
private AuthorizationConfiguration authorization = null;
@@ -83,4 +86,26 @@ public class ConnectionConfiguration
{
this.retry = retry;
}
+
+ /**
+ * Builds a new {@link CuratorFramework} from this connection's settings.
+ * <p>
+ * The connection string, session timeout, connection timeout and retry policy are
+ * always applied; authorization and namespace are applied only when configured.
+ * This method does not call start() on the client it returns.
+ *
+ * @return the client produced by the Curator builder
+ * @throws IllegalStateException if the connection string is null or empty
+ * @throws NullPointerException if no retry configuration has been set
+ */
+ public CuratorFramework build()
+ {
+ Preconditions.checkState((connectionString != null) && (connectionString.length() > 0), "You must specify a connection string for connection: " + name);
+ Preconditions.checkNotNull(retry, "retry cannot be null");
+
+ // NOTE(review): the (int) casts below wrap for durations longer than Integer.MAX_VALUE ms - confirm that configuration validation rules this out
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+ builder = builder
+ .connectString(connectionString)
+ .sessionTimeoutMs((int)sessionLength.toMillis())
+ .connectionTimeoutMs((int)connectionTimeout.toMillis())
+ .retryPolicy(retry.build());
+ if ( authorization != null )
+ {
+ // encode with a fixed charset so the auth bytes do not depend on the JVM's platform default encoding
+ builder = builder.authorization(authorization.getScheme(), authorization.getAuth().getBytes(java.nio.charset.Charset.forName("UTF-8")));
+ }
+ if ( namespace != null )
+ {
+ builder = builder.namespace(namespace);
+ }
+ return builder.build();
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java
new file mode 100644
index 0000000..060c7d7
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/Closer.java
@@ -0,0 +1,6 @@
+package org.apache.curator.x.rpc.connections;
+
+public interface Closer<T> // callback used by CuratorEntry to close a registered "thing"; T is the type of thing it accepts
+{
+ public void close(T thing); // invoked with the registered object when that object is to be closed
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
new file mode 100644
index 0000000..b101c6d
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
@@ -0,0 +1,115 @@
+package org.apache.curator.x.rpc.connections;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.x.rpc.configuration.ConnectionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ConnectionManager implements Closeable
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final Cache<String, CuratorEntry> cache;
+ private final Map<String, CuratorFramework> clients;
+ private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+
+ private enum State
+ {
+ LATENT,
+ STARTED,
+ CLOSED
+ }
+
+ public ConnectionManager(List<ConnectionConfiguration> connections, long expirationMs)
+ {
+ RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
+ {
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public void onRemoval(RemovalNotification<String, CuratorEntry> notification)
+ {
+ if ( notification != null )
+ {
+ log.debug(String.format("Entry being removed. id (%s), reason (%s)", notification.getKey(), notification.getCause()));
+
+ CuratorEntry entry = notification.getValue();
+ if ( entry != null )
+ {
+ entry.close();
+ }
+ }
+ }
+ };
+ cache = CacheBuilder
+ .newBuilder()
+ .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
+ .removalListener(listener)
+ .build();
+
+ clients = buildClients(connections);
+ }
+
+ public void start()
+ {
+ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+ for ( CuratorFramework client : clients.values() )
+ {
+ client.start();
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ if ( state.compareAndSet(State.STARTED, State.CLOSED) )
+ {
+ cache.invalidateAll();
+ cache.cleanUp();
+
+ for ( CuratorFramework client : clients.values() )
+ {
+ client.close();
+ }
+ }
+ }
+
+ public void add(String id, CuratorFramework client)
+ {
+ Preconditions.checkState(state.get() == State.STARTED, "Not started");
+ cache.put(id, new CuratorEntry(client));
+ }
+
+ public CuratorEntry get(String id)
+ {
+ Preconditions.checkState(state.get() == State.STARTED, "Not started");
+ return cache.getIfPresent(id);
+ }
+
+ public CuratorEntry remove(String id)
+ {
+ Preconditions.checkState(state.get() == State.STARTED, "Not started");
+ return cache.asMap().remove(id);
+ }
+
+ private Map<String, CuratorFramework> buildClients(List<ConnectionConfiguration> connections)
+ {
+ Preconditions.checkArgument(connections.size() > 0, "You must have at least one connection configured");
+
+ ImmutableMap.Builder<String, CuratorFramework> builder = ImmutableMap.builder();
+ for ( ConnectionConfiguration configuration : connections )
+ {
+ builder.put(configuration.getName(), configuration.build());
+ }
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
new file mode 100644
index 0000000..0c4bb8b
--- /dev/null
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/CuratorEntry.java
@@ -0,0 +1,127 @@
+package org.apache.curator.x.rpc.connections;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Bundles one {@link CuratorFramework} client with the queue of events recorded for it
+ * and the "things" registered against it, each thing paired with the {@link Closer}
+ * used to close it.
+ * <p>
+ * The entry starts OPEN. After {@link #close()} it no longer queues or returns events
+ * and {@link #getClient()} returns {@code null}.
+ */
+public class CuratorEntry implements Closeable
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFramework client;
+ private final BlockingQueue<RpcCuratorEvent> events = Queues.newLinkedBlockingQueue();
+ private final AtomicReference<State> state = new AtomicReference<State>(State.OPEN);
+ private final Map<String, Entry> things = Maps.newConcurrentMap();
+
+ /** A registered thing together with the closer to call on it (the closer may be null). */
+ private static class Entry
+ {
+ final Object thing;
+ final Closer closer;
+
+ private Entry(Object thing, Closer closer)
+ {
+ this.thing = thing;
+ this.closer = closer;
+ }
+ }
+
+ private enum State
+ {
+ OPEN,
+ CLOSED
+ }
+
+ /**
+ * @param client the client this entry holds and closes in {@link #close()}
+ */
+ public CuratorEntry(CuratorFramework client)
+ {
+ this.client = client;
+ }
+
+ /**
+ * Closes every registered thing that has a closer, then closes the client and
+ * discards any queued events. Only the first call has any effect.
+ */
+ @Override
+ public void close()
+ {
+ if ( state.compareAndSet(State.OPEN, State.CLOSED) )
+ {
+ for ( Map.Entry<String, Entry> mapEntry : things.entrySet() )
+ {
+ Entry entry = mapEntry.getValue();
+ if ( entry.closer != null )
+ {
+ log.debug(String.format("Closing left over thing. Type: %s - Id: %s", entry.thing.getClass(), mapEntry.getKey()));
+ //noinspection unchecked
+ entry.closer.close(entry.thing); // lack of generics is safe because addThing() is type-safe
+ }
+ }
+ things.clear();
+
+ client.close();
+ events.clear();
+ }
+ }
+
+ /**
+ * Waits up to the given time for the next queued event.
+ *
+ * @param maxWaitMs maximum time to wait, in milliseconds
+ * @return the next event, or {@code null} if none arrived in time or the entry is closed
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public RpcCuratorEvent pollForEvent(long maxWaitMs) throws InterruptedException
+ {
+ if ( state.get() == State.OPEN )
+ {
+ return events.poll(maxWaitMs, TimeUnit.MILLISECONDS);
+ }
+ return null;
+ }
+
+ /**
+ * Queues an event while the entry is open; the event is dropped once the entry is closed.
+ *
+ * @param event event to queue
+ */
+ public void addEvent(RpcCuratorEvent event)
+ {
+ if ( state.get() == State.OPEN )
+ {
+ events.offer(event);
+ }
+ }
+
+ /**
+ * @return the held client while the entry is open, otherwise {@code null}
+ */
+ public CuratorFramework getClient()
+ {
+ return (state.get() == State.OPEN) ? client : null;
+ }
+
+ /**
+ * Registers a thing under a newly generated random UUID.
+ *
+ * @param thing object to register
+ * @param closer closer to call on the thing when it is closed
+ * @return the generated id
+ */
+ public <T> String addThing(T thing, Closer<T> closer)
+ {
+ return addThing(UUID.randomUUID().toString(), thing, closer);
+ }
+
+ /**
+ * Registers a thing under the given id, replacing anything already stored under that id.
+ *
+ * @param id key to register the thing under
+ * @param thing object to register
+ * @param closer closer to call on the thing when it is closed
+ * @return the id that was passed in
+ */
+ public <T> String addThing(String id, T thing, Closer<T> closer)
+ {
+ things.put(id, new Entry(thing, closer));
+ return id;
+ }
+
+ /**
+ * Looks up a registered thing and casts it to the requested type.
+ *
+ * @param id key the thing was registered under; may be null
+ * @param clazz type the thing is expected to have
+ * @return the thing, or {@code null} if the id is null or nothing is registered under it
+ * @throws ClassCastException if the registered thing is not an instance of {@code clazz}
+ */
+ public <T> T getThing(String id, Class<T> clazz)
+ {
+ Entry entry = (id != null) ? things.get(id) : null;
+ return cast(clazz, entry);
+ }
+
+ /**
+ * Unregisters the thing stored under the given id and closes it with its closer.
+ *
+ * @param id key the thing was registered under; may be null
+ * @return {@code true} if a thing was registered under the id and has been removed,
+ * {@code false} if the id is null or unknown
+ */
+ public boolean closeThing(String id)
+ {
+ Entry entry = (id != null) ? things.remove(id) : null;
+ if ( entry == null )
+ {
+ return false;
+ }
+ // tolerate a null closer exactly as close() does, instead of failing with a NullPointerException
+ if ( entry.closer != null )
+ {
+ //noinspection unchecked
+ entry.closer.close(entry.thing); // lack of generics is safe because addThing() is type-safe
+ }
+ return true;
+ }
+
+ /** Casts the entry's thing to the requested type, passing a null entry through as null. */
+ private <T> T cast(Class<T> clazz, Entry entry)
+ {
+ if ( entry != null )
+ {
+ return clazz.cast(entry.thing);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
index 13598c8..933f3af 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/event/EventService.java
@@ -2,26 +2,26 @@ package org.apache.curator.x.rpc.idl.event;
import com.facebook.swift.service.ThriftMethod;
import com.facebook.swift.service.ThriftService;
-import org.apache.curator.x.rpc.CuratorEntry;
-import org.apache.curator.x.rpc.RpcManager;
+import org.apache.curator.x.rpc.connections.CuratorEntry;
+import org.apache.curator.x.rpc.connections.ConnectionManager;
import org.apache.curator.x.rpc.idl.projection.CuratorProjection;
@ThriftService("EventService")
public class EventService
{
- private final RpcManager rpcManager;
+ private final ConnectionManager connectionManager;
private final long pingTimeMs;
- public EventService(RpcManager rpcManager, long pingTimeMs)
+ public EventService(ConnectionManager connectionManager, long pingTimeMs)
{
- this.rpcManager = rpcManager;
+ this.connectionManager = connectionManager;
this.pingTimeMs = pingTimeMs;
}
@ThriftMethod
public RpcCuratorEvent getNextEvent(CuratorProjection projection) throws InterruptedException
{
- CuratorEntry entry = rpcManager.get(projection.id);
+ CuratorEntry entry = connectionManager.get(projection.id);
if ( entry == null )
{
// TODO
http://git-wip-us.apache.org/repos/asf/curator/blob/e5ecbcd9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
index 708dc32..b76730d 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
@@ -34,9 +34,9 @@ import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.x.rpc.Closer;
-import org.apache.curator.x.rpc.CuratorEntry;
-import org.apache.curator.x.rpc.RpcManager;
+import org.apache.curator.x.rpc.connections.Closer;
+import org.apache.curator.x.rpc.connections.CuratorEntry;
+import org.apache.curator.x.rpc.connections.ConnectionManager;
import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,11 +47,11 @@ import java.util.concurrent.TimeUnit;
public class CuratorProjectionService
{
private final Logger log = LoggerFactory.getLogger(getClass());
- private final RpcManager rpcManager;
+ private final ConnectionManager connectionManager;
- public CuratorProjectionService(RpcManager rpcManager)
+ public CuratorProjectionService(ConnectionManager connectionManager)
{
- this.rpcManager = rpcManager;
+ this.connectionManager = connectionManager;
}
@ThriftMethod
@@ -60,7 +60,7 @@ public class CuratorProjectionService
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1));
String id = UUID.randomUUID().toString();
client.start();
- rpcManager.add(id, client);
+ connectionManager.add(id, client);
final CuratorProjection projection = new CuratorProjection(id);
ConnectionStateListener listener = new ConnectionStateListener()
@@ -79,7 +79,7 @@ public class CuratorProjectionService
@ThriftMethod
public void closeCuratorProjection(CuratorProjection projection)
{
- CuratorEntry entry = rpcManager.remove(projection.id);
+ CuratorEntry entry = connectionManager.remove(projection.id);
if ( entry != null )
{
entry.close();
@@ -166,7 +166,7 @@ public class CuratorProjectionService
private void addEvent(CuratorProjection projection, RpcCuratorEvent event)
{
- CuratorEntry entry = rpcManager.get(projection.id);
+ CuratorEntry entry = connectionManager.get(projection.id);
if ( entry != null )
{
entry.addEvent(event);
@@ -202,7 +202,7 @@ public class CuratorProjectionService
private CuratorEntry getEntry(CuratorProjection projection) throws Exception
{
- CuratorEntry entry = rpcManager.get(projection.id);
+ CuratorEntry entry = connectionManager.get(projection.id);
if ( entry == null )
{
throw new Exception("No client found with id: " + projection.id);
[2/2] git commit: Completed move to new configuration. Connection
specs are now fully specified in the server via configuration. Clients create
projections by name only.
Posted by ra...@apache.org.
Completed move to new configuration. Connection specs are now fully specified in the server via configuration. Clients create projections by name only.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/530010d9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/530010d9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/530010d9
Branch: refs/heads/curator-rpc
Commit: 530010d902cb8a2ff09ffe66048e9f11a0650cb1
Parents: e5ecbcd
Author: randgalt <ra...@apache.org>
Authored: Wed May 28 20:35:03 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed May 28 20:35:03 2014 -0500
----------------------------------------------------------------------
.../curator/x/rpc/CuratorProjectionServer.java | 2 +-
.../x/rpc/connections/ConnectionManager.java | 48 ++--
.../projection/CuratorProjectionService.java | 9 +-
.../idl/projection/CuratorProjectionSpec.java | 26 --
curator-x-rpc/src/main/thrift/curator.thrift | 5 +-
.../generated/CuratorProjectionSpec.java | 280 -------------------
.../curator/generated/CuratorService.java | 139 +++++----
.../org/apache/curator/x/rpc/TestClient.java | 3 +-
.../org/apache/curator/x/rpc/TestServer.java | 13 +
.../src/test/resources/configuration/test.json | 14 +
10 files changed, 130 insertions(+), 409 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index e8ebfc5..0ab490b 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -71,7 +71,7 @@ public class CuratorProjectionServer
}
else
{
- System.out.println("First argument is not a file. Treating the command line as a json object");
+ System.out.println("First argument is not a file. Treating the command line as a json/yaml object");
configurationSource = args[0];
}
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
index b101c6d..21fe762 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
@@ -7,12 +7,14 @@ import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableMap;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.x.rpc.configuration.ConnectionConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -20,8 +22,11 @@ public class ConnectionManager implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final Cache<String, CuratorEntry> cache;
- private final Map<String, CuratorFramework> clients;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final Map<String, ConnectionConfiguration> connections;
+ private final ScheduledExecutorService service = ThreadUtils.newSingleThreadScheduledExecutor("ConnectionManager");
+
+ private static final int FORCED_CLEANUP_SECONDS = 30;
private enum State
{
@@ -32,6 +37,8 @@ public class ConnectionManager implements Closeable
public ConnectionManager(List<ConnectionConfiguration> connections, long expirationMs)
{
+ this.connections = buildConnectionsMap(connections);
+
RemovalListener<String, CuratorEntry> listener = new RemovalListener<String, CuratorEntry>()
{
@SuppressWarnings("NullableProblems")
@@ -50,22 +57,22 @@ public class ConnectionManager implements Closeable
}
}
};
- cache = CacheBuilder
- .newBuilder()
- .expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS)
- .removalListener(listener)
- .build();
-
- clients = buildClients(connections);
+ cache = CacheBuilder.newBuilder().expireAfterAccess(expirationMs, TimeUnit.MILLISECONDS).removalListener(listener).build();
}
public void start()
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
- for ( CuratorFramework client : clients.values() )
+
+ Runnable cleanup = new Runnable()
{
- client.start();
- }
+ @Override
+ public void run()
+ {
+ cache.cleanUp();
+ }
+ };
+ service.scheduleWithFixedDelay(cleanup, FORCED_CLEANUP_SECONDS, 30, TimeUnit.SECONDS);
}
@Override
@@ -73,16 +80,19 @@ public class ConnectionManager implements Closeable
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
+ service.shutdownNow();
cache.invalidateAll();
cache.cleanUp();
-
- for ( CuratorFramework client : clients.values() )
- {
- client.close();
- }
}
}
+ public CuratorFramework newConnection(String name)
+ {
+ ConnectionConfiguration configuration = connections.get(name);
+ Preconditions.checkNotNull(configuration, "No connection configuration with that name was found: " + name);
+ return configuration.build();
+ }
+
public void add(String id, CuratorFramework client)
{
Preconditions.checkState(state.get() == State.STARTED, "Not started");
@@ -101,14 +111,14 @@ public class ConnectionManager implements Closeable
return cache.asMap().remove(id);
}
- private Map<String, CuratorFramework> buildClients(List<ConnectionConfiguration> connections)
+ private Map<String, ConnectionConfiguration> buildConnectionsMap(List<ConnectionConfiguration> connections)
{
Preconditions.checkArgument(connections.size() > 0, "You must have at least one connection configured");
- ImmutableMap.Builder<String, CuratorFramework> builder = ImmutableMap.builder();
+ ImmutableMap.Builder<String, ConnectionConfiguration> builder = ImmutableMap.builder();
for ( ConnectionConfiguration configuration : connections )
{
- builder.put(configuration.getName(), configuration.build());
+ builder.put(configuration.getName(), configuration);
}
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
index b76730d..880f290 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionService.java
@@ -22,7 +22,6 @@ package org.apache.curator.x.rpc.idl.projection;
import com.facebook.swift.service.ThriftMethod;
import com.facebook.swift.service.ThriftService;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.Backgroundable;
import org.apache.curator.framework.api.Compressible;
@@ -33,10 +32,9 @@ import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.x.rpc.connections.Closer;
-import org.apache.curator.x.rpc.connections.CuratorEntry;
import org.apache.curator.x.rpc.connections.ConnectionManager;
+import org.apache.curator.x.rpc.connections.CuratorEntry;
import org.apache.curator.x.rpc.idl.event.RpcCuratorEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,9 +53,10 @@ public class CuratorProjectionService
}
@ThriftMethod
- public CuratorProjection newCuratorProjection(CuratorProjectionSpec spec) // TODO
+ public CuratorProjection newCuratorProjection(String connectionName)
{
- CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(1));
+ CuratorFramework client = connectionManager.newConnection(connectionName);
+
String id = UUID.randomUUID().toString();
client.start();
connectionManager.add(id, client);
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java
deleted file mode 100644
index b79990c..0000000
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/projection/CuratorProjectionSpec.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.rpc.idl.projection;
-
-import com.facebook.swift.codec.ThriftStruct;
-
-@ThriftStruct
-public class CuratorProjectionSpec
-{
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/main/thrift/curator.thrift
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift
index b03cc66..491753b 100644
--- a/curator-x-rpc/src/main/thrift/curator.thrift
+++ b/curator-x-rpc/src/main/thrift/curator.thrift
@@ -33,9 +33,6 @@ struct CuratorProjection {
1: string id;
}
-struct CuratorProjectionSpec {
-}
-
struct GenericProjection {
1: string id;
}
@@ -88,7 +85,7 @@ service CuratorService {
void closeCuratorProjection(1: CuratorProjection projection);
bool closeGenericProjection(1: CuratorProjection curatorProjection, 2: GenericProjection genericProjection);
string create(1: CuratorProjection projection, 2: CreateSpec createSpec);
- CuratorProjection newCuratorProjection(1: CuratorProjectionSpec spec);
+ CuratorProjection newCuratorProjection(1: string connectionName);
}
service EventService {
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorProjectionSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorProjectionSpec.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorProjectionSpec.java
deleted file mode 100644
index 1171cc9..0000000
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorProjectionSpec.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.curator.generated;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CuratorProjectionSpec implements org.apache.thrift.TBase<CuratorProjectionSpec, CuratorProjectionSpec._Fields>, java.io.Serializable, Cloneable, Comparable<CuratorProjectionSpec> {
- private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CuratorProjectionSpec");
-
-
- private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
- static {
- schemes.put(StandardScheme.class, new CuratorProjectionSpecStandardSchemeFactory());
- schemes.put(TupleScheme.class, new CuratorProjectionSpecTupleSchemeFactory());
- }
-
-
- /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
- public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-;
-
- private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
-
- static {
- for (_Fields field : EnumSet.allOf(_Fields.class)) {
- byName.put(field.getFieldName(), field);
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, or null if its not found.
- */
- public static _Fields findByThriftId(int fieldId) {
- switch(fieldId) {
- default:
- return null;
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, throwing an exception
- * if it is not found.
- */
- public static _Fields findByThriftIdOrThrow(int fieldId) {
- _Fields fields = findByThriftId(fieldId);
- if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
- return fields;
- }
-
- /**
- * Find the _Fields constant that matches name, or null if its not found.
- */
- public static _Fields findByName(String name) {
- return byName.get(name);
- }
-
- private final short _thriftId;
- private final String _fieldName;
-
- _Fields(short thriftId, String fieldName) {
- _thriftId = thriftId;
- _fieldName = fieldName;
- }
-
- public short getThriftFieldId() {
- return _thriftId;
- }
-
- public String getFieldName() {
- return _fieldName;
- }
- }
- public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
- static {
- Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- metaDataMap = Collections.unmodifiableMap(tmpMap);
- org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CuratorProjectionSpec.class, metaDataMap);
- }
-
- public CuratorProjectionSpec() {
- }
-
- /**
- * Performs a deep copy on <i>other</i>.
- */
- public CuratorProjectionSpec(CuratorProjectionSpec other) {
- }
-
- public CuratorProjectionSpec deepCopy() {
- return new CuratorProjectionSpec(this);
- }
-
- @Override
- public void clear() {
- }
-
- public void setFieldValue(_Fields field, Object value) {
- switch (field) {
- }
- }
-
- public Object getFieldValue(_Fields field) {
- switch (field) {
- }
- throw new IllegalStateException();
- }
-
- /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
- public boolean isSet(_Fields field) {
- if (field == null) {
- throw new IllegalArgumentException();
- }
-
- switch (field) {
- }
- throw new IllegalStateException();
- }
-
- @Override
- public boolean equals(Object that) {
- if (that == null)
- return false;
- if (that instanceof CuratorProjectionSpec)
- return this.equals((CuratorProjectionSpec)that);
- return false;
- }
-
- public boolean equals(CuratorProjectionSpec that) {
- if (that == null)
- return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
-
- @Override
- public int compareTo(CuratorProjectionSpec other) {
- if (!getClass().equals(other.getClass())) {
- return getClass().getName().compareTo(other.getClass().getName());
- }
-
- int lastComparison = 0;
-
- return 0;
- }
-
- public _Fields fieldForId(int fieldId) {
- return _Fields.findByThriftId(fieldId);
- }
-
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("CuratorProjectionSpec(");
- boolean first = true;
-
- sb.append(")");
- return sb.toString();
- }
-
- public void validate() throws org.apache.thrift.TException {
- // check for required fields
- // check for sub-struct validity
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
- try {
- write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
- try {
- read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private static class CuratorProjectionSpecStandardSchemeFactory implements SchemeFactory {
- public CuratorProjectionSpecStandardScheme getScheme() {
- return new CuratorProjectionSpecStandardScheme();
- }
- }
-
- private static class CuratorProjectionSpecStandardScheme extends StandardScheme<CuratorProjectionSpec> {
-
- public void read(org.apache.thrift.protocol.TProtocol iprot, CuratorProjectionSpec struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField schemeField;
- iprot.readStructBegin();
- while (true)
- {
- schemeField = iprot.readFieldBegin();
- if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (schemeField.id) {
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- struct.validate();
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot, CuratorProjectionSpec struct) throws org.apache.thrift.TException {
- struct.validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- oprot.writeFieldStop();
- oprot.writeStructEnd();
- }
-
- }
-
- private static class CuratorProjectionSpecTupleSchemeFactory implements SchemeFactory {
- public CuratorProjectionSpecTupleScheme getScheme() {
- return new CuratorProjectionSpecTupleScheme();
- }
- }
-
- private static class CuratorProjectionSpecTupleScheme extends TupleScheme<CuratorProjectionSpec> {
-
- @Override
- public void write(org.apache.thrift.protocol.TProtocol prot, CuratorProjectionSpec struct) throws org.apache.thrift.TException {
- TTupleProtocol oprot = (TTupleProtocol) prot;
- }
-
- @Override
- public void read(org.apache.thrift.protocol.TProtocol prot, CuratorProjectionSpec struct) throws org.apache.thrift.TException {
- TTupleProtocol iprot = (TTupleProtocol) prot;
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java
index 0fd1905..8eae496 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/generated/CuratorService.java
@@ -44,7 +44,7 @@ public class CuratorService {
public String create(CuratorProjection projection, CreateSpec createSpec) throws org.apache.thrift.TException;
- public CuratorProjection newCuratorProjection(CuratorProjectionSpec spec) throws org.apache.thrift.TException;
+ public CuratorProjection newCuratorProjection(String connectionName) throws org.apache.thrift.TException;
}
@@ -58,7 +58,7 @@ public class CuratorService {
public void create(CuratorProjection projection, CreateSpec createSpec, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
- public void newCuratorProjection(CuratorProjectionSpec spec, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void newCuratorProjection(String connectionName, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
}
@@ -175,16 +175,16 @@ public class CuratorService {
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "create failed: unknown result");
}
- public CuratorProjection newCuratorProjection(CuratorProjectionSpec spec) throws org.apache.thrift.TException
+ public CuratorProjection newCuratorProjection(String connectionName) throws org.apache.thrift.TException
{
- send_newCuratorProjection(spec);
+ send_newCuratorProjection(connectionName);
return recv_newCuratorProjection();
}
- public void send_newCuratorProjection(CuratorProjectionSpec spec) throws org.apache.thrift.TException
+ public void send_newCuratorProjection(String connectionName) throws org.apache.thrift.TException
{
newCuratorProjection_args args = new newCuratorProjection_args();
- args.setSpec(spec);
+ args.setConnectionName(connectionName);
sendBase("newCuratorProjection", args);
}
@@ -356,24 +356,24 @@ public class CuratorService {
}
}
- public void newCuratorProjection(CuratorProjectionSpec spec, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ public void newCuratorProjection(String connectionName, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
- newCuratorProjection_call method_call = new newCuratorProjection_call(spec, resultHandler, this, ___protocolFactory, ___transport);
+ newCuratorProjection_call method_call = new newCuratorProjection_call(connectionName, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
public static class newCuratorProjection_call extends org.apache.thrift.async.TAsyncMethodCall {
- private CuratorProjectionSpec spec;
- public newCuratorProjection_call(CuratorProjectionSpec spec, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private String connectionName;
+ public newCuratorProjection_call(String connectionName, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
- this.spec = spec;
+ this.connectionName = connectionName;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("newCuratorProjection", org.apache.thrift.protocol.TMessageType.CALL, 0));
newCuratorProjection_args args = new newCuratorProjection_args();
- args.setSpec(spec);
+ args.setConnectionName(connectionName);
args.write(prot);
prot.writeMessageEnd();
}
@@ -505,7 +505,7 @@ public class CuratorService {
public newCuratorProjection_result getResult(I iface, newCuratorProjection_args args) throws org.apache.thrift.TException {
newCuratorProjection_result result = new newCuratorProjection_result();
- result.success = iface.newCuratorProjection(args.spec);
+ result.success = iface.newCuratorProjection(args.connectionName);
return result;
}
}
@@ -782,7 +782,7 @@ public class CuratorService {
}
public void start(I iface, newCuratorProjection_args args, org.apache.thrift.async.AsyncMethodCallback<CuratorProjection> resultHandler) throws TException {
- iface.newCuratorProjection(args.spec,resultHandler);
+ iface.newCuratorProjection(args.connectionName,resultHandler);
}
}
@@ -3948,7 +3948,7 @@ public class CuratorService {
public static class newCuratorProjection_args implements org.apache.thrift.TBase<newCuratorProjection_args, newCuratorProjection_args._Fields>, java.io.Serializable, Cloneable, Comparable<newCuratorProjection_args> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("newCuratorProjection_args");
- private static final org.apache.thrift.protocol.TField SPEC_FIELD_DESC = new org.apache.thrift.protocol.TField("spec", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+ private static final org.apache.thrift.protocol.TField CONNECTION_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("connectionName", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -3956,11 +3956,11 @@ public class CuratorService {
schemes.put(TupleScheme.class, new newCuratorProjection_argsTupleSchemeFactory());
}
- public CuratorProjectionSpec spec; // required
+ public String connectionName; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- SPEC((short)1, "spec");
+ CONNECTION_NAME((short)1, "connectionName");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -3975,8 +3975,8 @@ public class CuratorService {
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
- case 1: // SPEC
- return SPEC;
+ case 1: // CONNECTION_NAME
+ return CONNECTION_NAME;
default:
return null;
}
@@ -4020,8 +4020,8 @@ public class CuratorService {
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.SPEC, new org.apache.thrift.meta_data.FieldMetaData("spec", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CuratorProjectionSpec.class)));
+ tmpMap.put(_Fields.CONNECTION_NAME, new org.apache.thrift.meta_data.FieldMetaData("connectionName", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(newCuratorProjection_args.class, metaDataMap);
}
@@ -4030,18 +4030,18 @@ public class CuratorService {
}
public newCuratorProjection_args(
- CuratorProjectionSpec spec)
+ String connectionName)
{
this();
- this.spec = spec;
+ this.connectionName = connectionName;
}
/**
* Performs a deep copy on <i>other</i>.
*/
public newCuratorProjection_args(newCuratorProjection_args other) {
- if (other.isSetSpec()) {
- this.spec = new CuratorProjectionSpec(other.spec);
+ if (other.isSetConnectionName()) {
+ this.connectionName = other.connectionName;
}
}
@@ -4051,40 +4051,40 @@ public class CuratorService {
@Override
public void clear() {
- this.spec = null;
+ this.connectionName = null;
}
- public CuratorProjectionSpec getSpec() {
- return this.spec;
+ public String getConnectionName() {
+ return this.connectionName;
}
- public newCuratorProjection_args setSpec(CuratorProjectionSpec spec) {
- this.spec = spec;
+ public newCuratorProjection_args setConnectionName(String connectionName) {
+ this.connectionName = connectionName;
return this;
}
- public void unsetSpec() {
- this.spec = null;
+ public void unsetConnectionName() {
+ this.connectionName = null;
}
- /** Returns true if field spec is set (has been assigned a value) and false otherwise */
- public boolean isSetSpec() {
- return this.spec != null;
+ /** Returns true if field connectionName is set (has been assigned a value) and false otherwise */
+ public boolean isSetConnectionName() {
+ return this.connectionName != null;
}
- public void setSpecIsSet(boolean value) {
+ public void setConnectionNameIsSet(boolean value) {
if (!value) {
- this.spec = null;
+ this.connectionName = null;
}
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
- case SPEC:
+ case CONNECTION_NAME:
if (value == null) {
- unsetSpec();
+ unsetConnectionName();
} else {
- setSpec((CuratorProjectionSpec)value);
+ setConnectionName((String)value);
}
break;
@@ -4093,8 +4093,8 @@ public class CuratorService {
public Object getFieldValue(_Fields field) {
switch (field) {
- case SPEC:
- return getSpec();
+ case CONNECTION_NAME:
+ return getConnectionName();
}
throw new IllegalStateException();
@@ -4107,8 +4107,8 @@ public class CuratorService {
}
switch (field) {
- case SPEC:
- return isSetSpec();
+ case CONNECTION_NAME:
+ return isSetConnectionName();
}
throw new IllegalStateException();
}
@@ -4126,12 +4126,12 @@ public class CuratorService {
if (that == null)
return false;
- boolean this_present_spec = true && this.isSetSpec();
- boolean that_present_spec = true && that.isSetSpec();
- if (this_present_spec || that_present_spec) {
- if (!(this_present_spec && that_present_spec))
+ boolean this_present_connectionName = true && this.isSetConnectionName();
+ boolean that_present_connectionName = true && that.isSetConnectionName();
+ if (this_present_connectionName || that_present_connectionName) {
+ if (!(this_present_connectionName && that_present_connectionName))
return false;
- if (!this.spec.equals(that.spec))
+ if (!this.connectionName.equals(that.connectionName))
return false;
}
@@ -4151,12 +4151,12 @@ public class CuratorService {
int lastComparison = 0;
- lastComparison = Boolean.valueOf(isSetSpec()).compareTo(other.isSetSpec());
+ lastComparison = Boolean.valueOf(isSetConnectionName()).compareTo(other.isSetConnectionName());
if (lastComparison != 0) {
return lastComparison;
}
- if (isSetSpec()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.spec, other.spec);
+ if (isSetConnectionName()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.connectionName, other.connectionName);
if (lastComparison != 0) {
return lastComparison;
}
@@ -4181,11 +4181,11 @@ public class CuratorService {
StringBuilder sb = new StringBuilder("newCuratorProjection_args(");
boolean first = true;
- sb.append("spec:");
- if (this.spec == null) {
+ sb.append("connectionName:");
+ if (this.connectionName == null) {
sb.append("null");
} else {
- sb.append(this.spec);
+ sb.append(this.connectionName);
}
first = false;
sb.append(")");
@@ -4195,9 +4195,6 @@ public class CuratorService {
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
- if (spec != null) {
- spec.validate();
- }
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -4234,11 +4231,10 @@ public class CuratorService {
break;
}
switch (schemeField.id) {
- case 1: // SPEC
- if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
- struct.spec = new CuratorProjectionSpec();
- struct.spec.read(iprot);
- struct.setSpecIsSet(true);
+ case 1: // CONNECTION_NAME
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.connectionName = iprot.readString();
+ struct.setConnectionNameIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -4258,9 +4254,9 @@ public class CuratorService {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
- if (struct.spec != null) {
- oprot.writeFieldBegin(SPEC_FIELD_DESC);
- struct.spec.write(oprot);
+ if (struct.connectionName != null) {
+ oprot.writeFieldBegin(CONNECTION_NAME_FIELD_DESC);
+ oprot.writeString(struct.connectionName);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
@@ -4281,12 +4277,12 @@ public class CuratorService {
public void write(org.apache.thrift.protocol.TProtocol prot, newCuratorProjection_args struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
BitSet optionals = new BitSet();
- if (struct.isSetSpec()) {
+ if (struct.isSetConnectionName()) {
optionals.set(0);
}
oprot.writeBitSet(optionals, 1);
- if (struct.isSetSpec()) {
- struct.spec.write(oprot);
+ if (struct.isSetConnectionName()) {
+ oprot.writeString(struct.connectionName);
}
}
@@ -4295,9 +4291,8 @@ public class CuratorService {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
- struct.spec = new CuratorProjectionSpec();
- struct.spec.read(iprot);
- struct.setSpecIsSet(true);
+ struct.connectionName = iprot.readString();
+ struct.setConnectionNameIsSet(true);
}
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
index f65138f..b48c066 100644
--- a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
+++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestClient.java
@@ -22,7 +22,6 @@ import org.apache.curator.generated.CreateSpec;
import org.apache.curator.generated.CuratorEvent;
import org.apache.curator.generated.CuratorEventType;
import org.apache.curator.generated.CuratorProjection;
-import org.apache.curator.generated.CuratorProjectionSpec;
import org.apache.curator.generated.CuratorService;
import org.apache.curator.generated.EventService;
import org.apache.curator.generated.GenericProjection;
@@ -49,7 +48,7 @@ public class TestClient
TProtocol eventProtocol = new TBinaryProtocol(eventTransport);
final EventService.Client serviceClient = new EventService.Client(eventProtocol);
- final CuratorProjection curatorProjection = client.newCuratorProjection(new CuratorProjectionSpec());
+ final CuratorProjection curatorProjection = client.newCuratorProjection("test");
Executors.newSingleThreadExecutor().submit
(
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java
new file mode 100644
index 0000000..9472dd8
--- /dev/null
+++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/TestServer.java
@@ -0,0 +1,13 @@
+package org.apache.curator.x.rpc;
+
+import com.google.common.io.Resources;
+import java.nio.charset.Charset;
+
+public class TestServer
+{
+ public static void main(String[] args) throws Exception
+ {
+ String configurationSource = Resources.toString(Resources.getResource("configuration/test.json"), Charset.defaultCharset());
+ CuratorProjectionServer.main(new String[]{configurationSource});
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/530010d9/curator-x-rpc/src/test/resources/configuration/test.json
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/resources/configuration/test.json b/curator-x-rpc/src/test/resources/configuration/test.json
new file mode 100644
index 0000000..c9d6cb8
--- /dev/null
+++ b/curator-x-rpc/src/test/resources/configuration/test.json
@@ -0,0 +1,14 @@
+{
+ "projectionExpiration": "15s",
+
+ "thrift": {
+ "port": 8899
+ },
+
+ "connections": [
+ {
+ "name": "test",
+ "connectionString": "localhost:2181"
+ }
+ ]
+}
\ No newline at end of file