You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/02 18:20:47 UTC
[1/3] git commit: start writing RPC tests
Repository: curator
Updated Branches:
refs/heads/curator-rpc f3ec63cb6 -> 9077ad001
start writing RPC tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bee07a52
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bee07a52
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bee07a52
Branch: refs/heads/curator-rpc
Commit: bee07a525682436ce33e7110b8f6688ec1df285f
Parents: f3ec63c
Author: randgalt <ra...@apache.org>
Authored: Sun Jun 1 16:11:02 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jun 1 16:11:02 2014 -0500
----------------------------------------------------------------------
.../curator/x/rpc/CuratorProjectionServer.java | 14 +-
.../java/org/apache/curator/x/rpc/RpcTests.java | 207 +++++++++++++++++++
2 files changed, 217 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/bee07a52/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 806f06e..a01f462 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -77,10 +77,7 @@ public class CuratorProjectionServer
configurationSource = args[0];
}
- Configuration configuration = new ConfigurationBuilder(configurationSource).build();
-
- final CuratorProjectionServer server = new CuratorProjectionServer(configuration);
- server.start();
+ final CuratorProjectionServer server = startServer(configurationSource);
Runnable shutdown = new Runnable()
{
@@ -94,6 +91,15 @@ public class CuratorProjectionServer
Runtime.getRuntime().addShutdownHook(hook);
}
+ public static CuratorProjectionServer startServer(String configurationSource) throws Exception
+ {
+ Configuration configuration = new ConfigurationBuilder(configurationSource).build();
+
+ final CuratorProjectionServer server = new CuratorProjectionServer(configuration);
+ server.start();
+ return server;
+ }
+
public CuratorProjectionServer(Configuration configuration)
{
this.configuration = configuration;
http://git-wip-us.apache.org/repos/asf/curator/blob/bee07a52/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java
new file mode 100644
index 0000000..5a83b25
--- /dev/null
+++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java
@@ -0,0 +1,207 @@
+package org.apache.curator.x.rpc;
+
+import org.apache.curator.generated.*;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class RpcTests extends BaseClassForTests
+{
+ private Timing timing = new Timing();
+ private CuratorProjectionServer thriftServer;
+ private CuratorService.Client curatorServiceClient;
+ private EventService.Client eventServiceClient;
+ private int thriftPort;
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception
+ {
+ super.setup();
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ ObjectNode connectionNode = mapper.createObjectNode();
+ connectionNode.put("name", "test");
+ connectionNode.put("connectionString", server.getConnectString());
+
+ ObjectNode thriftNode = mapper.createObjectNode();
+ thriftPort = InstanceSpec.getRandomPort();
+ thriftNode.put("port", thriftPort);
+
+ ArrayNode connections = mapper.createArrayNode();
+ connections.add(connectionNode);
+
+ ObjectNode node = mapper.createObjectNode();
+ node.put("connections", connections);
+ node.put("thrift", thriftNode);
+
+ final String configurationJson = mapper.writeValueAsString(node);
+
+ thriftServer = CuratorProjectionServer.startServer(configurationJson);
+
+ TSocket clientTransport = new TSocket("localhost", thriftPort);
+ clientTransport.setTimeout(timing.connection());
+ clientTransport.open();
+ TProtocol clientProtocol = new TBinaryProtocol(clientTransport);
+ curatorServiceClient = new CuratorService.Client(clientProtocol);
+
+ TSocket eventTransport = new TSocket("localhost", thriftPort);
+ eventTransport.setTimeout(timing.connection());
+ eventTransport.open();
+ TProtocol eventProtocol = new TBinaryProtocol(eventTransport);
+ eventServiceClient = new EventService.Client(eventProtocol);
+
+ }
+
+ @AfterMethod
+ @Override
+ public void teardown() throws Exception
+ {
+ thriftServer.stop();
+
+ super.teardown();
+ }
+
+ @Test
+ public void testBasic() throws Exception
+ {
+ CuratorProjection curatorProjection = curatorServiceClient.newCuratorProjection("test");
+ CreateSpec spec = new CreateSpec();
+ spec.path = "/test";
+ spec.data = ByteBuffer.wrap("value".getBytes());
+ OptionalPath node = curatorServiceClient.createNode(curatorProjection, spec);
+ Assert.assertEquals(node.path, "/test");
+
+ GetDataSpec dataSpec = new GetDataSpec();
+ dataSpec.path = "/test";
+ OptionalData data = curatorServiceClient.getData(curatorProjection, dataSpec);
+ Assert.assertEquals(data.data, ByteBuffer.wrap("value".getBytes()));
+ }
+
+ @Test
+ public void testEvents() throws Exception
+ {
+ final CuratorProjection curatorProjection = curatorServiceClient.newCuratorProjection("test");
+
+ final CountDownLatch connectedLatch = new CountDownLatch(1);
+ final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+ Callable<Void> proc = new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ while ( !Thread.currentThread().isInterrupted() )
+ {
+ CuratorEvent event = eventServiceClient.getNextEvent(curatorProjection);
+ if ( event.type == CuratorEventType.CONNECTION_CONNECTED )
+ {
+ connectedLatch.countDown();
+ }
+ else if ( event.type == CuratorEventType.WATCHED )
+ {
+ if ( event.watchedEvent.eventType == EventType.NodeCreated )
+ {
+ nodeCreatedLatch.countDown();
+ }
+ }
+ }
+ return null;
+ }
+ };
+ Future<Void> eventFuture = ThreadUtils.newSingleThreadExecutor("test").submit(proc);
+
+ Assert.assertTrue(timing.awaitLatch(connectedLatch));
+
+ ExistsSpec spec = new ExistsSpec();
+ spec.path = "/test";
+ spec.watched = true;
+ curatorServiceClient.exists(curatorProjection, spec);
+
+ CreateSpec createSpec = new CreateSpec();
+ createSpec.path = "/test";
+ curatorServiceClient.createNode(curatorProjection, createSpec);
+
+ Assert.assertTrue(timing.awaitLatch(nodeCreatedLatch));
+
+ eventFuture.cancel(true);
+ }
+
+ @Test
+ public void testLockMultiThread() throws Exception
+ {
+ final Timing timing = new Timing();
+
+ TSocket clientTransport = new TSocket("localhost", thriftPort);
+ clientTransport.setTimeout(timing.connection());
+ clientTransport.open();
+ TProtocol clientProtocol = new TBinaryProtocol(clientTransport);
+ final CuratorService.Client secondCuratorServiceClient = new CuratorService.Client(clientProtocol);
+ ExecutorService service = ThreadUtils.newFixedThreadPool(2, "test");
+ ExecutorCompletionService<Void> completer = new ExecutorCompletionService<Void>(service);
+
+ final CountDownLatch lockLatch = new CountDownLatch(2);
+ final AtomicBoolean hasTheLock = new AtomicBoolean();
+ for ( int i = 0; i < 2; ++i )
+ {
+ final CuratorService.Client client = (i == 0) ? curatorServiceClient : secondCuratorServiceClient;
+ Callable<Void> proc = new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ CuratorProjection curatorProjection = client.newCuratorProjection("test");
+ LockProjection lockProjection = client.acquireLock(curatorProjection, "/lock", timing.forWaiting().milliseconds());
+ if ( lockProjection.id == null )
+ {
+ throw new Exception("Could not acquire lock");
+ }
+ try
+ {
+ if ( !hasTheLock.compareAndSet(false, true) )
+ {
+ throw new Exception("Two lockers");
+ }
+
+ timing.sleepABit();
+ }
+ finally
+ {
+ hasTheLock.set(false);
+ lockLatch.countDown();
+ client.closeGenericProjection(curatorProjection, lockProjection.id);
+ }
+
+ return null;
+ }
+ };
+ completer.submit(proc);
+ }
+
+ completer.take().get();
+ completer.take().get();
+
+ Assert.assertTrue(timing.awaitLatch(lockLatch));
+
+ service.shutdownNow();
+ }
+}
+
[3/3] git commit: doc wip
Posted by ra...@apache.org.
doc wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9077ad00
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9077ad00
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9077ad00
Branch: refs/heads/curator-rpc
Commit: 9077ad001b32868ac9abcf172438dff9ff82e52f
Parents: 30019cd
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 2 11:20:34 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 2 11:20:34 2014 -0500
----------------------------------------------------------------------
.../src/site/confluence/configuration.confluence | 15 +++++----------
.../src/site/confluence/reference.confluence | 16 +++++++---------
curator-x-rpc/src/site/confluence/usage.confluence | 2 +-
3 files changed, 13 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9077ad00/curator-x-rpc/src/site/confluence/configuration.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/configuration.confluence b/curator-x-rpc/src/site/confluence/configuration.confluence
index d2ae8c8..21912ae 100644
--- a/curator-x-rpc/src/site/confluence/configuration.confluence
+++ b/curator-x-rpc/src/site/confluence/configuration.confluence
@@ -124,22 +124,17 @@ h3. Retry
The retry policy configuration depends on what type is used. There 3 types supported:
-_exponential\-backoff_
||Name||Type||Default Value||Description||
-|type|string|n/a|"exponential\-backoff"|
+|type|string|n/a|*exponential\-backoff*|
|baseSleepTime|Duration|100 milliseconds|The base sleep time|
|maxRetries|int|3|The max retries|
-
-_bounded\-exponential\-backoff_
-||Name||Type||Default Value||Description||
-|type|string|n/a|"bounded\-exponential\-backoff"|
+|\_|\_|\_|\_|
+|type|string|n/a|*bounded\-exponential\-backoff*|
|baseSleepTime|Duration|100 milliseconds|The base sleep time|
|maxSleepTime|Duration|30 seconds|The max sleep time|
|maxRetries|int|3|The max retries|
-
-_ntimes_
-||Name||Type||Default Value||Description||
-|type|string|n/a|"ntimes"|
+|\_|\_|\_|\_|
+|type|string|n/a|*ntimes*|
|sleepBetweenRetries|int|100 milliseconds|sleep time between retries|
|n|int|3|the number of retries|
http://git-wip-us.apache.org/repos/asf/curator/blob/9077ad00/curator-x-rpc/src/site/confluence/reference.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/reference.confluence b/curator-x-rpc/src/site/confluence/reference.confluence
index 0ecb26b..dde323f 100644
--- a/curator-x-rpc/src/site/confluence/reference.confluence
+++ b/curator-x-rpc/src/site/confluence/reference.confluence
@@ -47,12 +47,10 @@ h2. DiscoveryService
h1. Struct Reference
-_Required fields have (r) in the description_
-
h2. CreateSpec
||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
|data|bytes|data for the node|
|mode|CreateMode|PERSISTENT, PERSISTENT\_SEQUENTIAL, EPHEMERAL, or EPHEMERAL\_SEQUENTIAL|
|asyncContext|string|if not null, createNode() is performed asynchronously and this is the context used in the async message|
@@ -63,7 +61,7 @@ h2. CreateSpec
h2. DeleteSpec
||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
|guaranteed|bool|if true, use guaranteed deletion|
|asyncContext|string|if not null, createNode() is performed asynchronously and this is the context used in the async message|
|compressed|bool|if true, compress the data|
@@ -72,7 +70,7 @@ h2. DeleteSpec
h2. GetDataSpec
||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
|watched|bool|if true, trigger watch events for this node|
|asyncContext|string|if not null, createNode() is performed asynchronously and this is the context used in the async message|
|decompressed|bool|if true, decompress the data|
@@ -80,8 +78,8 @@ h2. GetDataSpec
h2. SetDataSpec
||Field||Type||Description||
-|path|string|(r) the ZNode path|
-|data|bytes|(r) data for the node|
+|path|string|the ZNode path \- *required*|
+|data|bytes|data for the node \- *required*|
|watched|bool|if true, trigger watch events for this node|
|asyncContext|string|if not null, createNode() is performed asynchronously and this is the context used in the async message|
|compressed|bool|if true, compress the data|
@@ -90,14 +88,14 @@ h2. SetDataSpec
h2. ExistsSpec
||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
|watched|bool|if true, trigger watch events for this node|
|asyncContext|string|if not null, createNode() is performed asynchronously and this is the context used in the async message|
h2. GetChildrenSpec
||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
|watched|bool|if true, trigger watch events for this node|
|asyncContext|string|if not null, createNode() is performed asynchronously and this is the context used in the async message|
http://git-wip-us.apache.org/repos/asf/curator/blob/9077ad00/curator-x-rpc/src/site/confluence/usage.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/usage.confluence b/curator-x-rpc/src/site/confluence/usage.confluence
index 86befd1..e9da75f 100644
--- a/curator-x-rpc/src/site/confluence/usage.confluence
+++ b/curator-x-rpc/src/site/confluence/usage.confluence
@@ -10,7 +10,7 @@ Details on using Apache Thrift can be found here: [[http://thrift.apache.org]].
h2. Prerequisites
-It's assumed you already familiar with ZooKeeper and Curator. Also, familiarity with writing Thrift applications is helpful.
+It's assumed you are already familiar with ZooKeeper and Curator. Also, familiarity with writing Thrift applications is helpful.
h2. Services
[2/3] git commit: Return null if the connection manager is closed
Posted by ra...@apache.org.
Return null if the connection manager is closed
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/30019cda
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/30019cda
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/30019cda
Branch: refs/heads/curator-rpc
Commit: 30019cda31afee4150bd014d849e5e7677d02909
Parents: bee07a5
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 2 11:20:28 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 2 11:20:28 2014 -0500
----------------------------------------------------------------------
.../org/apache/curator/x/rpc/connections/ConnectionManager.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/30019cda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
index afcd64e..d644231 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
@@ -118,8 +118,7 @@ public class ConnectionManager implements Closeable
public CuratorEntry get(String id)
{
- Preconditions.checkState(state.get() == State.STARTED, "Not started");
- return cache.getIfPresent(id);
+ return (state.get() == State.STARTED) ? cache.getIfPresent(id) : null;
}
public CuratorEntry remove(String id)