You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/22 14:35:16 UTC

[01/10] git commit: WIP on a REST api

Repository: curator
Updated Branches:
  refs/heads/CURATOR-70 [created] 896c7c1b3
  refs/heads/CURATOR-82 [created] 20d8c066f
  refs/heads/CURATOR-88 656ecdedc -> 710d78d48
  refs/heads/CURATOR-97 [created] bfdef2ddc
  refs/heads/CURATOR-97-OLDER-MUST-HAVE-BEEN-A-MISTAKE [created] e4cb66acb
  refs/heads/rest [created] 2fa263e94
  refs/heads/websockets [created] a08f3c55d


WIP on a REST api


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/91fb3886
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/91fb3886
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/91fb3886

Branch: refs/heads/rest
Commit: 91fb3886edba31e6077c21ca2f0073b8697d1399
Parents: 20d8c06
Author: randgalt <ra...@apache.org>
Authored: Wed Jan 8 15:12:32 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jan 8 15:12:32 2014 -0500

----------------------------------------------------------------------
 curator-x-rest/pom.xml                          |  54 ++++++++
 .../org/apache/curator/x/rest/ApiResource.java  |  24 ++++
 .../curator/x/rest/ConnectionResource.java      | 113 ++++++++++++++++
 .../curator/x/rest/LeaderRecipeResource.java    | 132 +++++++++++++++++++
 .../curator/x/rest/LockRecipeResource.java      | 132 +++++++++++++++++++
 .../x/rest/entity/LockRequestEntity.java        |  60 +++++++++
 .../curator/x/rest/system/Connection.java       | 106 +++++++++++++++
 .../x/rest/system/ConnectionsManager.java       |  94 +++++++++++++
 .../rest/system/CuratorFrameworkAllocator.java  |  27 ++++
 .../apache/curator/x/rest/system/ThingKey.java  |  85 ++++++++++++
 .../apache/curator/x/rest/system/ThingType.java |  77 +++++++++++
 pom.xml                                         |   1 +
 12 files changed, 905 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-rest/pom.xml b/curator-x-rest/pom.xml
new file mode 100644
index 0000000..8176fb5
--- /dev/null
+++ b/curator-x-rest/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?><!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>apache-curator</artifactId>
+        <groupId>org.apache.curator</groupId>
+        <version>2.3.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>curator-x-rest</artifactId>
+    <version>2.3.2-SNAPSHOT</version>
+
+    <properties>
+        <jersey.version>2.5.1</jersey.version>
+        <osgi.import.package>
+            *
+        </osgi.import.package>
+        <osgi.export.package>
+            org.apache.curator.x.discovery.server*;version="${project.version}";-noimport:=true
+        </osgi.export.package>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-server</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java
new file mode 100644
index 0000000..db8bb82
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+public class ApiResource
+{
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
new file mode 100644
index 0000000..88fd202
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.x.rest.system.Connection;
+import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.ThingKey;
+import org.apache.curator.x.rest.system.ThingType;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ContextResolver;
+import java.util.concurrent.Future;
+
+@Path("zookeeper/connection")
+public class ConnectionResource
+{
+    private final ConnectionsManager connectionsManager;
+
+    public ConnectionResource(@Context ContextResolver<ConnectionsManager> contextResolver)
+    {
+        connectionsManager = contextResolver.getContext(ConnectionsManager.class);
+    }
+
+    @POST
+    @Path("{id}/connection-state-change")
+    public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id)
+    {
+        final Connection connection = connectionsManager.get(id);
+        if ( connection == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        Future<?> future = connectionsManager.getExecutorService().submit(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    ConnectionState state = connection.blockingPopStateChange();
+                    asyncResponse.resume(Response.ok(state.name()).build());
+                }
+                catch ( InterruptedException e )
+                {
+                    Thread.currentThread().interrupt();
+                    asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build());
+                }
+            }
+        });
+        connection.putThing(new ThingKey<Future>(ThingType.FUTURE), future);
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}")
+    public Response getState(@PathParam("id") String id)
+    {
+        Connection connection = connectionsManager.get(id);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        return Response.ok(connection.getClient().getState().name()).build();
+    }
+
+    @POST
+    @Produces(MediaType.APPLICATION_JSON)
+    public String newConnection() throws Exception
+    {
+        return connectionsManager.newConnection();
+    }
+
+    @DELETE
+    @Path("{id}")
+    public Response closeConnection(@PathParam("id") String id)
+    {
+        if ( connectionsManager.closeConnection(id) )
+        {
+            return Response.ok().build();
+        }
+        return Response.status(Response.Status.NOT_FOUND).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
new file mode 100644
index 0000000..868fb33
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.x.rest.entity.LockRequestEntity;
+import org.apache.curator.x.rest.system.Connection;
+import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.ThingKey;
+import org.apache.curator.x.rest.system.ThingType;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ContextResolver;
+import java.util.concurrent.TimeUnit;
+
+@Path("zookeeper/recipes/leader/{connectionId}")
+public class LeaderRecipeResource
+{
+    private final ConnectionsManager connectionsManager;
+
+    public LeaderRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
+    {
+        connectionsManager = contextResolver.getContext(ConnectionsManager.class);
+    }
+
+    @POST
+    @Path("{path:.*}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
+        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
+        connection.putThing(key, mutex);
+
+        return Response.ok(key.getId()).build();
+    }
+
+    @DELETE
+    @Path("{id}")
+    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        if ( mutex.isAcquiredInThisProcess() )
+        {
+            mutex.release();
+        }
+
+        return Response.ok().build();
+    }
+
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        connectionsManager.getExecutorService().submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
+                        asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
+                    }
+                    catch ( Exception e )
+                    {
+                        asyncResponse.resume(e);
+                    }
+                }
+            }
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java
new file mode 100644
index 0000000..ed7a572
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.x.rest.entity.LockRequestEntity;
+import org.apache.curator.x.rest.system.Connection;
+import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.ThingKey;
+import org.apache.curator.x.rest.system.ThingType;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ContextResolver;
+import java.util.concurrent.TimeUnit;
+
+@Path("zookeeper/recipes/lock/{connectionId}")
+public class LockRecipeResource
+{
+    private final ConnectionsManager connectionsManager;
+
+    public LockRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
+    {
+        connectionsManager = contextResolver.getContext(ConnectionsManager.class);
+    }
+
+    @POST
+    @Path("{path:.*}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
+        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
+        connection.putThing(key, mutex);
+
+        return Response.ok(key.getId()).build();
+    }
+
+    @DELETE
+    @Path("{id}")
+    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        if ( mutex.isAcquiredInThisProcess() )
+        {
+            mutex.release();
+        }
+
+        return Response.ok().build();
+    }
+
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        connectionsManager.getExecutorService().submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
+                        asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
+                    }
+                    catch ( Exception e )
+                    {
+                        asyncResponse.resume(e);
+                    }
+                }
+            }
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java
new file mode 100644
index 0000000..ef51d15
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class LockRequestEntity
+{
+    private String lockId;
+    private int maxWaitMs;
+
+    public LockRequestEntity()
+    {
+        this("/", 0);
+    }
+
+    public LockRequestEntity(String lockId, int maxWaitMs)
+    {
+        this.lockId = lockId;
+        this.maxWaitMs = maxWaitMs;
+    }
+
+    public String getLockId()
+    {
+        return lockId;
+    }
+
+    public void setLockId(String lockId)
+    {
+        this.lockId = lockId;
+    }
+
+    public int getMaxWaitMs()
+    {
+        return maxWaitMs;
+    }
+
+    public void setMaxWaitMs(int maxWaitMs)
+    {
+        this.maxWaitMs = maxWaitMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
new file mode 100644
index 0000000..7f46de1
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Connection implements Closeable, ConnectionStateListener
+{
+    private final CuratorFramework client;
+    private final AtomicLong lastUseMs = new AtomicLong(System.currentTimeMillis());
+    private final Map<ThingKey, Object> things = Maps.newConcurrentMap();
+    private final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue();
+
+    public Connection(CuratorFramework client)
+    {
+        this.client = client;
+        client.getConnectionStateListenable().addListener(this);
+    }
+
+    @Override
+    public void close()
+    {
+        client.close();
+    }
+
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState)
+    {
+        states.add(newState);
+    }
+
+    public ConnectionState blockingPopStateChange() throws InterruptedException
+    {
+        return states.take();
+    }
+
+    public void updateUse()
+    {
+        for ( Map.Entry<ThingKey, Object> entry : things.entrySet() )
+        {
+            //noinspection unchecked
+            entry.getKey().getType().closeFor(entry.getValue());
+        }
+        lastUseMs.set(System.currentTimeMillis());
+    }
+
+    public CuratorFramework getClient()
+    {
+        return client;
+    }
+
+    public long getLastUseMs()
+    {
+        return lastUseMs.get();
+    }
+
+    public <T> void putThing(ThingKey<T> key, T thing)
+    {
+        things.put(key, thing);
+    }
+
+    public <T> T getThing(ThingKey<T> key)
+    {
+        Object o = things.get(key);
+        if ( o != null )
+        {
+            return key.getType().getThingClass().cast(o);
+        }
+        return null;
+    }
+
+    public <T> T removeThing(ThingKey<T> key)
+    {
+        Object o = things.remove(key);
+        if ( o != null )
+        {
+            return key.getType().getThingClass().cast(o);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java
new file mode 100644
index 0000000..febea7a
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+// TODO connection cleanup/timeouts
+
+public class ConnectionsManager implements Closeable
+{
+    private final Map<String, Connection> connections = Maps.newConcurrentMap();
+    private final CuratorFrameworkAllocator allocator;
+    private final ExecutorService executorService;
+
+    public ConnectionsManager(CuratorFrameworkAllocator allocator)
+    {
+        this(allocator, Executors.newCachedThreadPool(ThreadUtils.newThreadFactory("ConnectionsManager")));
+    }
+
+    public ConnectionsManager(CuratorFrameworkAllocator allocator, ExecutorService executorService)
+    {
+        this.allocator = allocator;
+        this.executorService = executorService;
+    }
+
+    public String newConnection() throws Exception
+    {
+        String id = UUID.randomUUID().toString();
+        CuratorFramework client = allocator.newCuratorFramework();
+        connections.put(id, new Connection(client));
+        return id;
+    }
+
+    public Connection get(String id)
+    {
+        Connection connection = connections.get(id);
+        if ( connection != null )
+        {
+            connection.updateUse();
+        }
+        return connection;
+    }
+
+    public boolean closeConnection(String id)
+    {
+        Connection connection = connections.remove(id);
+        if ( connection != null )
+        {
+            connection.close();
+            return true;
+        }
+        return false;
+    }
+
+    public ExecutorService getExecutorService()
+    {
+        return executorService;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        for ( Connection connection : connections.values() )
+        {
+            Closeables.closeQuietly(connection);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java
new file mode 100644
index 0000000..d6dd768
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public interface CuratorFrameworkAllocator
+{
+    public CuratorFramework newCuratorFramework() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java
new file mode 100644
index 0000000..3e60cbd
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+
+public class ThingKey<T>
+{
+    private final String id;
+    private final ThingType<T> type;
+
+    public ThingKey(ThingType<T> type)
+    {
+        this(UUID.randomUUID().toString(), type);
+    }
+
+    public ThingKey(String id, ThingType<T> type)
+    {
+        this.id = Preconditions.checkNotNull(id, "id cannot be null");
+        this.type = Preconditions.checkNotNull(type, "type cannot be null");
+    }
+
+    public String getId()
+    {
+        return id;
+    }
+
+    public ThingType<T> getType()
+    {
+        return type;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() )
+        {
+            return false;
+        }
+
+        ThingKey thingKey = (ThingKey)o;
+
+        if ( !id.equals(thingKey.id) )
+        {
+            return false;
+        }
+        //noinspection RedundantIfStatement
+        if ( type != thingKey.type )
+        {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = id.hashCode();
+        result = 31 * result + type.hashCode();
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
new file mode 100644
index 0000000..d09f0cc
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import java.util.concurrent.Future;
+
+public interface ThingType<T>
+{
+    public static ThingType<InterProcessSemaphoreMutex> MUTEX = new ThingType<InterProcessSemaphoreMutex>()
+    {
+        @Override
+        public Class<InterProcessSemaphoreMutex> getThingClass()
+        {
+            return InterProcessSemaphoreMutex.class;
+        }
+
+        @Override
+        public void closeFor(InterProcessSemaphoreMutex instance)
+        {
+            // nop
+        }
+    };
+
+    public static ThingType<LeaderLatch> LEADER = new ThingType<LeaderLatch>()
+    {
+        @Override
+        public Class<LeaderLatch> getThingClass()
+        {
+            return LeaderLatch.class;
+        }
+
+        @Override
+        public void closeFor(LeaderLatch latch)
+        {
+            Closeables.closeQuietly(latch);
+        }
+    };
+
+    public static ThingType<Future> FUTURE = new ThingType<Future>()
+    {
+        @Override
+        public Class<Future> getThingClass()
+        {
+            return Future.class;
+        }
+
+        @Override
+        public void closeFor(Future future)
+        {
+            future.cancel(true);
+        }
+    };
+
+    public Class<T> getThingClass();
+
+    public void closeFor(T instance);
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af548c2..81a2577 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,6 +224,7 @@
         <module>curator-examples</module>
         <module>curator-x-discovery</module>
         <module>curator-x-discovery-server</module>
+        <module>curator-x-rest</module>
     </modules>
 
     <dependencyManagement>


[08/10] git commit: wip

Posted by ra...@apache.org.
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/710d78d4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/710d78d4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/710d78d4

Branch: refs/heads/CURATOR-88
Commit: 710d78d48ad8ef9b7f954b2c407aaf9957d0f037
Parents: 656ecde
Author: randgalt <ra...@apache.org>
Authored: Wed Mar 12 12:39:02 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Mar 12 12:39:02 2014 -0500

----------------------------------------------------------------------
 .../curator/x/rest/api/ClientResource.java      |  8 +--
 .../x/rest/api/PathChildrenCacheResource.java   |  5 --
 .../org/apache/curator/x/rest/api/Session.java  |  1 -
 .../x/rest/dropwizard/CuratorRestBundle.java    |  1 -
 .../curator/x/rest/entities/DataAndStat.java    | 62 ++++++++++++++++++++
 .../curator/x/rest/entities/NodeCacheSpec.java  |  1 -
 .../x/rest/entities/OptionalNodeData.java       |  1 -
 .../src/site/confluence/entities.confluence     |  8 +--
 8 files changed, 68 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
index 639d865..6d1ad82 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
@@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.*;
 import org.apache.curator.x.rest.CuratorRestContext;
 import org.apache.curator.x.rest.entities.CreateSpec;
+import org.apache.curator.x.rest.entities.DataAndStat;
 import org.apache.curator.x.rest.entities.DeleteSpec;
 import org.apache.curator.x.rest.entities.ExistsSpec;
 import org.apache.curator.x.rest.entities.GetChildrenSpec;
@@ -33,7 +34,6 @@ import org.apache.curator.x.rest.entities.SetDataSpec;
 import org.apache.curator.x.rest.entities.Status;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
-import org.codehaus.jackson.node.ObjectNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import javax.ws.rs.Consumes;
@@ -282,11 +282,7 @@ public class ClientResource
         {
             result = new String((byte[])bytes);
         }
-
-        ObjectNode node = context.getMapper().createObjectNode();
-        node.put("data", result);
-        node.putPOJO("stat", stat);
-        return Response.ok(context.getWriter().writeValueAsString(node)).build();
+        return Response.ok(new DataAndStat(result, stat)).build();
     }
 
     @POST

http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java
index 771eb93..f383caf 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java
@@ -128,11 +128,6 @@ public class PathChildrenCacheResource
     {
         PathChildrenCache cache = Constants.getThing(context.getSession(), cacheId, PathChildrenCache.class);
         ChildData currentData = cache.getCurrentData("/" + path);
-        if ( currentData == null )
-        {
-            return Response.status(Response.Status.NOT_FOUND).build();
-        }
-
         return Response.ok(Constants.toNodeData(currentData)).build();
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
index cbc2211..4640a44 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
@@ -25,7 +25,6 @@ import org.apache.curator.x.rest.entities.StatusMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.Closeable;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;

http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
index 80f3e23..6d56d61 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
@@ -19,7 +19,6 @@
 
 package org.apache.curator.x.rest.dropwizard;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.spi.inject.SingletonTypeInjectableProvider;
 import io.dropwizard.ConfiguredBundle;
 import io.dropwizard.setup.Bootstrap;

http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/DataAndStat.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/DataAndStat.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/DataAndStat.java
new file mode 100644
index 0000000..63d2224
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/DataAndStat.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entities;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class DataAndStat
+{
+    private String data;
+    private Stat stat;
+
+    public DataAndStat()
+    {
+        this("", new Stat());
+    }
+
+    public DataAndStat(String data, Stat stat)
+    {
+        this.data = data;
+        this.stat = stat;
+    }
+
+    public String getData()
+    {
+        return data;
+    }
+
+    public void setData(String data)
+    {
+        this.data = data;
+    }
+
+    public Stat getStat()
+    {
+        return stat;
+    }
+
+    public void setStat(Stat stat)
+    {
+        this.stat = stat;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java
index 07c197f..61f4bb6 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.rest.entities;
 
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import javax.xml.bind.annotation.XmlRootElement;
 
 @XmlRootElement

http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java
index 398308c..b3eb335 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java
@@ -19,7 +19,6 @@
 
 package org.apache.curator.x.rest.entities;
 
-import org.apache.zookeeper.data.Stat;
 import javax.xml.bind.annotation.XmlRootElement;
 
 @XmlRootElement

http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/site/confluence/entities.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/site/confluence/entities.confluence b/curator-x-rest/src/site/confluence/entities.confluence
index b6624c7..31be87f 100644
--- a/curator-x-rest/src/site/confluence/entities.confluence
+++ b/curator-x-rest/src/site/confluence/entities.confluence
@@ -5,17 +5,17 @@ h1. Entity Descriptions
 Here are the entity descriptions for the entities used in the APIs:
 
 ||Field||Type||Description||
-|*Status*| | |
+| *Status*| | |
 |state|string|This instance's Curator connection state. One of: "connected", "suspended", or "lost". If the state is other than "connected" you must assume that any open locks and/or watchers are no longer valid.|
 |messages|array of StatusMessage|Any pending messages from this instance.|
 | | | |
-|*StatusMessage*| | |
+| *StatusMessage*| | |
 |type|string|The status message type. See the [[Managing Status|client.html]] section for details.|
 |message|string|Type-dependent message|
 |details|string|Type-dependent details|
 |sourceId|string|Type-dependent sourceId|
 | | | |
-|*GetChildrenSpec*| | |
+| *GetChildrenSpec*| | |
 |path|string|The ZK path|
 |async|boolean|If true, perform asynchronously|
 |asyncId|string|for async, a user-defined ID to return in the status message|
@@ -23,7 +23,7 @@ Here are the entity descriptions for the entities used in the APIs:
 |watched|boolean|if true, set a watch|
 |watchId|string|if watched, a user-defined ID to return in the status when the watch triggers|
 | | | |
-|*CreateSpec*| | |
+| *CreateSpec*| | |
 |path|string|The ZK path|
 |data|string|The data to store in the node|
 |mode|string|The create mode. One of: "persistent", "persistent\_sequential", "ephemeral", or "ephemeral\_sequential"|


[09/10] git commit: some tests to help with CURATOR-97

Posted by ra...@apache.org.
some tests to help with CURATOR-97


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e4cb66ac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e4cb66ac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e4cb66ac

Branch: refs/heads/CURATOR-97-OLDER-MUST-HAVE-BEEN-A-MISTAKE
Commit: e4cb66acb9112a4291e593b2825744ef4a258d40
Parents: b34aaaa
Author: randgalt <ra...@apache.org>
Authored: Wed Mar 19 13:20:42 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Mar 19 13:20:42 2014 -0500

----------------------------------------------------------------------
 .../recipes/nodes/PersistentEphemeralNode.java  |  4 +-
 .../nodes/TestPersistentEphemeralNode.java      | 54 ++++++++++++++++++--
 2 files changed, 53 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e4cb66ac/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 71095ba..1a4d38d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.recipes.nodes;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
@@ -50,7 +51,8 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class PersistentEphemeralNode implements Closeable
 {
-    private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+    @VisibleForTesting
+    final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;

http://git-wip-us.apache.org/repos/asf/curator/blob/e4cb66ac/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index e312081..5178fe5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.nodes;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -30,7 +29,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
-import org.apache.curator.utils.DebugUtils;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -72,10 +71,57 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
     }
 
     @Test
-    public void testListenersReconnectedIsFast() throws Exception
+    public void testRetriesExpireAndReconnect() throws Exception
     {
-        System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "true");
+        Timing timing = new Timing();
+        PersistentEphemeralNode node = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+            node.start();
+            Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+
+            final CountDownLatch lostLatch = new CountDownLatch(1);
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState == ConnectionState.LOST )
+                    {
+                        lostLatch.countDown();
+                    }
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+
+            server.stop();
+
+            Assert.assertTrue(timing.awaitLatch(lostLatch));
+            CountDownLatch recreateLatch = new CountDownLatch(1);
+            node.initialCreateLatch.set(recreateLatch);
+
+            TimeUnit.MILLISECONDS.sleep(2 * timing.session());  // make sure session expires
+
+            server = new TestingServer(server.getPort(), server.getTempDirectory());
+            timing.sleepABit();
+            KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+            timing.sleepABit();
+            Assert.assertTrue(timing.awaitLatch(recreateLatch));
+            Assert.assertNotNull(client.checkExists().forPath("/abc/node"));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(node);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 
+    @Test
+    public void testListenersReconnectedIsFast() throws Exception
+    {
         server.close();
 
         Timing timing = new Timing();


[10/10] git commit: For added safety, install a connection state listener and attempt to recreate the node on RECONNECT

Posted by ra...@apache.org.
For added safety, install a connection state listener and attempt to recreate the node on RECONNECT


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bfdef2dd
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bfdef2dd
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bfdef2dd

Branch: refs/heads/CURATOR-97
Commit: bfdef2ddc1c3f2121256045690b39baafa269151
Parents: b17e6ff
Author: randgalt <ra...@apache.org>
Authored: Sat Mar 22 08:34:41 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Mar 22 08:34:41 2014 -0500

----------------------------------------------------------------------
 .../recipes/nodes/PersistentEphemeralNode.java      | 16 ++++++++++++++++
 .../framework/client/TestBackgroundStates.java      |  2 --
 .../recipes/nodes/TestPersistentEphemeralNode.java  |  2 --
 3 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/bfdef2dd/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 71095ba..6b6ae52 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -25,6 +25,8 @@ import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CreateModable;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -79,6 +81,17 @@ public class PersistentEphemeralNode implements Closeable
             }
         }
     };
+    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+    {
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState newState)
+        {
+            if ( newState == ConnectionState.RECONNECTED )
+            {
+                createNode();
+            }
+        }
+    };
 
     private enum State
     {
@@ -226,6 +239,7 @@ public class PersistentEphemeralNode implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
 
+        client.getConnectionStateListenable().addListener(connectionStateListener);
         createNode();
     }
 
@@ -254,6 +268,8 @@ public class PersistentEphemeralNode implements Closeable
             return;
         }
 
+        client.getConnectionStateListenable().removeListener(connectionStateListener);
+
         try
         {
             deleteNode();

http://git-wip-us.apache.org/repos/asf/curator/blob/bfdef2dd/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
index b1c382f..cb186fc 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
@@ -97,8 +97,6 @@ public class TestBackgroundStates extends BaseClassForTests
     @Test
     public void testConnectionStateListener() throws Exception
     {
-        System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "true");
-
         server.close();
 
         Timing timing = new Timing();

http://git-wip-us.apache.org/repos/asf/curator/blob/bfdef2dd/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index e312081..4ca487a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -74,8 +74,6 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
     @Test
     public void testListenersReconnectedIsFast() throws Exception
     {
-        System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "true");
-
         server.close();
 
         Timing timing = new Timing();


[03/10] git commit: wip

Posted by ra...@apache.org.
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/dd1fe969
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/dd1fe969
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/dd1fe969

Branch: refs/heads/rest
Commit: dd1fe969f725fa84317789e08e3228131105c0f6
Parents: d30a266
Author: randgalt <ra...@apache.org>
Authored: Wed Jan 8 16:40:25 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jan 8 16:40:25 2014 -0500

----------------------------------------------------------------------
 .../recipes/cache/PathChildrenCache.java        |  11 ++
 .../curator/x/rest/ConnectionResource.java      |   2 +-
 .../curator/x/rest/PathCacheRecipeResource.java | 118 ++++++++++---
 .../x/rest/entity/PathChildrenCacheEntity.java  |  84 ++++++++++
 .../entity/PathChildrenCacheEventEntity.java    |  84 ++++++++++
 .../curator/x/rest/entity/StatEntity.java       | 167 +++++++++++++++++++
 .../x/rest/system/PathChildrenCacheThing.java   |  59 +++++++
 .../apache/curator/x/rest/system/ThingType.java |  11 +-
 8 files changed, 503 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index d66f7f3..a1844db 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -170,6 +170,17 @@ public class PathChildrenCache implements Closeable
     }
 
     /**
+     * @param client    the client
+     * @param path      path to watch
+     * @param dataIsCompressed if true, data in the path is compressed
+     * @param cacheData if true, node contents are cached in addition to the stat
+     */
+    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed)
+    {
+        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+    }
+
+    /**
      * @param client        the client
      * @param path          path to watch
      * @param cacheData     if true, node contents are cached in addition to the stat

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
index 6d3b559..4f70111 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -48,7 +48,7 @@ public class ConnectionResource
         connectionsManager = contextResolver.getContext(ConnectionsManager.class);
     }
 
-    @POST
+    @GET
     @Path("{id}/block-on-state-change")
     public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id, @QueryParam("state-count") String currentStateCountArg)
     {

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
index 1fc7b3d..d953bd6 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
@@ -19,14 +19,22 @@
 
 package org.apache.curator.x.rest;
 
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.rest.entity.LockRequestEntity;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.x.rest.entity.PathChildrenCacheEntity;
+import org.apache.curator.x.rest.entity.PathChildrenCacheEventEntity;
+import org.apache.curator.x.rest.entity.StatEntity;
 import org.apache.curator.x.rest.system.Connection;
 import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.PathChildrenCacheThing;
 import org.apache.curator.x.rest.system.ThingKey;
 import org.apache.curator.x.rest.system.ThingType;
-import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -34,10 +42,13 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Context;
+import javax.ws.rs.core.GenericEntity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ContextResolver;
-import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Future;
 
 @Path("zookeeper/recipes/path-cache/{connectionId}")
 public class PathCacheRecipeResource
@@ -50,9 +61,8 @@ public class PathCacheRecipeResource
     }
 
     @POST
-    @Path("{path:.*}")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
+    public Response allocate(@PathParam("connectionId") String connectionId, PathChildrenCacheEntity spec) throws Exception
     {
         Connection connection = connectionsManager.get(connectionId);
         if ( connection == null )
@@ -60,16 +70,22 @@ public class PathCacheRecipeResource
             return Response.status(Response.Status.NOT_FOUND).build();
         }
 
-        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
-        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
-        connection.putThing(key, mutex);
+        PathChildrenCache cache = new PathChildrenCache(connection.getClient(), spec.getPath(), spec.isCacheData(), spec.isDataIsCompressed());
+        PathChildrenCacheThing cacheThing = new PathChildrenCacheThing(cache);
+        cache.getListenable().addListener(new LocalListener(cacheThing));
+
+        ThingKey<PathChildrenCacheThing> key = new ThingKey<PathChildrenCacheThing>(ThingType.PATH_CACHE);
+        connection.putThing(key, cacheThing);
+
+        PathChildrenCache.StartMode startMode = spec.isBuildInitial() ? PathChildrenCache.StartMode.POST_INITIALIZED_EVENT : PathChildrenCache.StartMode.NORMAL;
+        cache.start(startMode);
 
         return Response.ok(key.getId()).build();
     }
 
     @DELETE
     @Path("{id}")
-    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
+    public Response delete(@PathParam("connectionId") String connectionId, @PathParam("id") String cacheId) throws IOException
     {
         Connection connection = connectionsManager.get(connectionId);
         if ( connection == null )
@@ -77,23 +93,20 @@ public class PathCacheRecipeResource
             return Response.status(Response.Status.NOT_FOUND).build();
         }
 
-        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
-        if ( mutex == null )
+        PathChildrenCacheThing cacheThing = connection.removeThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+        if ( cacheThing == null )
         {
             return Response.status(Response.Status.NOT_FOUND).build();
         }
 
-        if ( mutex.isAcquiredInThisProcess() )
-        {
-            mutex.release();
-        }
+        cacheThing.getCache().close();
 
         return Response.ok().build();
     }
 
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
+    @GET
+    @Path("{id}/block-on-events")
+    public void getEvents(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, @PathParam("id") String cacheId)
     {
         Connection connection = connectionsManager.get(connectionId);
         if ( connection == null )
@@ -102,14 +115,14 @@ public class PathCacheRecipeResource
             return;
         }
 
-        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
-        if ( mutex == null )
+        final PathChildrenCacheThing cacheThing = connection.removeThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+        if ( cacheThing == null )
         {
             asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
             return;
         }
 
-        connectionsManager.getExecutorService().submit
+        Future<?> future = connectionsManager.getExecutorService().submit
         (
             new Runnable()
             {
@@ -118,15 +131,68 @@ public class PathCacheRecipeResource
                 {
                     try
                     {
-                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
-                        asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
+                        List<PathChildrenCacheEvent> events = cacheThing.blockForPendingEvents();
+                        List<PathChildrenCacheEventEntity> transformed = Lists.transform(events, toEntity);
+                        GenericEntity<List<PathChildrenCacheEventEntity>> entity = new GenericEntity<List<PathChildrenCacheEventEntity>>(transformed){};
+                        asyncResponse.resume(Response.ok(entity).build());
                     }
-                    catch ( Exception e )
+                    catch ( InterruptedException e )
                     {
-                        asyncResponse.resume(e);
+                        Thread.currentThread().interrupt();
+                        asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build());
                     }
                 }
             }
         );
+        connection.putThing(new ThingKey<Future>(ThingType.FUTURE), future);
+    }
+
+    private static final Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity> toEntity = new Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity>()
+    {
+        @Override
+        public PathChildrenCacheEventEntity apply(PathChildrenCacheEvent event)
+        {
+            String path = (event.getData() != null) ? event.getData().getPath() : null;
+            String data = ((event.getData() != null) && (event.getData().getData() != null)) ? new String((event.getData().getData())) : null;
+            StatEntity stat = ((event.getData() != null) && (event.getData().getStat() != null))
+                ? new StatEntity
+                (
+                    event.getData().getStat().getCzxid(),
+                    event.getData().getStat().getMzxid(),
+                    event.getData().getStat().getCtime(),
+                    event.getData().getStat().getMtime(),
+                    event.getData().getStat().getVersion(),
+                    event.getData().getStat().getCversion(),
+                    event.getData().getStat().getAversion(),
+                    event.getData().getStat().getEphemeralOwner(),
+                    event.getData().getStat().getDataLength(),
+                    event.getData().getStat().getNumChildren(),
+                    event.getData().getStat().getPzxid()
+                )
+                : null;
+            return new PathChildrenCacheEventEntity
+            (
+                event.getType().name(),
+                path,
+                data,
+                stat
+            );
+        }
+    };
+
+    private static class LocalListener implements PathChildrenCacheListener
+    {
+        private final PathChildrenCacheThing cacheThing;
+
+        public LocalListener(PathChildrenCacheThing cacheThing)
+        {
+            this.cacheThing = cacheThing;
+        }
+
+        @Override
+        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+        {
+            cacheThing.addEvent(event);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
new file mode 100644
index 0000000..c2c3328
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class PathChildrenCacheEntity
+{
+    private String path;
+    private boolean dataIsCompressed;
+    private boolean cacheData;
+    private boolean buildInitial;
+
+    public PathChildrenCacheEntity()
+    {
+        this("", false, false, false);
+    }
+
+    public PathChildrenCacheEntity(String path, boolean dataIsCompressed, boolean cacheData, boolean buildInitial)
+    {
+        this.path = path;
+        this.dataIsCompressed = dataIsCompressed;
+        this.cacheData = cacheData;
+        this.buildInitial = buildInitial;
+    }
+
+    public String getPath()
+    {
+        return path;
+    }
+
+    public void setPath(String path)
+    {
+        this.path = path;
+    }
+
+    public boolean isDataIsCompressed()
+    {
+        return dataIsCompressed;
+    }
+
+    public void setDataIsCompressed(boolean dataIsCompressed)
+    {
+        this.dataIsCompressed = dataIsCompressed;
+    }
+
+    public boolean isCacheData()
+    {
+        return cacheData;
+    }
+
+    public void setCacheData(boolean cacheData)
+    {
+        this.cacheData = cacheData;
+    }
+
+    public boolean isBuildInitial()
+    {
+        return buildInitial;
+    }
+
+    public void setBuildInitial(boolean buildInitial)
+    {
+        this.buildInitial = buildInitial;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
new file mode 100644
index 0000000..9b08d32
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class PathChildrenCacheEventEntity
+{
+    private String eventType;
+    private String path;
+    private String data;
+    private StatEntity stat;
+
+    public PathChildrenCacheEventEntity()
+    {
+        this("", "", "", new StatEntity());
+    }
+
+    public PathChildrenCacheEventEntity(String eventType, String path, String data, StatEntity stat)
+    {
+        this.eventType = eventType;
+        this.path = path;
+        this.data = data;
+        this.stat = stat;
+    }
+
+    public String getEventType()
+    {
+        return eventType;
+    }
+
+    public void setEventType(String eventType)
+    {
+        this.eventType = eventType;
+    }
+
+    public String getPath()
+    {
+        return path;
+    }
+
+    public void setPath(String path)
+    {
+        this.path = path;
+    }
+
+    public String getData()
+    {
+        return data;
+    }
+
+    public void setData(String data)
+    {
+        this.data = data;
+    }
+
+    public StatEntity getStat()
+    {
+        return stat;
+    }
+
+    public void setStat(StatEntity stat)
+    {
+        this.stat = stat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
new file mode 100644
index 0000000..ef323ec
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class StatEntity
+{
+    private long czxid;
+    private long mzxid;
+    private long ctime;
+    private long mtime;
+    private int version;
+    private int cversion;
+    private int aversion;
+    private long ephemeralOwner;
+    private int dataLength;
+    private int numChildren;
+    private long pzxid;
+
+    public StatEntity()
+    {
+    }
+
+    public StatEntity(long czxid, long mzxid, long ctime, long mtime, int version, int cversion, int aversion, long ephemeralOwner, int dataLength, int numChildren, long pzxid)
+    {
+        this.czxid = czxid;
+        this.mzxid = mzxid;
+        this.ctime = ctime;
+        this.mtime = mtime;
+        this.version = version;
+        this.cversion = cversion;
+        this.aversion = aversion;
+        this.ephemeralOwner = ephemeralOwner;
+        this.dataLength = dataLength;
+        this.numChildren = numChildren;
+        this.pzxid = pzxid;
+    }
+
+    public long getCzxid()
+    {
+        return czxid;
+    }
+
+    public void setCzxid(long czxid)
+    {
+        this.czxid = czxid;
+    }
+
+    public long getMzxid()
+    {
+        return mzxid;
+    }
+
+    public void setMzxid(long mzxid)
+    {
+        this.mzxid = mzxid;
+    }
+
+    public long getCtime()
+    {
+        return ctime;
+    }
+
+    public void setCtime(long ctime)
+    {
+        this.ctime = ctime;
+    }
+
+    public long getMtime()
+    {
+        return mtime;
+    }
+
+    public void setMtime(long mtime)
+    {
+        this.mtime = mtime;
+    }
+
+    public int getVersion()
+    {
+        return version;
+    }
+
+    public void setVersion(int version)
+    {
+        this.version = version;
+    }
+
+    public int getCversion()
+    {
+        return cversion;
+    }
+
+    public void setCversion(int cversion)
+    {
+        this.cversion = cversion;
+    }
+
+    public int getAversion()
+    {
+        return aversion;
+    }
+
+    public void setAversion(int aversion)
+    {
+        this.aversion = aversion;
+    }
+
+    public long getEphemeralOwner()
+    {
+        return ephemeralOwner;
+    }
+
+    public void setEphemeralOwner(long ephemeralOwner)
+    {
+        this.ephemeralOwner = ephemeralOwner;
+    }
+
+    public int getDataLength()
+    {
+        return dataLength;
+    }
+
+    public void setDataLength(int dataLength)
+    {
+        this.dataLength = dataLength;
+    }
+
+    public int getNumChildren()
+    {
+        return numChildren;
+    }
+
+    public void setNumChildren(int numChildren)
+    {
+        this.numChildren = numChildren;
+    }
+
+    public long getPzxid()
+    {
+        return pzxid;
+    }
+
+    public void setPzxid(long pzxid)
+    {
+        this.pzxid = pzxid;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
new file mode 100644
index 0000000..1c507aa
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import java.util.List;
+
+public class PathChildrenCacheThing
+{
+    private final PathChildrenCache cache;
+    private final List<PathChildrenCacheEvent> events = Lists.newLinkedList();
+
+    public PathChildrenCacheThing(PathChildrenCache cache)
+    {
+        this.cache = cache;
+    }
+
+    public PathChildrenCache getCache()
+    {
+        return cache;
+    }
+
+    public synchronized List<PathChildrenCacheEvent> blockForPendingEvents() throws InterruptedException
+    {
+        while ( events.size() == 0 )
+        {
+            wait();
+        }
+
+        List<PathChildrenCacheEvent> localEvents = Lists.newArrayList(events);
+        events.clear();
+        return localEvents;
+    }
+
+    public synchronized void addEvent(PathChildrenCacheEvent event)
+    {
+        events.add(event);
+        notifyAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
index 13448dd..81251fc 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
@@ -20,7 +20,6 @@
 package org.apache.curator.x.rest.system;
 
 import com.google.common.io.Closeables;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import java.util.concurrent.Future;
 
@@ -41,18 +40,18 @@ public interface ThingType<T>
         }
     };
 
-    public static ThingType<PathChildrenCache> PATH_CACHE = new ThingType<PathChildrenCache>()
+    public static ThingType<PathChildrenCacheThing> PATH_CACHE = new ThingType<PathChildrenCacheThing>()
     {
         @Override
-        public Class<PathChildrenCache> getThingClass()
+        public Class<PathChildrenCacheThing> getThingClass()
         {
-            return PathChildrenCache.class;
+            return PathChildrenCacheThing.class;
         }
 
         @Override
-        public void closeFor(PathChildrenCache cache)
+        public void closeFor(PathChildrenCacheThing cache)
         {
-            Closeables.closeQuietly(cache);
+            Closeables.closeQuietly(cache.getCache());
         }
     };
 


[07/10] git commit: wip

Posted by ra...@apache.org.
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a08f3c55
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a08f3c55
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a08f3c55

Branch: refs/heads/websockets
Commit: a08f3c55db4c91e4baf809c19f5c9da23e648360
Parents: f68a785
Author: randgalt <ra...@apache.org>
Authored: Sat Jan 11 13:18:36 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jan 11 13:18:36 2014 -0500

----------------------------------------------------------------------
 .../curator/x/websockets/api/zookeeper/Create.java | 17 +++++++++++++++--
 .../x/websockets/details/CuratorEndpoint.java      |  1 +
 2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/a08f3c55/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
index 24cb076..eba903e 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
@@ -19,9 +19,13 @@
 
 package org.apache.curator.x.websockets.api.zookeeper;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.BackgroundPathAndBytesable;
 import org.apache.curator.framework.api.Compressible;
 import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.PathAndBytesable;
 import org.apache.curator.x.websockets.api.ApiCommand;
 import org.apache.curator.x.websockets.api.JsonUtils;
@@ -33,6 +37,7 @@ import org.codehaus.jackson.map.ObjectWriter;
 
 public class Create implements ApiCommand
 {
+    @SuppressWarnings("unchecked")
     @Override
     public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
     {
@@ -67,14 +72,22 @@ public class Create implements ApiCommand
                 builder = ((CreateModable)builder).withMode(createMode);
             }
 
+            BackgroundCallback callback = new BackgroundCallback()
+            {
+                @Override
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+                {
+                    System.out.println();
+                }
+            };
             if ( payload != null )
             {
                 String payloadStr = writer.writeValueAsString(payload);
-                result = ((PathAndBytesable)builder).forPath(path, payloadStr.getBytes());
+                result = ((BackgroundPathAndBytesable<String>)builder).inBackground(callback).forPath(path, payloadStr.getBytes());
             }
             else
             {
-                result = ((PathAndBytesable)builder).forPath(path);
+                result = ((BackgroundPathAndBytesable<String>)builder).inBackground(callback).forPath(path);
             }
         }
         catch ( ClassCastException e )

http://git-wip-us.apache.org/repos/asf/curator/blob/a08f3c55/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
index 6f664de..adf6dfb 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
@@ -81,6 +81,7 @@ public class CuratorEndpoint extends Endpoint
         catch ( Exception e )
         {
             // TODO
+            e.printStackTrace();
         }
 
         MessageHandler handler = new MessageHandler.Whole<String>()


[05/10] git commit: wip

Posted by ra...@apache.org.
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0910d482
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0910d482
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0910d482

Branch: refs/heads/websockets
Commit: 0910d4821f09bf1d965c8547ef82b1516976a6b8
Parents: 20d8c06
Author: randgalt <ra...@apache.org>
Authored: Fri Jan 10 18:44:13 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jan 10 18:44:13 2014 -0500

----------------------------------------------------------------------
 curator-x-websockets/pom.xml                    |  71 ++++++++++++
 .../curator/x/websockets/ClientCreator.java     |  27 +++++
 .../x/websockets/CuratorWebsocketsConfig.java   |  68 ++++++++++++
 .../x/websockets/CuratorWebsocketsServer.java   |  77 +++++++++++++
 .../x/websockets/DefaultClientCreator.java      |  45 ++++++++
 .../curator/x/websockets/api/ApiCommand.java    |  30 ++++++
 .../x/websockets/api/CommandManager.java        |  49 +++++++++
 .../curator/x/websockets/api/JsonUtils.java     |  56 ++++++++++
 .../x/websockets/api/zookeeper/Create.java      |  46 ++++++++
 .../x/websockets/details/CuratorEndpoint.java   | 107 +++++++++++++++++++
 .../details/CuratorWebsocketsSession.java       |  53 +++++++++
 .../x/websockets/details/SessionManager.java    |  64 +++++++++++
 .../apache/curator/x/websockets/TestServer.java |  30 ++++++
 pom.xml                                         |   1 +
 14 files changed, 724 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-websockets/pom.xml b/curator-x-websockets/pom.xml
new file mode 100644
index 0000000..95803c1
--- /dev/null
+++ b/curator-x-websockets/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?><!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>apache-curator</artifactId>
+        <groupId>org.apache.curator</groupId>
+        <version>2.3.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>curator-x-websockets</artifactId>
+    <version>2.3.2-SNAPSHOT</version>
+
+    <properties>
+        <tyrus.version>1.3.3</tyrus.version>
+
+        <osgi.import.package>
+            *
+        </osgi.import.package>
+        <osgi.export.package>
+            org.apache.curator.x.websockets*;version="${project.version}";-noimport:=true
+        </osgi.export.package>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.tyrus</groupId>
+            <artifactId>tyrus-server</artifactId>
+            <version>${tyrus.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.glassfish.tyrus</groupId>
+            <artifactId>tyrus-container-grizzly-server</artifactId>
+            <version>${tyrus.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>1.9.2</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/ClientCreator.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/ClientCreator.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/ClientCreator.java
new file mode 100644
index 0000000..65b8fe0
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/ClientCreator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+import org.apache.curator.framework.CuratorFramework;
+
+public interface ClientCreator
+{
+    public CuratorFramework newClient() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsConfig.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsConfig.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsConfig.java
new file mode 100644
index 0000000..c7227c4
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsConfig.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+public class CuratorWebsocketsConfig
+{
+    private String bindHost = "localhost";
+    private int port = 8080;
+    private String rootPath = "/websockets";
+    private String websocketPath = "/curator";
+
+    public String getBindHost()
+    {
+        return bindHost;
+    }
+
+    public void setBindHost(String bindHost)
+    {
+        this.bindHost = bindHost;
+    }
+
+    public int getPort()
+    {
+        return port;
+    }
+
+    public void setPort(int port)
+    {
+        this.port = port;
+    }
+
+    public String getRootPath()
+    {
+        return rootPath;
+    }
+
+    public void setRootPath(String rootPath)
+    {
+        this.rootPath = rootPath;
+    }
+
+    public String getWebsocketPath()
+    {
+        return websocketPath;
+    }
+
+    public void setWebsocketPath(String websocketPath)
+    {
+        this.websocketPath = websocketPath;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsServer.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsServer.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsServer.java
new file mode 100644
index 0000000..3bc8ff2
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsServer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.x.websockets.api.CommandManager;
+import org.apache.curator.x.websockets.details.CuratorEndpoint;
+import org.apache.curator.x.websockets.details.SessionManager;
+import org.glassfish.tyrus.spi.ServerContainer;
+import org.glassfish.tyrus.spi.ServerContainerFactory;
+import javax.websocket.server.ServerEndpointConfig;
+import java.io.Closeable;
+import java.util.List;
+
+public class CuratorWebsocketsServer implements Closeable
+{
+    private final ServerContainer serverContainer;
+    private final String rootPath;
+    private final int port;
+    private final CommandManager commandManager = new CommandManager();
+
+    public CuratorWebsocketsServer(CuratorWebsocketsConfig config, ClientCreator clientCreator) throws Exception
+    {
+        rootPath = config.getRootPath();
+        port = config.getPort();
+
+        serverContainer = ServerContainerFactory.createServerContainer(null);
+
+        final SessionManager sessionManager = new SessionManager(clientCreator, commandManager);
+        ServerEndpointConfig.Configurator configurator = new ServerEndpointConfig.Configurator()
+        {
+            @Override
+            public String getNegotiatedSubprotocol(List<String> supported, List<String> requested)
+            {
+                return super.getNegotiatedSubprotocol(supported, requested);
+            }
+
+            @Override
+            public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException
+            {
+                Preconditions.checkArgument(endpointClass.equals(CuratorEndpoint.class), "Expected CuratorEndpoint: " + endpointClass.getName());
+                //noinspection unchecked
+                return (T)new CuratorEndpoint(sessionManager);
+            }
+        };
+        ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(CuratorEndpoint.class, config.getWebsocketPath()).configurator(configurator).build();
+        serverContainer.addEndpoint(serverEndpointConfig);
+    }
+
+    public void start() throws Exception
+    {
+        serverContainer.start(rootPath, port);
+    }
+
+    @Override
+    public void close()
+    {
+        serverContainer.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/DefaultClientCreator.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/DefaultClientCreator.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/DefaultClientCreator.java
new file mode 100644
index 0000000..619445c
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/DefaultClientCreator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+public class DefaultClientCreator implements ClientCreator
+{
+    private final ExponentialBackoffRetry retryPolicy;
+
+    public DefaultClientCreator()
+    {
+        this(new ExponentialBackoffRetry(100, 3));
+    }
+
+    public DefaultClientCreator(ExponentialBackoffRetry retryPolicy)
+    {
+        this.retryPolicy = retryPolicy;
+    }
+
+    @Override
+    public CuratorFramework newClient() throws Exception
+    {
+        return CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
new file mode 100644
index 0000000..438e1a6
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.api;
+
+import org.apache.curator.x.websockets.details.CuratorWebsocketsSession;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+public interface ApiCommand
+{
+    public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/CommandManager.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/CommandManager.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/CommandManager.java
new file mode 100644
index 0000000..c16e927
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/CommandManager.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.api;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.x.websockets.api.zookeeper.Create;
+import java.io.FileNotFoundException;
+import java.util.Map;
+
+public class CommandManager
+{
+    private final Map<String, Class<? extends ApiCommand>> commands;
+
+    public CommandManager()
+    {
+        ImmutableMap.Builder<String, Class<? extends ApiCommand>> builder = ImmutableMap.builder();
+
+        builder.put("zookeeper/create", Create.class);
+
+        commands = builder.build();
+    }
+
+    public ApiCommand newCommand(String name) throws Exception
+    {
+        Class<? extends ApiCommand> clazz = commands.get(name);
+        if ( clazz == null )
+        {
+            throw new FileNotFoundException(name);
+        }
+        return clazz.newInstance();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
new file mode 100644
index 0000000..8c18d7c
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.api;
+
+import org.codehaus.jackson.JsonNode;
+
+public class JsonUtils
+{
+    public static String getRequiredString(JsonNode node, String name) throws Exception
+    {
+        JsonNode jsonNode = node.get(name);
+        if ( jsonNode == null )
+        {
+            throw new Exception("Required field is missing: " + name);
+        }
+        return jsonNode.asText();
+    }
+
+    public static String getOptionalString(JsonNode node, String name)
+    {
+        return getOptionalString(node, name, null);
+    }
+
+    public static String getOptionalString(JsonNode node, String name, String defaultValue)
+    {
+        JsonNode jsonNode = node.get(name);
+        return (jsonNode != null) ? jsonNode.asText() : defaultValue;
+    }
+
+    public static boolean getOptionalBoolean(JsonNode node, String name)
+    {
+        JsonNode jsonNode = node.get(name);
+        return (jsonNode != null) && jsonNode.asBoolean();
+    }
+
+    private JsonUtils()
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
new file mode 100644
index 0000000..3cd148d
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.api.zookeeper;
+
+import org.apache.curator.x.websockets.api.ApiCommand;
+import org.apache.curator.x.websockets.api.JsonUtils;
+import org.apache.curator.x.websockets.details.CuratorWebsocketsSession;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+public class Create implements ApiCommand
+{
+    @Override
+    public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
+    {
+        String path = JsonUtils.getRequiredString(input, "path");
+        boolean withProtection = JsonUtils.getOptionalBoolean(input, "withProtection");
+        boolean creatingParentsIfNeeded = JsonUtils.getOptionalBoolean(input, "creatingParentsIfNeeded");
+        boolean compressed = JsonUtils.getOptionalBoolean(input, "compressed");
+        boolean async = JsonUtils.getOptionalBoolean(input, "async");
+        String mode = JsonUtils.getOptionalString(input, "mode", "persistent");
+
+        JsonNode payload = input.get("payload");
+
+        // TODO ACL
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
new file mode 100644
index 0000000..576f475
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.details;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.x.websockets.api.ApiCommand;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+import javax.websocket.CloseReason;
+import javax.websocket.Endpoint;
+import javax.websocket.EndpointConfig;
+import javax.websocket.MessageHandler;
+import javax.websocket.Session;
+import java.io.IOException;
+
+public class CuratorEndpoint extends Endpoint
+{
+    private final SessionManager sessionManager;
+    private final ObjectReader reader = new ObjectMapper().reader();
+    private final ObjectWriter writer = new ObjectMapper().writer();
+
+    public CuratorEndpoint(SessionManager sessionManager)
+    {
+        this.sessionManager = sessionManager;
+    }
+
+    @Override
+    public void onOpen(final Session session, EndpointConfig config)
+    {
+        try
+        {
+            CuratorFramework client = sessionManager.getClientCreator().newClient();
+            sessionManager.put(session, new CuratorWebsocketsSession(client, session));
+
+            client.start();
+        }
+        catch ( Exception e )
+        {
+            // TODO
+        }
+
+        MessageHandler handler = new MessageHandler.Whole<String>()
+        {
+            @Override
+            public void onMessage(String message)
+            {
+                processMessage(session, message);
+            }
+        };
+        session.addMessageHandler(handler);
+    }
+
+    @Override
+    public void onClose(Session session, CloseReason closeReason)
+    {
+        CuratorWebsocketsSession curatorWebsocketsSession = sessionManager.remove(session);
+        if ( curatorWebsocketsSession != null )
+        {
+            curatorWebsocketsSession.close();
+        }
+    }
+
+    private void processMessage(Session session, String message)
+    {
+        try
+        {
+            CuratorWebsocketsSession curatorWebsocketsSession = sessionManager.get(session);
+            if ( curatorWebsocketsSession == null )
+            {
+                throw new Exception("No session found for sessionId: " + session.getId());
+            }
+
+            JsonNode jsonNode = reader.readTree(message);
+            JsonNode command = jsonNode.get("command");
+            if ( command == null )
+            {
+                throw new Exception("Missing field: \"command\"");
+            }
+            String commandName = command.asText();
+            ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(commandName);
+            apiCommand.process(jsonNode, curatorWebsocketsSession, reader, writer);
+        }
+        catch ( Exception e )
+        {
+            // TODO
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorWebsocketsSession.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorWebsocketsSession.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorWebsocketsSession.java
new file mode 100644
index 0000000..9df8b68
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorWebsocketsSession.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.details;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import javax.websocket.Session;
+import java.io.Closeable;
+
+public class CuratorWebsocketsSession implements Closeable
+{
+    private final CuratorFramework client;
+    private final Session session;
+
+    public CuratorWebsocketsSession(CuratorFramework client, Session session)
+    {
+        this.client = client;
+        this.session = session;
+    }
+
+    @Override
+    public void close()
+    {
+        Closeables.closeQuietly(client);
+    }
+
+    public CuratorFramework getClient()
+    {
+        return client;
+    }
+
+    public Session getSession()
+    {
+        return session;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/SessionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/SessionManager.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/SessionManager.java
new file mode 100644
index 0000000..5c37ac3
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/SessionManager.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.details;
+
+import com.google.common.collect.Maps;
+import org.apache.curator.x.websockets.ClientCreator;
+import org.apache.curator.x.websockets.api.CommandManager;
+import javax.websocket.Session;
+import java.util.Map;
+
+public class SessionManager
+{
+    private final Map<String, CuratorWebsocketsSession> sessions = Maps.newConcurrentMap();
+    private final ClientCreator clientCreator;
+    private final CommandManager commandManager;
+
+    public SessionManager(ClientCreator clientCreator, CommandManager commandManager)
+    {
+        this.clientCreator = clientCreator;
+        this.commandManager = commandManager;
+    }
+
+    public void put(Session session, CuratorWebsocketsSession curatorWebsocketsSession)
+    {
+        sessions.put(session.getId(), curatorWebsocketsSession);
+    }
+
+    public CuratorWebsocketsSession get(Session session)
+    {
+        return sessions.get(session.getId());
+    }
+
+    public CuratorWebsocketsSession remove(Session session)
+    {
+        return sessions.remove(session.getId());
+    }
+
+    public ClientCreator getClientCreator()
+    {
+        return clientCreator;
+    }
+
+    public CommandManager getCommandManager()
+    {
+        return commandManager;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/test/java/org/apache/curator/x/websockets/TestServer.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/test/java/org/apache/curator/x/websockets/TestServer.java b/curator-x-websockets/src/test/java/org/apache/curator/x/websockets/TestServer.java
new file mode 100644
index 0000000..f9eef23
--- /dev/null
+++ b/curator-x-websockets/src/test/java/org/apache/curator/x/websockets/TestServer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+public class TestServer
+{
+    public static void main(String[] args) throws Exception
+    {
+        CuratorWebsocketsServer server = new CuratorWebsocketsServer(new CuratorWebsocketsConfig(), new DefaultClientCreator());
+        server.start();
+        Thread.currentThread().join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af548c2..91e899f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,6 +224,7 @@
         <module>curator-examples</module>
         <module>curator-x-discovery</module>
         <module>curator-x-discovery-server</module>
+        <module>curator-x-websockets</module>
     </modules>
 
     <dependencyManagement>


[02/10] git commit: wip

Posted by ra...@apache.org.
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d30a2664
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d30a2664
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d30a2664

Branch: refs/heads/rest
Commit: d30a2664596dbc4abc94929e3d98b3301a12b2e7
Parents: 91fb388
Author: randgalt <ra...@apache.org>
Authored: Wed Jan 8 15:41:38 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jan 8 15:41:38 2014 -0500

----------------------------------------------------------------------
 .../curator/x/rest/ConnectionResource.java      |  13 +-
 .../curator/x/rest/LeaderRecipeResource.java    | 132 -------------------
 .../curator/x/rest/PathCacheRecipeResource.java | 132 +++++++++++++++++++
 .../x/rest/entity/ConnectionStateEntity.java    |  60 +++++++++
 .../curator/x/rest/system/Connection.java       |  37 +++++-
 .../apache/curator/x/rest/system/ThingType.java |  12 +-
 6 files changed, 236 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
index 88fd202..6d3b559 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -19,7 +19,6 @@
 
 package org.apache.curator.x.rest;
 
-import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.x.rest.system.Connection;
 import org.apache.curator.x.rest.system.ConnectionsManager;
 import org.apache.curator.x.rest.system.ThingKey;
@@ -30,6 +29,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Context;
@@ -49,8 +49,8 @@ public class ConnectionResource
     }
 
     @POST
-    @Path("{id}/connection-state-change")
-    public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id)
+    @Path("{id}/block-on-state-change")
+    public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id, @QueryParam("state-count") String currentStateCountArg)
     {
         final Connection connection = connectionsManager.get(id);
         if ( connection == null )
@@ -59,6 +59,7 @@ public class ConnectionResource
             return;
         }
 
+        final long currentStateCount = (currentStateCountArg != null) ? Long.parseLong(currentStateCountArg) : -1;
         Future<?> future = connectionsManager.getExecutorService().submit(new Runnable()
         {
             @Override
@@ -66,8 +67,8 @@ public class ConnectionResource
             {
                 try
                 {
-                    ConnectionState state = connection.blockingPopStateChange();
-                    asyncResponse.resume(Response.ok(state.name()).build());
+                    connection.blockUntilStateChange(currentStateCount);
+                    asyncResponse.resume(Response.ok().build());
                 }
                 catch ( InterruptedException e )
                 {
@@ -90,7 +91,7 @@ public class ConnectionResource
             return Response.status(Response.Status.NOT_FOUND).build();
         }
 
-        return Response.ok(connection.getClient().getState().name()).build();
+        return Response.ok(connection.getState()).build();
     }
 
     @POST

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
deleted file mode 100644
index 868fb33..0000000
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.x.rest;
-
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.rest.entity.LockRequestEntity;
-import org.apache.curator.x.rest.system.Connection;
-import org.apache.curator.x.rest.system.ConnectionsManager;
-import org.apache.curator.x.rest.system.ThingKey;
-import org.apache.curator.x.rest.system.ThingType;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.ext.ContextResolver;
-import java.util.concurrent.TimeUnit;
-
-@Path("zookeeper/recipes/leader/{connectionId}")
-public class LeaderRecipeResource
-{
-    private final ConnectionsManager connectionsManager;
-
-    public LeaderRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
-    {
-        connectionsManager = contextResolver.getContext(ConnectionsManager.class);
-    }
-
-    @POST
-    @Path("{path:.*}")
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
-    {
-        Connection connection = connectionsManager.get(connectionId);
-        if ( connection == null )
-        {
-            return Response.status(Response.Status.NOT_FOUND).build();
-        }
-
-        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
-        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
-        connection.putThing(key, mutex);
-
-        return Response.ok(key.getId()).build();
-    }
-
-    @DELETE
-    @Path("{id}")
-    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
-    {
-        Connection connection = connectionsManager.get(connectionId);
-        if ( connection == null )
-        {
-            return Response.status(Response.Status.NOT_FOUND).build();
-        }
-
-        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
-        if ( mutex == null )
-        {
-            return Response.status(Response.Status.NOT_FOUND).build();
-        }
-
-        if ( mutex.isAcquiredInThisProcess() )
-        {
-            mutex.release();
-        }
-
-        return Response.ok().build();
-    }
-
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
-    {
-        Connection connection = connectionsManager.get(connectionId);
-        if ( connection == null )
-        {
-            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
-            return;
-        }
-
-        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
-        if ( mutex == null )
-        {
-            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
-            return;
-        }
-
-        connectionsManager.getExecutorService().submit
-        (
-            new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
-                        asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
-                    }
-                    catch ( Exception e )
-                    {
-                        asyncResponse.resume(e);
-                    }
-                }
-            }
-        );
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
new file mode 100644
index 0000000..1fc7b3d
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.x.rest.entity.LockRequestEntity;
+import org.apache.curator.x.rest.system.Connection;
+import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.ThingKey;
+import org.apache.curator.x.rest.system.ThingType;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ContextResolver;
+import java.util.concurrent.TimeUnit;
+
+@Path("zookeeper/recipes/path-cache/{connectionId}")
+public class PathCacheRecipeResource
+{
+    private final ConnectionsManager connectionsManager;
+
+    public PathCacheRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
+    {
+        connectionsManager = contextResolver.getContext(ConnectionsManager.class);
+    }
+
+    @POST
+    @Path("{path:.*}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
+        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
+        connection.putThing(key, mutex);
+
+        return Response.ok(key.getId()).build();
+    }
+
+    @DELETE
+    @Path("{id}")
+    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        if ( mutex.isAcquiredInThisProcess() )
+        {
+            mutex.release();
+        }
+
+        return Response.ok().build();
+    }
+
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        connectionsManager.getExecutorService().submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
+                        asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
+                    }
+                    catch ( Exception e )
+                    {
+                        asyncResponse.resume(e);
+                    }
+                }
+            }
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
new file mode 100644
index 0000000..797c26e
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class ConnectionStateEntity
+{
+    private String state;
+    private long stateCount;
+
+    public ConnectionStateEntity()
+    {
+        this("", -1);
+    }
+
+    public ConnectionStateEntity(String state, long stateCount)
+    {
+        this.state = state;
+        this.stateCount = stateCount;
+    }
+
+    public String getState()
+    {
+        return state;
+    }
+
+    public void setState(String state)
+    {
+        this.state = state;
+    }
+
+    public long getStateCount()
+    {
+        return stateCount;
+    }
+
+    public void setStateCount(long stateCount)
+    {
+        this.stateCount = stateCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
index 7f46de1..7ef36dd 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
@@ -19,14 +19,14 @@
 
 package org.apache.curator.x.rest.system;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.x.rest.entity.ConnectionStateEntity;
 import java.io.Closeable;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class Connection implements Closeable, ConnectionStateListener
@@ -34,7 +34,8 @@ public class Connection implements Closeable, ConnectionStateListener
     private final CuratorFramework client;
     private final AtomicLong lastUseMs = new AtomicLong(System.currentTimeMillis());
     private final Map<ThingKey, Object> things = Maps.newConcurrentMap();
-    private final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue();
+    private final Object stateLock = new Object();
+    private long stateCount = 0;  // guarded by stateLock
 
     public Connection(CuratorFramework client)
     {
@@ -51,12 +52,35 @@ public class Connection implements Closeable, ConnectionStateListener
     @Override
     public void stateChanged(CuratorFramework client, ConnectionState newState)
     {
-        states.add(newState);
+        synchronized(stateLock)
+        {
+            ++stateCount;
+            stateLock.notifyAll();
+        }
+    }
+
+    public ConnectionStateEntity getState()
+    {
+        synchronized(stateLock)
+        {
+            return new ConnectionStateEntity(client.getState().name(), stateCount);
+        }
     }
 
-    public ConnectionState blockingPopStateChange() throws InterruptedException
+    public void blockUntilStateChange(long expectedStateCount) throws InterruptedException
     {
-        return states.take();
+        synchronized(stateLock)
+        {
+            if ( expectedStateCount < 0 )
+            {
+                expectedStateCount = stateCount;
+            }
+
+            while ( stateCount == expectedStateCount )
+            {
+                stateLock.wait();
+            }
+        }
     }
 
     public void updateUse()
@@ -81,6 +105,7 @@ public class Connection implements Closeable, ConnectionStateListener
 
     public <T> void putThing(ThingKey<T> key, T thing)
     {
+        thing = Preconditions.checkNotNull(thing, "thing cannot be null");
         things.put(key, thing);
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
index d09f0cc..13448dd 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
@@ -20,7 +20,7 @@
 package org.apache.curator.x.rest.system;
 
 import com.google.common.io.Closeables;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import java.util.concurrent.Future;
 
@@ -41,18 +41,18 @@ public interface ThingType<T>
         }
     };
 
-    public static ThingType<LeaderLatch> LEADER = new ThingType<LeaderLatch>()
+    public static ThingType<PathChildrenCache> PATH_CACHE = new ThingType<PathChildrenCache>()
     {
         @Override
-        public Class<LeaderLatch> getThingClass()
+        public Class<PathChildrenCache> getThingClass()
         {
-            return LeaderLatch.class;
+            return PathChildrenCache.class;
         }
 
         @Override
-        public void closeFor(LeaderLatch latch)
+        public void closeFor(PathChildrenCache cache)
         {
-            Closeables.closeQuietly(latch);
+            Closeables.closeQuietly(cache);
         }
     };
 


[04/10] git commit: wip

Posted by ra...@apache.org.
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2fa263e9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2fa263e9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2fa263e9

Branch: refs/heads/rest
Commit: 2fa263e94044a813d6fff47268626fd0bfa69df7
Parents: dd1fe96
Author: randgalt <ra...@apache.org>
Authored: Wed Jan 8 21:00:07 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jan 8 21:00:07 2014 -0500

----------------------------------------------------------------------
 .../curator/x/rest/ConnectionResource.java      |   2 +-
 .../curator/x/rest/PathCacheRecipeResource.java | 121 +++++++++++++++----
 .../entity/PathChildrenCacheDataEntity.java     |  72 +++++++++++
 .../entity/PathChildrenCacheEventEntity.java    |  34 +-----
 4 files changed, 173 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/2fa263e9/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
index 4f70111..cc05369 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -49,7 +49,7 @@ public class ConnectionResource
     }
 
     @GET
-    @Path("{id}/block-on-state-change")
+    @Path("block-on-state-change/{id}")
     public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id, @QueryParam("state-count") String currentStateCountArg)
     {
         final Connection connection = connectionsManager.get(id);

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa263e9/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
index d953bd6..2c44b88 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
@@ -22,9 +22,11 @@ package org.apache.curator.x.rest;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.x.rest.entity.PathChildrenCacheDataEntity;
 import org.apache.curator.x.rest.entity.PathChildrenCacheEntity;
 import org.apache.curator.x.rest.entity.PathChildrenCacheEventEntity;
 import org.apache.curator.x.rest.entity.StatEntity;
@@ -105,7 +107,8 @@ public class PathCacheRecipeResource
     }
 
     @GET
-    @Path("{id}/block-on-events")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("block-on-events/{id}")
     public void getEvents(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, @PathParam("id") String cacheId)
     {
         Connection connection = connectionsManager.get(connectionId);
@@ -147,39 +150,105 @@ public class PathCacheRecipeResource
         connection.putThing(new ThingKey<Future>(ThingType.FUTURE), future);
     }
 
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}")
+    public Response getCurrentData(@PathParam("connectionId") String connectionId, @PathParam("id") String cacheId)
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        PathChildrenCacheThing cacheThing = connection.removeThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+        if ( cacheThing == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        List<ChildData> currentData = cacheThing.getCache().getCurrentData();
+        List<PathChildrenCacheDataEntity> transformed = Lists.transform(currentData, new Function<ChildData, PathChildrenCacheDataEntity>()
+        {
+            @Override
+            public PathChildrenCacheDataEntity apply(ChildData data)
+            {
+                return toEntity(data);
+            }
+        });
+
+        GenericEntity<List<PathChildrenCacheDataEntity>> entity = new GenericEntity<List<PathChildrenCacheDataEntity>>(transformed){};
+        return Response.ok(entity).build();
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/{path:.*}")
+    public Response getCurrentData(@PathParam("connectionId") String connectionId, @PathParam("id") String cacheId, @PathParam("path") String fullPath)
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        PathChildrenCacheThing cacheThing = connection.removeThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+        if ( cacheThing == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        ChildData currentData = cacheThing.getCache().getCurrentData(fullPath);
+        if (currentData == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        return Response.ok(toEntity(currentData)).build();
+    }
+
     private static final Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity> toEntity = new Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity>()
     {
         @Override
         public PathChildrenCacheEventEntity apply(PathChildrenCacheEvent event)
         {
-            String path = (event.getData() != null) ? event.getData().getPath() : null;
-            String data = ((event.getData() != null) && (event.getData().getData() != null)) ? new String((event.getData().getData())) : null;
-            StatEntity stat = ((event.getData() != null) && (event.getData().getStat() != null))
-                ? new StatEntity
-                (
-                    event.getData().getStat().getCzxid(),
-                    event.getData().getStat().getMzxid(),
-                    event.getData().getStat().getCtime(),
-                    event.getData().getStat().getMtime(),
-                    event.getData().getStat().getVersion(),
-                    event.getData().getStat().getCversion(),
-                    event.getData().getStat().getAversion(),
-                    event.getData().getStat().getEphemeralOwner(),
-                    event.getData().getStat().getDataLength(),
-                    event.getData().getStat().getNumChildren(),
-                    event.getData().getStat().getPzxid()
-                )
-                : null;
-            return new PathChildrenCacheEventEntity
-            (
-                event.getType().name(),
-                path,
-                data,
-                stat
-            );
+            PathChildrenCacheDataEntity data = null;
+            ChildData childData = event.getData();
+            if ( childData != null )
+            {
+                data = toEntity(childData);
+            }
+            return new PathChildrenCacheEventEntity(event.getType().name(), data);
         }
     };
 
+    private static PathChildrenCacheDataEntity toEntity(ChildData childData)
+    {
+        PathChildrenCacheDataEntity data;StatEntity stat = (childData.getStat() != null)
+            ? new StatEntity
+            (
+                childData.getStat().getCzxid(),
+                childData.getStat().getMzxid(),
+                childData.getStat().getCtime(),
+                childData.getStat().getMtime(),
+                childData.getStat().getVersion(),
+                childData.getStat().getCversion(),
+                childData.getStat().getAversion(),
+                childData.getStat().getEphemeralOwner(),
+                childData.getStat().getDataLength(),
+                childData.getStat().getNumChildren(),
+                childData.getStat().getPzxid()
+            )
+            : null;
+        data = new PathChildrenCacheDataEntity
+        (
+            childData.getPath(),
+            (childData.getData() != null) ? new String((childData.getData())) : null,
+            stat
+        );
+        return data;
+    }
+
     private static class LocalListener implements PathChildrenCacheListener
     {
         private final PathChildrenCacheThing cacheThing;

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa263e9/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheDataEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheDataEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheDataEntity.java
new file mode 100644
index 0000000..7b302ff
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheDataEntity.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class PathChildrenCacheDataEntity
+{
+    private String path;
+    private String data;
+    private StatEntity stat;
+
+    public PathChildrenCacheDataEntity()
+    {
+        this("", "", new StatEntity());
+    }
+
+    public PathChildrenCacheDataEntity(String path, String data, StatEntity stat)
+    {
+        this.path = path;
+        this.data = data;
+        this.stat = stat;
+    }
+
+    public String getPath()
+    {
+        return path;
+    }
+
+    public void setPath(String path)
+    {
+        this.path = path;
+    }
+
+    public String getData()
+    {
+        return data;
+    }
+
+    public void setData(String data)
+    {
+        this.data = data;
+    }
+
+    public StatEntity getStat()
+    {
+        return stat;
+    }
+
+    public void setStat(StatEntity stat)
+    {
+        this.stat = stat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/2fa263e9/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
index 9b08d32..a730d48 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
@@ -25,21 +25,17 @@ import javax.xml.bind.annotation.XmlRootElement;
 public class PathChildrenCacheEventEntity
 {
     private String eventType;
-    private String path;
-    private String data;
-    private StatEntity stat;
+    private PathChildrenCacheDataEntity data;
 
     public PathChildrenCacheEventEntity()
     {
-        this("", "", "", new StatEntity());
+        this("", new PathChildrenCacheDataEntity());
     }
 
-    public PathChildrenCacheEventEntity(String eventType, String path, String data, StatEntity stat)
+    public PathChildrenCacheEventEntity(String eventType, PathChildrenCacheDataEntity data)
     {
         this.eventType = eventType;
-        this.path = path;
         this.data = data;
-        this.stat = stat;
     }
 
     public String getEventType()
@@ -52,33 +48,13 @@ public class PathChildrenCacheEventEntity
         this.eventType = eventType;
     }
 
-    public String getPath()
-    {
-        return path;
-    }
-
-    public void setPath(String path)
-    {
-        this.path = path;
-    }
-
-    public String getData()
+    public PathChildrenCacheDataEntity getData()
     {
         return data;
     }
 
-    public void setData(String data)
+    public void setData(PathChildrenCacheDataEntity data)
     {
         this.data = data;
     }
-
-    public StatEntity getStat()
-    {
-        return stat;
-    }
-
-    public void setStat(StatEntity stat)
-    {
-        this.stat = stat;
-    }
 }


[06/10] git commit: wip

Posted by ra...@apache.org.
wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f68a785c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f68a785c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f68a785c

Branch: refs/heads/websockets
Commit: f68a785c9e6b6fb90ee586716dd9e8a3dade92ff
Parents: 0910d48
Author: randgalt <ra...@apache.org>
Authored: Sat Jan 11 09:50:52 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jan 11 09:50:52 2014 -0500

----------------------------------------------------------------------
 .../curator/x/websockets/api/ApiCommand.java    |  2 +-
 .../curator/x/websockets/api/JsonUtils.java     | 25 ++++++++++
 .../x/websockets/api/zookeeper/Create.java      | 51 ++++++++++++++++++--
 .../x/websockets/details/CuratorEndpoint.java   | 43 +++++++++++++----
 4 files changed, 105 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
index 438e1a6..0e67395 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
@@ -26,5 +26,5 @@ import org.codehaus.jackson.map.ObjectWriter;
 
 public interface ApiCommand
 {
-    public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
+    public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
index 8c18d7c..bf659ad 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
@@ -20,9 +20,34 @@
 package org.apache.curator.x.websockets.api;
 
 import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.node.ObjectNode;
+import java.io.IOException;
+import java.util.UUID;
 
 public class JsonUtils
 {
+    public static final String FIELD_TYPE = "type";
+    public static final String FIELD_ID = "id";
+    public static final String FIELD_VALUE = "value";
+
+    public static final String SYSTEM_TYPE_CONNECTION_STATE_CHANGE = "system/connection-state-change";
+
+    public static String newMessage(ObjectMapper mapper, ObjectWriter writer, String type, ObjectNode value) throws IOException
+    {
+        return newMessage(mapper, writer, type, UUID.randomUUID().toString(), value);
+    }
+
+    public static String newMessage(ObjectMapper mapper, ObjectWriter writer, String type, String id, ObjectNode value) throws IOException
+    {
+        ObjectNode node = mapper.createObjectNode();
+        node.put(FIELD_TYPE, type);
+        node.put(FIELD_ID, id);
+        node.put(FIELD_VALUE, value);
+        return writer.writeValueAsString(node);
+    }
+
     public static String getRequiredString(JsonNode node, String name) throws Exception
     {
         JsonNode jsonNode = node.get(name);

http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
index 3cd148d..24cb076 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
@@ -19,9 +19,14 @@
 
 package org.apache.curator.x.websockets.api.zookeeper;
 
+import org.apache.curator.framework.api.Compressible;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.PathAndBytesable;
 import org.apache.curator.x.websockets.api.ApiCommand;
 import org.apache.curator.x.websockets.api.JsonUtils;
 import org.apache.curator.x.websockets.details.CuratorWebsocketsSession;
+import org.apache.zookeeper.CreateMode;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectReader;
 import org.codehaus.jackson.map.ObjectWriter;
@@ -29,18 +34,54 @@ import org.codehaus.jackson.map.ObjectWriter;
 public class Create implements ApiCommand
 {
     @Override
-    public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
+    public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
     {
         String path = JsonUtils.getRequiredString(input, "path");
         boolean withProtection = JsonUtils.getOptionalBoolean(input, "withProtection");
         boolean creatingParentsIfNeeded = JsonUtils.getOptionalBoolean(input, "creatingParentsIfNeeded");
         boolean compressed = JsonUtils.getOptionalBoolean(input, "compressed");
-        boolean async = JsonUtils.getOptionalBoolean(input, "async");
-        String mode = JsonUtils.getOptionalString(input, "mode", "persistent");
+        String mode = JsonUtils.getOptionalString(input, "mode");
 
         JsonNode payload = input.get("payload");
 
-        // TODO ACL
-        return null;
+        Object builder = session.getClient().create();
+        Object result;
+        try
+        {
+            if ( withProtection )
+            {
+                builder = ((CreateBuilder)builder).withProtection();
+            }
+            if ( creatingParentsIfNeeded )
+            {
+                builder = ((CreateBuilder)builder).creatingParentsIfNeeded();
+            }
+            if ( compressed )
+            {
+                builder = ((Compressible)builder).compressed();
+            }
+
+            if ( mode != null )
+            {
+                CreateMode createMode = CreateMode.valueOf(mode.toUpperCase());
+                builder = ((CreateModable)builder).withMode(createMode);
+            }
+
+            if ( payload != null )
+            {
+                String payloadStr = writer.writeValueAsString(payload);
+                result = ((PathAndBytesable)builder).forPath(path, payloadStr.getBytes());
+            }
+            else
+            {
+                result = ((PathAndBytesable)builder).forPath(path);
+            }
+        }
+        catch ( ClassCastException e )
+        {
+            throw new Exception("Bad combination of arguments to create()");
+        }
+
+        // TODO ACL, result
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
index 576f475..6f664de 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
@@ -20,11 +20,15 @@
 package org.apache.curator.x.websockets.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.x.websockets.api.ApiCommand;
+import org.apache.curator.x.websockets.api.JsonUtils;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
 import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.node.ObjectNode;
 import javax.websocket.CloseReason;
 import javax.websocket.Endpoint;
 import javax.websocket.EndpointConfig;
@@ -35,8 +39,9 @@ import java.io.IOException;
 public class CuratorEndpoint extends Endpoint
 {
     private final SessionManager sessionManager;
-    private final ObjectReader reader = new ObjectMapper().reader();
-    private final ObjectWriter writer = new ObjectMapper().writer();
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final ObjectReader reader = mapper.reader();
+    private final ObjectWriter writer = mapper.writer();
 
     public CuratorEndpoint(SessionManager sessionManager)
     {
@@ -51,6 +56,26 @@ public class CuratorEndpoint extends Endpoint
             CuratorFramework client = sessionManager.getClientCreator().newClient();
             sessionManager.put(session, new CuratorWebsocketsSession(client, session));
 
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    try
+                    {
+                        ObjectNode node = mapper.createObjectNode();
+                        node.put("newState", newState.name());
+                        String message = JsonUtils.newMessage(mapper, writer, JsonUtils.SYSTEM_TYPE_CONNECTION_STATE_CHANGE, node);
+                        session.getAsyncRemote().sendText(message);
+                    }
+                    catch ( Exception e )
+                    {
+                        // TODO
+                    }
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+
             client.start();
         }
         catch ( Exception e )
@@ -90,14 +115,12 @@ public class CuratorEndpoint extends Endpoint
             }
 
             JsonNode jsonNode = reader.readTree(message);
-            JsonNode command = jsonNode.get("command");
-            if ( command == null )
-            {
-                throw new Exception("Missing field: \"command\"");
-            }
-            String commandName = command.asText();
-            ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(commandName);
-            apiCommand.process(jsonNode, curatorWebsocketsSession, reader, writer);
+            String command = JsonUtils.getRequiredString(jsonNode, JsonUtils.FIELD_TYPE);
+            String id = JsonUtils.getRequiredString(jsonNode, JsonUtils.FIELD_ID);
+            JsonNode value = jsonNode.get(JsonUtils.FIELD_VALUE);
+
+            ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(command);
+            apiCommand.process(id, value, curatorWebsocketsSession, reader, writer);
         }
         catch ( Exception e )
         {