You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/02 18:20:47 UTC

[1/3] git commit: start writing RPC tests

Repository: curator
Updated Branches:
  refs/heads/curator-rpc f3ec63cb6 -> 9077ad001


start writing RPC tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bee07a52
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bee07a52
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bee07a52

Branch: refs/heads/curator-rpc
Commit: bee07a525682436ce33e7110b8f6688ec1df285f
Parents: f3ec63c
Author: randgalt <ra...@apache.org>
Authored: Sun Jun 1 16:11:02 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Jun 1 16:11:02 2014 -0500

----------------------------------------------------------------------
 .../curator/x/rpc/CuratorProjectionServer.java  |  14 +-
 .../java/org/apache/curator/x/rpc/RpcTests.java | 207 +++++++++++++++++++
 2 files changed, 217 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/bee07a52/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
index 806f06e..a01f462 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/CuratorProjectionServer.java
@@ -77,10 +77,7 @@ public class CuratorProjectionServer
             configurationSource = args[0];
         }
 
-        Configuration configuration = new ConfigurationBuilder(configurationSource).build();
-
-        final CuratorProjectionServer server = new CuratorProjectionServer(configuration);
-        server.start();
+        final CuratorProjectionServer server = startServer(configurationSource);
 
         Runnable shutdown = new Runnable()
         {
@@ -94,6 +91,15 @@ public class CuratorProjectionServer
         Runtime.getRuntime().addShutdownHook(hook);
     }
 
+    public static CuratorProjectionServer startServer(String configurationSource) throws Exception
+    {
+        Configuration configuration = new ConfigurationBuilder(configurationSource).build();
+
+        final CuratorProjectionServer server = new CuratorProjectionServer(configuration);
+        server.start();
+        return server;
+    }
+
     public CuratorProjectionServer(Configuration configuration)
     {
         this.configuration = configuration;

http://git-wip-us.apache.org/repos/asf/curator/blob/bee07a52/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java
new file mode 100644
index 0000000..5a83b25
--- /dev/null
+++ b/curator-x-rpc/src/test/java/org/apache/curator/x/rpc/RpcTests.java
@@ -0,0 +1,207 @@
+package org.apache.curator.x.rpc;
+
+import org.apache.curator.generated.*;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.ThreadUtils;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class RpcTests extends BaseClassForTests
+{
+    private Timing timing = new Timing();
+    private CuratorProjectionServer thriftServer;
+    private CuratorService.Client curatorServiceClient;
+    private EventService.Client eventServiceClient;
+    private int thriftPort;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        ObjectMapper mapper = new ObjectMapper();
+
+        ObjectNode connectionNode = mapper.createObjectNode();
+        connectionNode.put("name", "test");
+        connectionNode.put("connectionString", server.getConnectString());
+
+        ObjectNode thriftNode = mapper.createObjectNode();
+        thriftPort = InstanceSpec.getRandomPort();
+        thriftNode.put("port", thriftPort);
+
+        ArrayNode connections = mapper.createArrayNode();
+        connections.add(connectionNode);
+
+        ObjectNode node = mapper.createObjectNode();
+        node.put("connections", connections);
+        node.put("thrift", thriftNode);
+
+        final String configurationJson = mapper.writeValueAsString(node);
+
+        thriftServer = CuratorProjectionServer.startServer(configurationJson);
+
+        TSocket clientTransport = new TSocket("localhost", thriftPort);
+        clientTransport.setTimeout(timing.connection());
+        clientTransport.open();
+        TProtocol clientProtocol = new TBinaryProtocol(clientTransport);
+        curatorServiceClient = new CuratorService.Client(clientProtocol);
+
+        TSocket eventTransport = new TSocket("localhost", thriftPort);
+        eventTransport.setTimeout(timing.connection());
+        eventTransport.open();
+        TProtocol eventProtocol = new TBinaryProtocol(eventTransport);
+        eventServiceClient = new EventService.Client(eventProtocol);
+
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        thriftServer.stop();
+
+        super.teardown();
+    }
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        CuratorProjection curatorProjection = curatorServiceClient.newCuratorProjection("test");
+        CreateSpec spec = new CreateSpec();
+        spec.path = "/test";
+        spec.data = ByteBuffer.wrap("value".getBytes());
+        OptionalPath node = curatorServiceClient.createNode(curatorProjection, spec);
+        Assert.assertEquals(node.path, "/test");
+
+        GetDataSpec dataSpec = new GetDataSpec();
+        dataSpec.path = "/test";
+        OptionalData data = curatorServiceClient.getData(curatorProjection, dataSpec);
+        Assert.assertEquals(data.data, ByteBuffer.wrap("value".getBytes()));
+    }
+
+    @Test
+    public void testEvents() throws Exception
+    {
+        final CuratorProjection curatorProjection = curatorServiceClient.newCuratorProjection("test");
+
+        final CountDownLatch connectedLatch = new CountDownLatch(1);
+        final CountDownLatch nodeCreatedLatch = new CountDownLatch(1);
+        Callable<Void> proc = new Callable<Void>()
+        {
+            @Override
+            public Void call() throws Exception
+            {
+                while ( !Thread.currentThread().isInterrupted() )
+                {
+                    CuratorEvent event = eventServiceClient.getNextEvent(curatorProjection);
+                    if ( event.type == CuratorEventType.CONNECTION_CONNECTED )
+                    {
+                        connectedLatch.countDown();
+                    }
+                    else if ( event.type == CuratorEventType.WATCHED )
+                    {
+                        if ( event.watchedEvent.eventType == EventType.NodeCreated )
+                        {
+                            nodeCreatedLatch.countDown();
+                        }
+                    }
+                }
+                return null;
+            }
+        };
+        Future<Void> eventFuture = ThreadUtils.newSingleThreadExecutor("test").submit(proc);
+
+        Assert.assertTrue(timing.awaitLatch(connectedLatch));
+
+        ExistsSpec spec = new ExistsSpec();
+        spec.path = "/test";
+        spec.watched = true;
+        curatorServiceClient.exists(curatorProjection, spec);
+
+        CreateSpec createSpec = new CreateSpec();
+        createSpec.path = "/test";
+        curatorServiceClient.createNode(curatorProjection, createSpec);
+
+        Assert.assertTrue(timing.awaitLatch(nodeCreatedLatch));
+
+        eventFuture.cancel(true);
+    }
+
+    @Test
+    public void testLockMultiThread() throws Exception
+    {
+        final Timing timing = new Timing();
+
+        TSocket clientTransport = new TSocket("localhost", thriftPort);
+        clientTransport.setTimeout(timing.connection());
+        clientTransport.open();
+        TProtocol clientProtocol = new TBinaryProtocol(clientTransport);
+        final CuratorService.Client secondCuratorServiceClient = new CuratorService.Client(clientProtocol);
+        ExecutorService service = ThreadUtils.newFixedThreadPool(2, "test");
+        ExecutorCompletionService<Void> completer = new ExecutorCompletionService<Void>(service);
+
+        final CountDownLatch lockLatch = new CountDownLatch(2);
+        final AtomicBoolean hasTheLock = new AtomicBoolean();
+        for ( int i = 0; i < 2; ++i )
+        {
+            final CuratorService.Client client = (i == 0) ? curatorServiceClient : secondCuratorServiceClient;
+            Callable<Void> proc = new Callable<Void>()
+            {
+                @Override
+                public Void call() throws Exception
+                {
+                    CuratorProjection curatorProjection = client.newCuratorProjection("test");
+                    LockProjection lockProjection = client.acquireLock(curatorProjection, "/lock", timing.forWaiting().milliseconds());
+                    if ( lockProjection.id == null )
+                    {
+                        throw new Exception("Could not acquire lock");
+                    }
+                    try
+                    {
+                        if ( !hasTheLock.compareAndSet(false, true) )
+                        {
+                            throw new Exception("Two lockers");
+                        }
+
+                        timing.sleepABit();
+                    }
+                    finally
+                    {
+                        hasTheLock.set(false);
+                        lockLatch.countDown();
+                        client.closeGenericProjection(curatorProjection, lockProjection.id);
+                    }
+
+                    return null;
+                }
+            };
+            completer.submit(proc);
+        }
+
+        completer.take().get();
+        completer.take().get();
+
+        Assert.assertTrue(timing.awaitLatch(lockLatch));
+
+        service.shutdownNow();
+    }
+}
+


[3/3] git commit: doc wip

Posted by ra...@apache.org.
doc wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9077ad00
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9077ad00
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9077ad00

Branch: refs/heads/curator-rpc
Commit: 9077ad001b32868ac9abcf172438dff9ff82e52f
Parents: 30019cd
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 2 11:20:34 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 2 11:20:34 2014 -0500

----------------------------------------------------------------------
 .../src/site/confluence/configuration.confluence    | 15 +++++----------
 .../src/site/confluence/reference.confluence        | 16 +++++++---------
 curator-x-rpc/src/site/confluence/usage.confluence  |  2 +-
 3 files changed, 13 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9077ad00/curator-x-rpc/src/site/confluence/configuration.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/configuration.confluence b/curator-x-rpc/src/site/confluence/configuration.confluence
index d2ae8c8..21912ae 100644
--- a/curator-x-rpc/src/site/confluence/configuration.confluence
+++ b/curator-x-rpc/src/site/confluence/configuration.confluence
@@ -124,22 +124,17 @@ h3. Retry
 
 The retry policy configuration depends on what type is used. There are 3 types supported:
 
-_exponential\-backoff_
 ||Name||Type||Default Value||Description||
-|type|string|n/a|"exponential\-backoff"|
+|type|string|n/a|*exponential\-backoff*|
 |baseSleepTime|Duration|100 milliseconds|The base sleep time|
 |maxRetries|int|3|The max retries|
-
-_bounded\-exponential\-backoff_
-||Name||Type||Default Value||Description||
-|type|string|n/a|"bounded\-exponential\-backoff"|
+|\_|\_|\_|\_|
+|type|string|n/a|*bounded\-exponential\-backoff*|
 |baseSleepTime|Duration|100 milliseconds|The base sleep time|
 |maxSleepTime|Duration|30 seconds|The max sleep time|
 |maxRetries|int|3|The max retries|
-
-_ntimes_
-||Name||Type||Default Value||Description||
-|type|string|n/a|"ntimes"|
+|\_|\_|\_|\_|
+|type|string|n/a|*ntimes*|
 |sleepBetweenRetries|int|100 milliseconds|sleep time between retries|
 |n|int|3|the number of retries|
 

http://git-wip-us.apache.org/repos/asf/curator/blob/9077ad00/curator-x-rpc/src/site/confluence/reference.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/reference.confluence b/curator-x-rpc/src/site/confluence/reference.confluence
index 0ecb26b..dde323f 100644
--- a/curator-x-rpc/src/site/confluence/reference.confluence
+++ b/curator-x-rpc/src/site/confluence/reference.confluence
@@ -47,12 +47,10 @@ h2. DiscoveryService
 
 h1. Struct Reference
 
-_Required fields have (r) in the description_
-
 h2. CreateSpec
 
 ||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
 |data|bytes|data for the node|
 |mode|CreateMode|PERSISTENT, PERSISTENT\_SEQUENTIAL, EPHEMERAL, or EPHEMERAL\_SEQUENTIAL|
 |asyncContext|string|if not null, createNode() is performed asynchronously and this is the context used in the async message|
@@ -63,7 +61,7 @@ h2. CreateSpec
 h2. DeleteSpec
 
 ||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
 |guaranteed|bool|if true, use guaranteed deletion|
 |asyncContext|string|if not null, the operation is performed asynchronously and this is the context used in the async message|
 |compressed|bool|if true, compress the data|
@@ -72,7 +70,7 @@ h2. DeleteSpec
 h2. GetDataSpec
 
 ||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
 |watched|bool|if true, trigger watch events for this node|
 |asyncContext|string|if not null, the operation is performed asynchronously and this is the context used in the async message|
 |decompressed|bool|if true, decompress the data|
@@ -80,8 +78,8 @@ h2. GetDataSpec
 h2. SetDataSpec
 
 ||Field||Type||Description||
-|path|string|(r) the ZNode path|
-|data|bytes|(r) data for the node|
+|path|string|the ZNode path \- *required*|
+|data|bytes|data for the node \- *required*|
 |watched|bool|if true, trigger watch events for this node|
 |asyncContext|string|if not null, the operation is performed asynchronously and this is the context used in the async message|
 |compressed|bool|if true, compress the data|
@@ -90,14 +88,14 @@ h2. SetDataSpec
 h2. ExistsSpec
 
 ||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
 |watched|bool|if true, trigger watch events for this node|
 |asyncContext|string|if not null, the operation is performed asynchronously and this is the context used in the async message|
 
 h2. GetChildrenSpec
 
 ||Field||Type||Description||
-|path|string|(r) the ZNode path|
+|path|string|the ZNode path \- *required*|
 |watched|bool|if true, trigger watch events for this node|
 |asyncContext|string|if not null, the operation is performed asynchronously and this is the context used in the async message|
 

http://git-wip-us.apache.org/repos/asf/curator/blob/9077ad00/curator-x-rpc/src/site/confluence/usage.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/site/confluence/usage.confluence b/curator-x-rpc/src/site/confluence/usage.confluence
index 86befd1..e9da75f 100644
--- a/curator-x-rpc/src/site/confluence/usage.confluence
+++ b/curator-x-rpc/src/site/confluence/usage.confluence
@@ -10,7 +10,7 @@ Details on using Apache Thrift can be found here: [[http://thrift.apache.org]].
 
 h2. Prerequisites
 
-It's assumed you already familiar with ZooKeeper and Curator. Also, familiarity with writing Thrift applications is helpful.
+It's assumed you are already familiar with ZooKeeper and Curator. Also, familiarity with writing Thrift applications is helpful.
 
 h2. Services
 


[2/3] git commit: Return null if the connection manager is closed

Posted by ra...@apache.org.
Return null if the connection manager is closed


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/30019cda
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/30019cda
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/30019cda

Branch: refs/heads/curator-rpc
Commit: 30019cda31afee4150bd014d849e5e7677d02909
Parents: bee07a5
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 2 11:20:28 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 2 11:20:28 2014 -0500

----------------------------------------------------------------------
 .../org/apache/curator/x/rpc/connections/ConnectionManager.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/30019cda/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
index afcd64e..d644231 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/connections/ConnectionManager.java
@@ -118,8 +118,7 @@ public class ConnectionManager implements Closeable
 
     public CuratorEntry get(String id)
     {
-        Preconditions.checkState(state.get() == State.STARTED, "Not started");
-        return cache.getIfPresent(id);
+        return (state.get() == State.STARTED) ? cache.getIfPresent(id) : null;
     }
 
     public CuratorEntry remove(String id)