You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/22 14:35:16 UTC
[01/10] git commit: WIP on a REST api
Repository: curator
Updated Branches:
refs/heads/CURATOR-70 [created] 896c7c1b3
refs/heads/CURATOR-82 [created] 20d8c066f
refs/heads/CURATOR-88 656ecdedc -> 710d78d48
refs/heads/CURATOR-97 [created] bfdef2ddc
refs/heads/CURATOR-97-OLDER-MUST-HAVE-BEEN-A-MISTAKE [created] e4cb66acb
refs/heads/rest [created] 2fa263e94
refs/heads/websockets [created] a08f3c55d
WIP on a REST api
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/91fb3886
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/91fb3886
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/91fb3886
Branch: refs/heads/rest
Commit: 91fb3886edba31e6077c21ca2f0073b8697d1399
Parents: 20d8c06
Author: randgalt <ra...@apache.org>
Authored: Wed Jan 8 15:12:32 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jan 8 15:12:32 2014 -0500
----------------------------------------------------------------------
curator-x-rest/pom.xml | 54 ++++++++
.../org/apache/curator/x/rest/ApiResource.java | 24 ++++
.../curator/x/rest/ConnectionResource.java | 113 ++++++++++++++++
.../curator/x/rest/LeaderRecipeResource.java | 132 +++++++++++++++++++
.../curator/x/rest/LockRecipeResource.java | 132 +++++++++++++++++++
.../x/rest/entity/LockRequestEntity.java | 60 +++++++++
.../curator/x/rest/system/Connection.java | 106 +++++++++++++++
.../x/rest/system/ConnectionsManager.java | 94 +++++++++++++
.../rest/system/CuratorFrameworkAllocator.java | 27 ++++
.../apache/curator/x/rest/system/ThingKey.java | 85 ++++++++++++
.../apache/curator/x/rest/system/ThingType.java | 77 +++++++++++
pom.xml | 1 +
12 files changed, 905 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-rest/pom.xml b/curator-x-rest/pom.xml
new file mode 100644
index 0000000..8176fb5
--- /dev/null
+++ b/curator-x-rest/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?><!--~
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>apache-curator</artifactId>
+        <groupId>org.apache.curator</groupId>
+        <version>2.3.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>curator-x-rest</artifactId>
+    <version>2.3.2-SNAPSHOT</version>
+
+    <properties>
+        <jersey.version>2.5.1</jersey.version>
+        <osgi.import.package>
+            *
+        </osgi.import.package>
+        <!-- Export this module's own packages (org.apache.curator.x.rest and its sub-packages). -->
+        <osgi.export.package>
+            org.apache.curator.x.rest*;version="${project.version}";-noimport:=true
+        </osgi.export.package>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-server</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java
new file mode 100644
index 0000000..db8bb82
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+/**
+ * Empty class in the REST extension package. It declares no members or
+ * annotations yet; presumably a placeholder for a future resource.
+ */
+public class ApiResource
+{
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
new file mode 100644
index 0000000..88fd202
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.x.rest.system.Connection;
+import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.ThingKey;
+import org.apache.curator.x.rest.system.ThingType;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ContextResolver;
+import java.util.concurrent.Future;
+
+@Path("zookeeper/connection")
+public class ConnectionResource
+{
+ private final ConnectionsManager connectionsManager;
+
+ public ConnectionResource(@Context ContextResolver<ConnectionsManager> contextResolver)
+ {
+ connectionsManager = contextResolver.getContext(ConnectionsManager.class);
+ }
+
+ @POST
+ @Path("{id}/connection-state-change")
+ public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id)
+ {
+ final Connection connection = connectionsManager.get(id);
+ if ( connection == null )
+ {
+ asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+ return;
+ }
+
+ Future<?> future = connectionsManager.getExecutorService().submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ ConnectionState state = connection.blockingPopStateChange();
+ asyncResponse.resume(Response.ok(state.name()).build());
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build());
+ }
+ }
+ });
+ connection.putThing(new ThingKey<Future>(ThingType.FUTURE), future);
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}")
+ public Response getState(@PathParam("id") String id)
+ {
+ Connection connection = connectionsManager.get(id);
+ if ( connection == null )
+ {
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+
+ return Response.ok(connection.getClient().getState().name()).build();
+ }
+
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ public String newConnection() throws Exception
+ {
+ return connectionsManager.newConnection();
+ }
+
+ @DELETE
+ @Path("{id}")
+ public Response closeConnection(@PathParam("id") String id)
+ {
+ if ( connectionsManager.closeConnection(id) )
+ {
+ return Response.ok().build();
+ }
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
new file mode 100644
index 0000000..868fb33
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.x.rest.entity.LockRequestEntity;
+import org.apache.curator.x.rest.system.Connection;
+import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.ThingKey;
+import org.apache.curator.x.rest.system.ThingType;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ContextResolver;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * JAX-RS resource rooted at {@code zookeeper/recipes/leader/{connectionId}}.
+ *
+ * NOTE(review): the handlers below allocate, acquire and delete
+ * {@link InterProcessSemaphoreMutex} instances exactly as LockRecipeResource does;
+ * no leader recipe is used yet — presumably work in progress.
+ */
+@Path("zookeeper/recipes/leader/{connectionId}")
+public class LeaderRecipeResource
+{
+    private final ConnectionsManager connectionsManager;
+
+    /**
+     * @param contextResolver resolver from which the {@link ConnectionsManager} is obtained
+     */
+    public LeaderRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
+    {
+        connectionsManager = contextResolver.getContext(ConnectionsManager.class);
+    }
+
+    /**
+     * Creates a mutex for the given path and registers it on the connection.
+     *
+     * @param connectionId the connection id
+     * @param path the path handed to the mutex
+     * @return 200 with the generated mutex id, or 404 when the connection id is unknown
+     * @throws Exception declared by the original signature
+     */
+    @POST
+    @Path("{path:.*}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
+        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
+        connection.putThing(key, mutex);
+
+        return Response.ok(key.getId()).build();
+    }
+
+    /**
+     * Unregisters a mutex and releases it if it is currently acquired in this process.
+     *
+     * @param connectionId the connection id
+     * @param lockId the mutex id returned at allocation
+     * @return 200 on success, 404 when the connection or the mutex is unknown
+     * @throws Exception if releasing the mutex fails
+     */
+    @DELETE
+    @Path("{id}")
+    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        if ( mutex.isAcquiredInThisProcess() )
+        {
+            mutex.release();
+        }
+
+        return Response.ok().build();
+    }
+
+    /**
+     * Tries to acquire a previously allocated mutex on the manager's executor and resumes
+     * the suspended request with the outcome: 200 when acquired, 408 when the wait timed
+     * out, or the thrown exception when the acquire fails.
+     *
+     * @param asyncResponse the suspended response to resume
+     * @param connectionId the connection id
+     * @param lockRequest the mutex id and the maximum wait in milliseconds
+     * @throws Exception declared by the original signature
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        // A missing body or lock id would otherwise fail with a NullPointerException
+        // (ThingKey rejects a null id), so answer with a client error instead.
+        if ( (lockRequest == null) || (lockRequest.getLockId() == null) )
+        {
+            asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).build());
+            return;
+        }
+
+        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        connectionsManager.getExecutorService().submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
+                        asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
+                    }
+                    catch ( Exception e )
+                    {
+                        asyncResponse.resume(e);
+                    }
+                }
+            }
+        );
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java
new file mode 100644
index 0000000..ed7a572
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.x.rest.entity.LockRequestEntity;
+import org.apache.curator.x.rest.system.Connection;
+import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.ThingKey;
+import org.apache.curator.x.rest.system.ThingType;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ContextResolver;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * JAX-RS resource rooted at {@code zookeeper/recipes/lock/{connectionId}} that allocates,
+ * acquires and deletes {@link InterProcessSemaphoreMutex} instances registered on a
+ * connection.
+ */
+@Path("zookeeper/recipes/lock/{connectionId}")
+public class LockRecipeResource
+{
+    private final ConnectionsManager connectionsManager;
+
+    /**
+     * @param contextResolver resolver from which the {@link ConnectionsManager} is obtained
+     */
+    public LockRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
+    {
+        connectionsManager = contextResolver.getContext(ConnectionsManager.class);
+    }
+
+    /**
+     * Creates a mutex for the given path and registers it on the connection.
+     *
+     * @param connectionId the connection id
+     * @param path the path handed to the mutex
+     * @return 200 with the generated mutex id, or 404 when the connection id is unknown
+     * @throws Exception declared by the original signature
+     */
+    @POST
+    @Path("{path:.*}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
+        ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
+        connection.putThing(key, mutex);
+
+        return Response.ok(key.getId()).build();
+    }
+
+    /**
+     * Unregisters a mutex and releases it if it is currently acquired in this process.
+     *
+     * @param connectionId the connection id
+     * @param lockId the mutex id returned at allocation
+     * @return 200 on success, 404 when the connection or the mutex is unknown
+     * @throws Exception if releasing the mutex fails
+     */
+    @DELETE
+    @Path("{id}")
+    public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        if ( mutex.isAcquiredInThisProcess() )
+        {
+            mutex.release();
+        }
+
+        return Response.ok().build();
+    }
+
+    /**
+     * Tries to acquire a previously allocated mutex on the manager's executor and resumes
+     * the suspended request with the outcome: 200 when acquired, 408 when the wait timed
+     * out, or the thrown exception when the acquire fails.
+     *
+     * @param asyncResponse the suspended response to resume
+     * @param connectionId the connection id
+     * @param lockRequest the mutex id and the maximum wait in milliseconds
+     * @throws Exception declared by the original signature
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
+    {
+        Connection connection = connectionsManager.get(connectionId);
+        if ( connection == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        // A missing body or lock id would otherwise fail with a NullPointerException
+        // (ThingKey rejects a null id), so answer with a client error instead.
+        if ( (lockRequest == null) || (lockRequest.getLockId() == null) )
+        {
+            asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).build());
+            return;
+        }
+
+        final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
+        if ( mutex == null )
+        {
+            asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+            return;
+        }
+
+        connectionsManager.getExecutorService().submit
+        (
+            new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
+                        asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
+                    }
+                    catch ( Exception e )
+                    {
+                        asyncResponse.resume(e);
+                    }
+                }
+            }
+        );
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java
new file mode 100644
index 0000000..ef51d15
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Mutable bean carrying a lock id and a maximum wait time in milliseconds.
+ */
+@XmlRootElement
+public class LockRequestEntity
+{
+    private String lockId;
+    private int maxWaitMs;
+
+    /**
+     * Creates an entity with lock id {@code "/"} and a maximum wait of {@code 0} ms.
+     */
+    public LockRequestEntity()
+    {
+        lockId = "/";
+        maxWaitMs = 0;
+    }
+
+    /**
+     * @param lockId the lock id
+     * @param maxWaitMs the maximum wait in milliseconds
+     */
+    public LockRequestEntity(String lockId, int maxWaitMs)
+    {
+        this.maxWaitMs = maxWaitMs;
+        this.lockId = lockId;
+    }
+
+    /**
+     * @return the lock id
+     */
+    public String getLockId()
+    {
+        return this.lockId;
+    }
+
+    /**
+     * @param lockId the new lock id
+     */
+    public void setLockId(String lockId)
+    {
+        this.lockId = lockId;
+    }
+
+    /**
+     * @return the maximum wait in milliseconds
+     */
+    public int getMaxWaitMs()
+    {
+        return this.maxWaitMs;
+    }
+
+    /**
+     * @param maxWaitMs the new maximum wait in milliseconds
+     */
+    public void setMaxWaitMs(int maxWaitMs)
+    {
+        this.maxWaitMs = maxWaitMs;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
new file mode 100644
index 0000000..7f46de1
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Pairs a {@link CuratorFramework} client with the objects ("things") registered against
+ * it, a queue of the connection state changes it has reported, and the time it was last
+ * used. The instance registers itself as a connection state listener on the client.
+ */
+public class Connection implements Closeable, ConnectionStateListener
+{
+    private final CuratorFramework client;
+    private final AtomicLong lastUseMs = new AtomicLong(System.currentTimeMillis());
+    private final Map<ThingKey, Object> things = Maps.newConcurrentMap();
+    private final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue();
+
+    /**
+     * @param client the client to wrap; this instance is added to its connection state listeners
+     */
+    public Connection(CuratorFramework client)
+    {
+        this.client = client;
+        client.getConnectionStateListenable().addListener(this);
+    }
+
+    /**
+     * Releases every registered thing through its type's {@code closeFor} and then closes
+     * the client. The client is closed even if releasing a thing throws.
+     */
+    @Override
+    public void close()
+    {
+        try
+        {
+            // The cleanup belongs here rather than in updateUse(): running it on every
+            // lookup would cancel pending futures and close latches while still in use.
+            for ( Map.Entry<ThingKey, Object> entry : things.entrySet() )
+            {
+                //noinspection unchecked
+                entry.getKey().getType().closeFor(entry.getValue());
+            }
+            things.clear();
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    /**
+     * Queues the new state so that {@link #blockingPopStateChange()} can hand it out.
+     */
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState)
+    {
+        states.add(newState);
+    }
+
+    /**
+     * Removes and returns the oldest queued state change, blocking until one is available.
+     *
+     * @return the next queued connection state
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public ConnectionState blockingPopStateChange() throws InterruptedException
+    {
+        return states.take();
+    }
+
+    /**
+     * Records the current time as the moment of last use.
+     */
+    public void updateUse()
+    {
+        lastUseMs.set(System.currentTimeMillis());
+    }
+
+    /**
+     * @return the wrapped client
+     */
+    public CuratorFramework getClient()
+    {
+        return client;
+    }
+
+    /**
+     * @return the time of last use, in milliseconds as given by {@link System#currentTimeMillis()}
+     */
+    public long getLastUseMs()
+    {
+        return lastUseMs.get();
+    }
+
+    /**
+     * Registers a thing under the given key, replacing any previous mapping for that key.
+     *
+     * @param key the key to register under
+     * @param thing the object to register
+     */
+    public <T> void putThing(ThingKey<T> key, T thing)
+    {
+        things.put(key, thing);
+    }
+
+    /**
+     * @param key the key to look up
+     * @return the thing registered under the key, cast to the key type's class, or
+     *         {@code null} if there is none
+     */
+    public <T> T getThing(ThingKey<T> key)
+    {
+        Object o = things.get(key);
+        if ( o != null )
+        {
+            return key.getType().getThingClass().cast(o);
+        }
+        return null;
+    }
+
+    /**
+     * Unregisters the thing stored under the key without closing it.
+     *
+     * @param key the key to remove
+     * @return the removed thing, cast to the key type's class, or {@code null} if there was none
+     */
+    public <T> T removeThing(ThingKey<T> key)
+    {
+        Object o = things.remove(key);
+        if ( o != null )
+        {
+            return key.getType().getThingClass().cast(o);
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java
new file mode 100644
index 0000000..febea7a
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+// TODO connection cleanup/timeouts
+
+/**
+ * Registry of {@link Connection} instances keyed by a generated id, together with the
+ * executor service that the REST resources use for background work.
+ */
+public class ConnectionsManager implements Closeable
+{
+    private final Map<String, Connection> connections = Maps.newConcurrentMap();
+    private final CuratorFrameworkAllocator allocator;
+    private final ExecutorService executorService;
+
+    /**
+     * Uses a cached thread pool whose threads come from a factory named "ConnectionsManager".
+     *
+     * @param allocator source of new clients
+     */
+    public ConnectionsManager(CuratorFrameworkAllocator allocator)
+    {
+        this(allocator, Executors.newCachedThreadPool(ThreadUtils.newThreadFactory("ConnectionsManager")));
+    }
+
+    /**
+     * @param allocator source of new clients
+     * @param executorService executor handed out by {@link #getExecutorService()}
+     */
+    public ConnectionsManager(CuratorFrameworkAllocator allocator, ExecutorService executorService)
+    {
+        this.executorService = executorService;
+        this.allocator = allocator;
+    }
+
+    /**
+     * Allocates a client, wraps it in a {@link Connection} and registers it under a random UUID.
+     *
+     * @return the id of the new connection
+     * @throws Exception if the allocator fails
+     */
+    public String newConnection() throws Exception
+    {
+        String connectionId = UUID.randomUUID().toString();
+        connections.put(connectionId, new Connection(allocator.newCuratorFramework()));
+        return connectionId;
+    }
+
+    /**
+     * Looks up a connection and, when found, marks it as used.
+     *
+     * @param id the connection id
+     * @return the connection, or {@code null} if the id is unknown
+     */
+    public Connection get(String id)
+    {
+        Connection found = connections.get(id);
+        if ( found == null )
+        {
+            return null;
+        }
+        found.updateUse();
+        return found;
+    }
+
+    /**
+     * Unregisters and closes a connection.
+     *
+     * @param id the connection id
+     * @return {@code true} if a connection was registered under the id
+     */
+    public boolean closeConnection(String id)
+    {
+        Connection removed = connections.remove(id);
+        if ( removed == null )
+        {
+            return false;
+        }
+        removed.close();
+        return true;
+    }
+
+    /**
+     * @return the executor service given at construction
+     */
+    public ExecutorService getExecutorService()
+    {
+        return executorService;
+    }
+
+    /**
+     * Quietly closes every registered connection. The registry itself and the executor
+     * service are left untouched.
+     */
+    @Override
+    public void close() throws IOException
+    {
+        for ( Connection registered : connections.values() )
+        {
+            Closeables.closeQuietly(registered);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java
new file mode 100644
index 0000000..d6dd768
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Supplies the {@link CuratorFramework} clients that back new connections.
+ */
+public interface CuratorFrameworkAllocator
+{
+    /**
+     * Provides a client for a new connection.
+     * NOTE(review): ConnectionsManager does not call {@code start()} on the result, so
+     * implementations presumably return an already started client — confirm.
+     *
+     * @return the client to use
+     * @throws Exception if the client cannot be provided
+     */
+    CuratorFramework newCuratorFramework() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java
new file mode 100644
index 0000000..3e60cbd
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+
+/**
+ * Immutable key made of a string id and a {@link ThingType}. Two keys are equal when
+ * their ids are equal and they refer to the same type instance.
+ *
+ * @param <T> the class of object the key's type describes
+ */
+public class ThingKey<T>
+{
+    private final String id;
+    private final ThingType<T> type;
+
+    /**
+     * Creates a key with a random UUID as its id.
+     *
+     * @param type the thing type; must not be null
+     */
+    public ThingKey(ThingType<T> type)
+    {
+        this(UUID.randomUUID().toString(), type);
+    }
+
+    /**
+     * @param id the key id; must not be null
+     * @param type the thing type; must not be null
+     */
+    public ThingKey(String id, ThingType<T> type)
+    {
+        this.id = Preconditions.checkNotNull(id, "id cannot be null");
+        this.type = Preconditions.checkNotNull(type, "type cannot be null");
+    }
+
+    /**
+     * @return the key id
+     */
+    public String getId()
+    {
+        return this.id;
+    }
+
+    /**
+     * @return the thing type
+     */
+    public ThingType<T> getType()
+    {
+        return this.type;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( (o == null) || (getClass() != o.getClass()) )
+        {
+            return false;
+        }
+
+        ThingKey other = (ThingKey)o;
+        // types are compared by identity, ids by value
+        return id.equals(other.id) && (type == other.type);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return (31 * id.hashCode()) + type.hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
new file mode 100644
index 0000000..d09f0cc
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import java.util.concurrent.Future;
+
+public interface ThingType<T> // Describes one kind of "thing": the class of its instances and how to close one
+{ // Predefined types follow: MUTEX, LEADER and FUTURE.
+ public static ThingType<InterProcessSemaphoreMutex> MUTEX = new ThingType<InterProcessSemaphoreMutex>()
+ {
+ @Override
+ public Class<InterProcessSemaphoreMutex> getThingClass()
+ {
+ return InterProcessSemaphoreMutex.class;
+ }
+
+ @Override
+ public void closeFor(InterProcessSemaphoreMutex instance)
+ {
+ // nop - nothing is done to the mutex here
+ }
+ };
+ // Type for LeaderLatch instances; closeFor closes the latch through Closeables.closeQuietly.
+ public static ThingType<LeaderLatch> LEADER = new ThingType<LeaderLatch>()
+ {
+ @Override
+ public Class<LeaderLatch> getThingClass()
+ {
+ return LeaderLatch.class;
+ }
+
+ @Override
+ public void closeFor(LeaderLatch latch)
+ {
+ Closeables.closeQuietly(latch);
+ }
+ };
+ // Type for (raw) Future instances; closeFor cancels the future.
+ public static ThingType<Future> FUTURE = new ThingType<Future>()
+ {
+ @Override
+ public Class<Future> getThingClass()
+ {
+ return Future.class;
+ }
+
+ @Override
+ public void closeFor(Future future)
+ {
+ future.cancel(true); // true = may interrupt the task if it is running
+ }
+ };
+ // Returns the Class object of the instances this type describes.
+ public Class<T> getThingClass();
+ // Performs the type-specific close action for the given instance (see the constants above).
+ public void closeFor(T instance);
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af548c2..81a2577 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,6 +224,7 @@
<module>curator-examples</module>
<module>curator-x-discovery</module>
<module>curator-x-discovery-server</module>
+ <module>curator-x-rest</module>
</modules>
<dependencyManagement>
[08/10] git commit: wip
Posted by ra...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/710d78d4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/710d78d4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/710d78d4
Branch: refs/heads/CURATOR-88
Commit: 710d78d48ad8ef9b7f954b2c407aaf9957d0f037
Parents: 656ecde
Author: randgalt <ra...@apache.org>
Authored: Wed Mar 12 12:39:02 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Mar 12 12:39:02 2014 -0500
----------------------------------------------------------------------
.../curator/x/rest/api/ClientResource.java | 8 +--
.../x/rest/api/PathChildrenCacheResource.java | 5 --
.../org/apache/curator/x/rest/api/Session.java | 1 -
.../x/rest/dropwizard/CuratorRestBundle.java | 1 -
.../curator/x/rest/entities/DataAndStat.java | 62 ++++++++++++++++++++
.../curator/x/rest/entities/NodeCacheSpec.java | 1 -
.../x/rest/entities/OptionalNodeData.java | 1 -
.../src/site/confluence/entities.confluence | 8 +--
8 files changed, 68 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
index 639d865..6d1ad82 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java
@@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.*;
import org.apache.curator.x.rest.CuratorRestContext;
import org.apache.curator.x.rest.entities.CreateSpec;
+import org.apache.curator.x.rest.entities.DataAndStat;
import org.apache.curator.x.rest.entities.DeleteSpec;
import org.apache.curator.x.rest.entities.ExistsSpec;
import org.apache.curator.x.rest.entities.GetChildrenSpec;
@@ -33,7 +34,6 @@ import org.apache.curator.x.rest.entities.SetDataSpec;
import org.apache.curator.x.rest.entities.Status;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
-import org.codehaus.jackson.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.Consumes;
@@ -282,11 +282,7 @@ public class ClientResource
{
result = new String((byte[])bytes);
}
-
- ObjectNode node = context.getMapper().createObjectNode();
- node.put("data", result);
- node.putPOJO("stat", stat);
- return Response.ok(context.getWriter().writeValueAsString(node)).build();
+ return Response.ok(new DataAndStat(result, stat)).build();
}
@POST
http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java
index 771eb93..f383caf 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/PathChildrenCacheResource.java
@@ -128,11 +128,6 @@ public class PathChildrenCacheResource
{
PathChildrenCache cache = Constants.getThing(context.getSession(), cacheId, PathChildrenCache.class);
ChildData currentData = cache.getCurrentData("/" + path);
- if ( currentData == null )
- {
- return Response.status(Response.Status.NOT_FOUND).build();
- }
-
return Response.ok(Constants.toNodeData(currentData)).build();
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
index cbc2211..4640a44 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java
@@ -25,7 +25,6 @@ import org.apache.curator.x.rest.entities.StatusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
index 80f3e23..6d56d61 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/dropwizard/CuratorRestBundle.java
@@ -19,7 +19,6 @@
package org.apache.curator.x.rest.dropwizard;
-import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.spi.inject.SingletonTypeInjectableProvider;
import io.dropwizard.ConfiguredBundle;
import io.dropwizard.setup.Bootstrap;
http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/DataAndStat.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/DataAndStat.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/DataAndStat.java
new file mode 100644
index 0000000..63d2224
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/DataAndStat.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entities;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class DataAndStat // Mutable bean pairing a data string with a ZooKeeper Stat
+{
+ private String data;
+ private Stat stat;
+ // No-arg constructor: data defaults to the empty string and stat to a newly constructed Stat.
+ public DataAndStat()
+ {
+ this("", new Stat());
+ }
+ // Stores the given data and stat as-is; neither argument is validated or copied.
+ public DataAndStat(String data, Stat stat)
+ {
+ this.data = data;
+ this.stat = stat;
+ }
+ // Returns the current data string.
+ public String getData()
+ {
+ return data;
+ }
+ // Replaces the data string.
+ public void setData(String data)
+ {
+ this.data = data;
+ }
+ // Returns the current Stat reference (not a copy).
+ public Stat getStat()
+ {
+ return stat;
+ }
+ // Replaces the Stat reference.
+ public void setStat(Stat stat)
+ {
+ this.stat = stat;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java
index 07c197f..61f4bb6 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/NodeCacheSpec.java
@@ -18,7 +18,6 @@
*/
package org.apache.curator.x.rest.entities;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement
http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java
index 398308c..b3eb335 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entities/OptionalNodeData.java
@@ -19,7 +19,6 @@
package org.apache.curator.x.rest.entities;
-import org.apache.zookeeper.data.Stat;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement
http://git-wip-us.apache.org/repos/asf/curator/blob/710d78d4/curator-x-rest/src/site/confluence/entities.confluence
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/site/confluence/entities.confluence b/curator-x-rest/src/site/confluence/entities.confluence
index b6624c7..31be87f 100644
--- a/curator-x-rest/src/site/confluence/entities.confluence
+++ b/curator-x-rest/src/site/confluence/entities.confluence
@@ -5,17 +5,17 @@ h1. Entity Descriptions
Here are the entity descriptions for the entities used in the APIs:
||Field||Type||Description||
-|*Status*| | |
+| *Status*| | |
|state|string|This instance's Curator connection state. One of: "connected", "suspended", or "lost". If the state is other than "connected" you must assume that any open locks and/or watchers are no longer valid.|
|messages|array of StatusMessage|Any pending messages from this instance.|
| | | |
-|*StatusMessage*| | |
+| *StatusMessage*| | |
|type|string|The status message type. See the [[Managing Status|client.html]] section for details.|
|message|string|Type-dependent message|
|details|string|Type-dependent details|
|sourceId|string|Type-dependent sourceId|
| | | |
-|*GetChildrenSpec*| | |
+| *GetChildrenSpec*| | |
|path|string|The ZK path|
|async|boolean|If true, perform asynchronously|
|asyncId|string|for async, a user-defined ID to return in the status message|
@@ -23,7 +23,7 @@ Here are the entity descriptions for the entities used in the APIs:
|watched|boolean|if true, set a watch|
|watchId|string|if watched, a user-defined ID to return in the status when the watch triggers|
| | | |
-|*CreateSpec*| | |
+| *CreateSpec*| | |
|path|string|The ZK path|
|data|string|The data to store in the node|
|mode|string|The create mode. One of: "persistent", "persistent\_sequential", "ephemeral", or "ephemeral\_sequential"|
[09/10] git commit: some tests to help with CURATOR-97
Posted by ra...@apache.org.
some tests to help with CURATOR-97
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e4cb66ac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e4cb66ac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e4cb66ac
Branch: refs/heads/CURATOR-97-OLDER-MUST-HAVE-BEEN-A-MISTAKE
Commit: e4cb66acb9112a4291e593b2825744ef4a258d40
Parents: b34aaaa
Author: randgalt <ra...@apache.org>
Authored: Wed Mar 19 13:20:42 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Mar 19 13:20:42 2014 -0500
----------------------------------------------------------------------
.../recipes/nodes/PersistentEphemeralNode.java | 4 +-
.../nodes/TestPersistentEphemeralNode.java | 54 ++++++++++++++++++--
2 files changed, 53 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/e4cb66ac/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 71095ba..1a4d38d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.nodes;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
@@ -50,7 +51,8 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class PersistentEphemeralNode implements Closeable
{
- private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+ @VisibleForTesting
+ final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;
private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
http://git-wip-us.apache.org/repos/asf/curator/blob/e4cb66ac/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index e312081..5178fe5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -20,7 +20,6 @@ package org.apache.curator.framework.recipes.nodes;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.BaseClassForTests;
@@ -30,7 +29,7 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
-import org.apache.curator.utils.DebugUtils;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -72,10 +71,57 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
}
@Test
- public void testListenersReconnectedIsFast() throws Exception
+ public void testRetriesExpireAndReconnect() throws Exception
{
- System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "true");
+ Timing timing = new Timing();
+ PersistentEphemeralNode node = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+ node.start();
+ Assert.assertTrue(node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+
+ final CountDownLatch lostLatch = new CountDownLatch(1);
+ ConnectionStateListener listener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.LOST )
+ {
+ lostLatch.countDown();
+ }
+ }
+ };
+ client.getConnectionStateListenable().addListener(listener);
+
+ server.stop();
+
+ Assert.assertTrue(timing.awaitLatch(lostLatch));
+ CountDownLatch recreateLatch = new CountDownLatch(1);
+ node.initialCreateLatch.set(recreateLatch);
+
+ TimeUnit.MILLISECONDS.sleep(2 * timing.session()); // make sure session expires
+
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
+ timing.sleepABit();
+ KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
+ timing.sleepABit();
+ Assert.assertTrue(timing.awaitLatch(recreateLatch));
+ Assert.assertNotNull(client.checkExists().forPath("/abc/node"));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(node);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+ @Test
+ public void testListenersReconnectedIsFast() throws Exception
+ {
server.close();
Timing timing = new Timing();
[10/10] git commit: For added safety,
install a connection state listener and attempt to recreate the node
on RECONNECT
Posted by ra...@apache.org.
For added safety, install a connection state listener and attempt to recreate the node on RECONNECT
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bfdef2dd
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bfdef2dd
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bfdef2dd
Branch: refs/heads/CURATOR-97
Commit: bfdef2ddc1c3f2121256045690b39baafa269151
Parents: b17e6ff
Author: randgalt <ra...@apache.org>
Authored: Sat Mar 22 08:34:41 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Mar 22 08:34:41 2014 -0500
----------------------------------------------------------------------
.../recipes/nodes/PersistentEphemeralNode.java | 16 ++++++++++++++++
.../framework/client/TestBackgroundStates.java | 2 --
.../recipes/nodes/TestPersistentEphemeralNode.java | 2 --
3 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/bfdef2dd/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 71095ba..6b6ae52 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -25,6 +25,8 @@ import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateModable;
import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -79,6 +81,17 @@ public class PersistentEphemeralNode implements Closeable
}
}
};
+ private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
+ { // Listener that calls createNode() again whenever the client reports RECONNECTED.
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.RECONNECTED ) // every other state is ignored
+ {
+ createNode(); // added safety: attempt to recreate the node after a reconnect
+ }
+ }
+ };
private enum State
{
@@ -226,6 +239,7 @@ public class PersistentEphemeralNode implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
+ client.getConnectionStateListenable().addListener(connectionStateListener);
createNode();
}
@@ -254,6 +268,8 @@ public class PersistentEphemeralNode implements Closeable
return;
}
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
+
try
{
deleteNode();
http://git-wip-us.apache.org/repos/asf/curator/blob/bfdef2dd/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
index b1c382f..cb186fc 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/client/TestBackgroundStates.java
@@ -97,8 +97,6 @@ public class TestBackgroundStates extends BaseClassForTests
@Test
public void testConnectionStateListener() throws Exception
{
- System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "true");
-
server.close();
Timing timing = new Timing();
http://git-wip-us.apache.org/repos/asf/curator/blob/bfdef2dd/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index e312081..4ca487a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -74,8 +74,6 @@ public class TestPersistentEphemeralNode extends BaseClassForTests
@Test
public void testListenersReconnectedIsFast() throws Exception
{
- System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "true");
-
server.close();
Timing timing = new Timing();
[03/10] git commit: wip
Posted by ra...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/dd1fe969
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/dd1fe969
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/dd1fe969
Branch: refs/heads/rest
Commit: dd1fe969f725fa84317789e08e3228131105c0f6
Parents: d30a266
Author: randgalt <ra...@apache.org>
Authored: Wed Jan 8 16:40:25 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jan 8 16:40:25 2014 -0500
----------------------------------------------------------------------
.../recipes/cache/PathChildrenCache.java | 11 ++
.../curator/x/rest/ConnectionResource.java | 2 +-
.../curator/x/rest/PathCacheRecipeResource.java | 118 ++++++++++---
.../x/rest/entity/PathChildrenCacheEntity.java | 84 ++++++++++
.../entity/PathChildrenCacheEventEntity.java | 84 ++++++++++
.../curator/x/rest/entity/StatEntity.java | 167 +++++++++++++++++++
.../x/rest/system/PathChildrenCacheThing.java | 59 +++++++
.../apache/curator/x/rest/system/ThingType.java | 11 +-
8 files changed, 503 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index d66f7f3..a1844db 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -170,6 +170,17 @@ public class PathChildrenCache implements Closeable
}
 /**
+ * @param client the client
+ * @param path path to watch
+ * @param cacheData if true, node contents are cached in addition to the stat
+ * @param dataIsCompressed if true, data in the path is compressed
+ */
+ public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed)
+ {
+ this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+ }
+
+ /**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
index 6d3b559..4f70111 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -48,7 +48,7 @@ public class ConnectionResource
connectionsManager = contextResolver.getContext(ConnectionsManager.class);
}
- @POST
+ @GET
@Path("{id}/block-on-state-change")
public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id, @QueryParam("state-count") String currentStateCountArg)
{
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
index 1fc7b3d..d953bd6 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
@@ -19,14 +19,22 @@
package org.apache.curator.x.rest;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.rest.entity.LockRequestEntity;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.x.rest.entity.PathChildrenCacheEntity;
+import org.apache.curator.x.rest.entity.PathChildrenCacheEventEntity;
+import org.apache.curator.x.rest.entity.StatEntity;
import org.apache.curator.x.rest.system.Connection;
import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.PathChildrenCacheThing;
import org.apache.curator.x.rest.system.ThingKey;
import org.apache.curator.x.rest.system.ThingType;
-import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@@ -34,10 +42,13 @@ import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
+import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ContextResolver;
-import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Future;
@Path("zookeeper/recipes/path-cache/{connectionId}")
public class PathCacheRecipeResource
@@ -50,9 +61,8 @@ public class PathCacheRecipeResource
}
@POST
- @Path("{path:.*}")
@Produces(MediaType.APPLICATION_JSON)
- public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
+ public Response allocate(@PathParam("connectionId") String connectionId, PathChildrenCacheEntity spec) throws Exception
{
Connection connection = connectionsManager.get(connectionId);
if ( connection == null )
@@ -60,16 +70,22 @@ public class PathCacheRecipeResource
return Response.status(Response.Status.NOT_FOUND).build();
}
- InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
- ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
- connection.putThing(key, mutex);
+ PathChildrenCache cache = new PathChildrenCache(connection.getClient(), spec.getPath(), spec.isCacheData(), spec.isDataIsCompressed());
+ PathChildrenCacheThing cacheThing = new PathChildrenCacheThing(cache);
+ cache.getListenable().addListener(new LocalListener(cacheThing));
+
+ ThingKey<PathChildrenCacheThing> key = new ThingKey<PathChildrenCacheThing>(ThingType.PATH_CACHE);
+ connection.putThing(key, cacheThing);
+
+ PathChildrenCache.StartMode startMode = spec.isBuildInitial() ? PathChildrenCache.StartMode.POST_INITIALIZED_EVENT : PathChildrenCache.StartMode.NORMAL;
+ cache.start(startMode);
return Response.ok(key.getId()).build();
}
@DELETE
@Path("{id}")
- public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
+ public Response delete(@PathParam("connectionId") String connectionId, @PathParam("id") String cacheId) throws IOException
{
Connection connection = connectionsManager.get(connectionId);
if ( connection == null )
@@ -77,23 +93,20 @@ public class PathCacheRecipeResource
return Response.status(Response.Status.NOT_FOUND).build();
}
- InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
- if ( mutex == null )
+ PathChildrenCacheThing cacheThing = connection.removeThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+ if ( cacheThing == null )
{
return Response.status(Response.Status.NOT_FOUND).build();
}
- if ( mutex.isAcquiredInThisProcess() )
- {
- mutex.release();
- }
+ cacheThing.getCache().close();
return Response.ok().build();
}
- @POST
- @Consumes(MediaType.APPLICATION_JSON)
- public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
+ @GET
+ @Path("{id}/block-on-events")
+ public void getEvents(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, @PathParam("id") String cacheId)
{
Connection connection = connectionsManager.get(connectionId);
if ( connection == null )
@@ -102,14 +115,14 @@ public class PathCacheRecipeResource
return;
}
- final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
- if ( mutex == null )
+ // Look the cache up WITHOUT unregistering it. This is a long-poll endpoint that a client
+ // calls repeatedly; removing the entry here would make every later poll (and delete())
+ // answer 404 and would leave the cache open because nothing could find it to close it.
+ final PathChildrenCacheThing cacheThing = connection.getThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+ if ( cacheThing == null )
{
asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
return;
}
- connectionsManager.getExecutorService().submit
+ Future<?> future = connectionsManager.getExecutorService().submit
(
new Runnable()
{
@@ -118,15 +131,68 @@ public class PathCacheRecipeResource
{
try
{
- boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
- asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
+ List<PathChildrenCacheEvent> events = cacheThing.blockForPendingEvents();
+ List<PathChildrenCacheEventEntity> transformed = Lists.transform(events, toEntity);
+ GenericEntity<List<PathChildrenCacheEventEntity>> entity = new GenericEntity<List<PathChildrenCacheEventEntity>>(transformed){};
+ asyncResponse.resume(Response.ok(entity).build());
}
- catch ( Exception e )
+ catch ( InterruptedException e )
{
- asyncResponse.resume(e);
+ Thread.currentThread().interrupt();
+ asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build());
}
}
}
);
+ connection.putThing(new ThingKey<Future>(ThingType.FUTURE), future);
+ }
+
+ private static final Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity> toEntity = new Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity>()
+ {
+ @Override
+ public PathChildrenCacheEventEntity apply(PathChildrenCacheEvent event)
+ {
+ String path = (event.getData() != null) ? event.getData().getPath() : null;
+ String data = ((event.getData() != null) && (event.getData().getData() != null)) ? new String((event.getData().getData())) : null;
+ StatEntity stat = ((event.getData() != null) && (event.getData().getStat() != null))
+ ? new StatEntity
+ (
+ event.getData().getStat().getCzxid(),
+ event.getData().getStat().getMzxid(),
+ event.getData().getStat().getCtime(),
+ event.getData().getStat().getMtime(),
+ event.getData().getStat().getVersion(),
+ event.getData().getStat().getCversion(),
+ event.getData().getStat().getAversion(),
+ event.getData().getStat().getEphemeralOwner(),
+ event.getData().getStat().getDataLength(),
+ event.getData().getStat().getNumChildren(),
+ event.getData().getStat().getPzxid()
+ )
+ : null;
+ return new PathChildrenCacheEventEntity
+ (
+ event.getType().name(),
+ path,
+ data,
+ stat
+ );
+ }
+ };
+
+ /**
+  * Listener attached to each allocated cache. It does no processing of its own: every
+  * child event is handed to the {@link PathChildrenCacheThing} supplied at construction,
+  * which buffers it for a later block-on-events request.
+  */
+ private static class LocalListener implements PathChildrenCacheListener
+ {
+     /** Buffer that receives every event delivered to this listener. */
+     private final PathChildrenCacheThing eventSink;
+
+     public LocalListener(PathChildrenCacheThing cacheThing)
+     {
+         eventSink = cacheThing;
+     }
+
+     @Override
+     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+     {
+         // The client argument is not needed; only the event itself is queued.
+         eventSink.addEvent(event);
+     }
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
new file mode 100644
index 0000000..c2c3328
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEntity.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Request entity describing a path children cache to allocate. PathCacheRecipeResource's
+ * allocate method reads all four properties: path, cacheData and dataIsCompressed are passed
+ * to the PathChildrenCache constructor, and buildInitial selects the start mode.
+ *
+ * Mutable bean: a no-arg constructor plus get/set pairs, as required for entity binding.
+ */
+@XmlRootElement
+public class PathChildrenCacheEntity
+{
+ // Path handed to the PathChildrenCache constructor.
+ private String path;
+ // Passed as the cache's "data is compressed" constructor argument.
+ private boolean dataIsCompressed;
+ // Passed as the cache's "cache data" constructor argument.
+ private boolean cacheData;
+ // true -> cache is started with POST_INITIALIZED_EVENT, false -> NORMAL.
+ private boolean buildInitial;
+
+ /** Creates an entity with an empty path and all flags false. */
+ public PathChildrenCacheEntity()
+ {
+ this("", false, false, false);
+ }
+
+ /**
+ * Creates a fully specified entity.
+ *
+ * @param path path for the cache
+ * @param dataIsCompressed value for the cache's data-is-compressed option
+ * @param cacheData value for the cache's cache-data option
+ * @param buildInitial whether the cache should be started in POST_INITIALIZED_EVENT mode
+ */
+ public PathChildrenCacheEntity(String path, boolean dataIsCompressed, boolean cacheData, boolean buildInitial)
+ {
+ this.path = path;
+ this.dataIsCompressed = dataIsCompressed;
+ this.cacheData = cacheData;
+ this.buildInitial = buildInitial;
+ }
+
+ /** @return the path for the cache */
+ public String getPath()
+ {
+ return path;
+ }
+
+ /** @param path the path for the cache */
+ public void setPath(String path)
+ {
+ this.path = path;
+ }
+
+ /** @return the data-is-compressed option */
+ public boolean isDataIsCompressed()
+ {
+ return dataIsCompressed;
+ }
+
+ /** @param dataIsCompressed the data-is-compressed option */
+ public void setDataIsCompressed(boolean dataIsCompressed)
+ {
+ this.dataIsCompressed = dataIsCompressed;
+ }
+
+ /** @return the cache-data option */
+ public boolean isCacheData()
+ {
+ return cacheData;
+ }
+
+ /** @param cacheData the cache-data option */
+ public void setCacheData(boolean cacheData)
+ {
+ this.cacheData = cacheData;
+ }
+
+ /** @return whether the cache should be started in POST_INITIALIZED_EVENT mode */
+ public boolean isBuildInitial()
+ {
+ return buildInitial;
+ }
+
+ /** @param buildInitial whether the cache should be started in POST_INITIALIZED_EVENT mode */
+ public void setBuildInitial(boolean buildInitial)
+ {
+ this.buildInitial = buildInitial;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
new file mode 100644
index 0000000..9b08d32
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Response entity for a single path children cache event. PathCacheRecipeResource builds one
+ * per event: eventType is the event type's enum name, while path, data and stat come from the
+ * event's child data and are set to null there when the corresponding part is missing.
+ *
+ * Mutable bean: a no-arg constructor plus get/set pairs.
+ */
+@XmlRootElement
+public class PathChildrenCacheEventEntity
+{
+ // Enum name of the event type.
+ private String eventType;
+ // Path of the affected child; may be null.
+ private String path;
+ // Child payload rendered as a String; may be null.
+ private String data;
+ // Stat values of the affected child; may be null.
+ private StatEntity stat;
+
+ /** Creates an entity with empty strings and a default-constructed StatEntity. */
+ public PathChildrenCacheEventEntity()
+ {
+ this("", "", "", new StatEntity());
+ }
+
+ /**
+ * Creates a fully specified entity. No argument is validated; nulls are stored as given.
+ *
+ * @param eventType name of the event type
+ * @param path path of the affected child
+ * @param data payload of the affected child as text
+ * @param stat stat values of the affected child
+ */
+ public PathChildrenCacheEventEntity(String eventType, String path, String data, StatEntity stat)
+ {
+ this.eventType = eventType;
+ this.path = path;
+ this.data = data;
+ this.stat = stat;
+ }
+
+ /** @return the event type name */
+ public String getEventType()
+ {
+ return eventType;
+ }
+
+ /** @param eventType the event type name */
+ public void setEventType(String eventType)
+ {
+ this.eventType = eventType;
+ }
+
+ /** @return the child path, possibly null */
+ public String getPath()
+ {
+ return path;
+ }
+
+ /** @param path the child path */
+ public void setPath(String path)
+ {
+ this.path = path;
+ }
+
+ /** @return the child payload as text, possibly null */
+ public String getData()
+ {
+ return data;
+ }
+
+ /** @param data the child payload as text */
+ public void setData(String data)
+ {
+ this.data = data;
+ }
+
+ /** @return the child stat values, possibly null */
+ public StatEntity getStat()
+ {
+ return stat;
+ }
+
+ /** @param stat the child stat values */
+ public void setStat(StatEntity stat)
+ {
+ this.stat = stat;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
new file mode 100644
index 0000000..ef323ec
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/StatEntity.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Entity carrying the eleven numeric stat values of a node. PathCacheRecipeResource fills it
+ * from the stat attached to a cache event's child data (getCzxid() through getPzxid(), in the
+ * same order as the constructor parameters below).
+ *
+ * NOTE(review): the per-field descriptions follow ZooKeeper's Stat structure; this presumes the
+ * source stat is ZooKeeper's Stat - confirm against the cache's child data type.
+ *
+ * Mutable bean: a no-arg constructor (all values zero) plus get/set pairs.
+ */
+@XmlRootElement
+public class StatEntity
+{
+ // zxid of the change that created the node
+ private long czxid;
+ // zxid of the change that last modified the node
+ private long mzxid;
+ // creation time
+ private long ctime;
+ // last-modified time
+ private long mtime;
+ // data version
+ private int version;
+ // children version
+ private int cversion;
+ // ACL version
+ private int aversion;
+ // owning session id for ephemeral nodes
+ private long ephemeralOwner;
+ // length of the node's data
+ private int dataLength;
+ // number of children
+ private int numChildren;
+ // zxid of the change that last modified the node's children
+ private long pzxid;
+
+ /** Creates an entity with every value left at its Java default (zero). */
+ public StatEntity()
+ {
+ }
+
+ /**
+ * Creates a fully specified entity; each argument is stored in the field of the same name.
+ */
+ public StatEntity(long czxid, long mzxid, long ctime, long mtime, int version, int cversion, int aversion, long ephemeralOwner, int dataLength, int numChildren, long pzxid)
+ {
+ this.czxid = czxid;
+ this.mzxid = mzxid;
+ this.ctime = ctime;
+ this.mtime = mtime;
+ this.version = version;
+ this.cversion = cversion;
+ this.aversion = aversion;
+ this.ephemeralOwner = ephemeralOwner;
+ this.dataLength = dataLength;
+ this.numChildren = numChildren;
+ this.pzxid = pzxid;
+ }
+
+ /** @return the czxid value */
+ public long getCzxid()
+ {
+ return czxid;
+ }
+
+ /** @param czxid the czxid value */
+ public void setCzxid(long czxid)
+ {
+ this.czxid = czxid;
+ }
+
+ /** @return the mzxid value */
+ public long getMzxid()
+ {
+ return mzxid;
+ }
+
+ /** @param mzxid the mzxid value */
+ public void setMzxid(long mzxid)
+ {
+ this.mzxid = mzxid;
+ }
+
+ /** @return the ctime value */
+ public long getCtime()
+ {
+ return ctime;
+ }
+
+ /** @param ctime the ctime value */
+ public void setCtime(long ctime)
+ {
+ this.ctime = ctime;
+ }
+
+ /** @return the mtime value */
+ public long getMtime()
+ {
+ return mtime;
+ }
+
+ /** @param mtime the mtime value */
+ public void setMtime(long mtime)
+ {
+ this.mtime = mtime;
+ }
+
+ /** @return the data version */
+ public int getVersion()
+ {
+ return version;
+ }
+
+ /** @param version the data version */
+ public void setVersion(int version)
+ {
+ this.version = version;
+ }
+
+ /** @return the children version */
+ public int getCversion()
+ {
+ return cversion;
+ }
+
+ /** @param cversion the children version */
+ public void setCversion(int cversion)
+ {
+ this.cversion = cversion;
+ }
+
+ /** @return the ACL version */
+ public int getAversion()
+ {
+ return aversion;
+ }
+
+ /** @param aversion the ACL version */
+ public void setAversion(int aversion)
+ {
+ this.aversion = aversion;
+ }
+
+ /** @return the ephemeral owner value */
+ public long getEphemeralOwner()
+ {
+ return ephemeralOwner;
+ }
+
+ /** @param ephemeralOwner the ephemeral owner value */
+ public void setEphemeralOwner(long ephemeralOwner)
+ {
+ this.ephemeralOwner = ephemeralOwner;
+ }
+
+ /** @return the data length */
+ public int getDataLength()
+ {
+ return dataLength;
+ }
+
+ /** @param dataLength the data length */
+ public void setDataLength(int dataLength)
+ {
+ this.dataLength = dataLength;
+ }
+
+ /** @return the number of children */
+ public int getNumChildren()
+ {
+ return numChildren;
+ }
+
+ /** @param numChildren the number of children */
+ public void setNumChildren(int numChildren)
+ {
+ this.numChildren = numChildren;
+ }
+
+ /** @return the pzxid value */
+ public long getPzxid()
+ {
+ return pzxid;
+ }
+
+ /** @param pzxid the pzxid value */
+ public void setPzxid(long pzxid)
+ {
+ this.pzxid = pzxid;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
new file mode 100644
index 0000000..1c507aa
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/PathChildrenCacheThing.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.system;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import java.util.List;
+
+public class PathChildrenCacheThing
+{
+ private final PathChildrenCache cache;
+ private final List<PathChildrenCacheEvent> events = Lists.newLinkedList();
+
+ public PathChildrenCacheThing(PathChildrenCache cache)
+ {
+ this.cache = cache;
+ }
+
+ public PathChildrenCache getCache()
+ {
+ return cache;
+ }
+
+ public synchronized List<PathChildrenCacheEvent> blockForPendingEvents() throws InterruptedException
+ {
+ while ( events.size() == 0 )
+ {
+ wait();
+ }
+
+ List<PathChildrenCacheEvent> localEvents = Lists.newArrayList(events);
+ events.clear();
+ return localEvents;
+ }
+
+ public synchronized void addEvent(PathChildrenCacheEvent event)
+ {
+ events.add(event);
+ notifyAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/dd1fe969/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
index 13448dd..81251fc 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
@@ -20,7 +20,6 @@
package org.apache.curator.x.rest.system;
import com.google.common.io.Closeables;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import java.util.concurrent.Future;
@@ -41,18 +40,18 @@ public interface ThingType<T>
}
};
- public static ThingType<PathChildrenCache> PATH_CACHE = new ThingType<PathChildrenCache>()
+ public static ThingType<PathChildrenCacheThing> PATH_CACHE = new ThingType<PathChildrenCacheThing>()
{
@Override
- public Class<PathChildrenCache> getThingClass()
+ public Class<PathChildrenCacheThing> getThingClass()
{
- return PathChildrenCache.class;
+ return PathChildrenCacheThing.class;
}
@Override
- public void closeFor(PathChildrenCache cache)
+ public void closeFor(PathChildrenCacheThing cache)
{
- Closeables.closeQuietly(cache);
+ Closeables.closeQuietly(cache.getCache());
}
};
[07/10] git commit: wip
Posted by ra...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a08f3c55
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a08f3c55
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a08f3c55
Branch: refs/heads/websockets
Commit: a08f3c55db4c91e4baf809c19f5c9da23e648360
Parents: f68a785
Author: randgalt <ra...@apache.org>
Authored: Sat Jan 11 13:18:36 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jan 11 13:18:36 2014 -0500
----------------------------------------------------------------------
.../curator/x/websockets/api/zookeeper/Create.java | 17 +++++++++++++++--
.../x/websockets/details/CuratorEndpoint.java | 1 +
2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/a08f3c55/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
index 24cb076..eba903e 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
@@ -19,9 +19,13 @@
package org.apache.curator.x.websockets.api.zookeeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.api.Compressible;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.x.websockets.api.ApiCommand;
import org.apache.curator.x.websockets.api.JsonUtils;
@@ -33,6 +37,7 @@ import org.codehaus.jackson.map.ObjectWriter;
public class Create implements ApiCommand
{
+ @SuppressWarnings("unchecked")
@Override
public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
{
@@ -67,14 +72,22 @@ public class Create implements ApiCommand
builder = ((CreateModable)builder).withMode(createMode);
}
+ BackgroundCallback callback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ System.out.println();
+ }
+ };
if ( payload != null )
{
String payloadStr = writer.writeValueAsString(payload);
- result = ((PathAndBytesable)builder).forPath(path, payloadStr.getBytes());
+ result = ((BackgroundPathAndBytesable<String>)builder).inBackground(callback).forPath(path, payloadStr.getBytes());
}
else
{
- result = ((PathAndBytesable)builder).forPath(path);
+ result = ((BackgroundPathAndBytesable<String>)builder).inBackground(callback).forPath(path);
}
}
catch ( ClassCastException e )
http://git-wip-us.apache.org/repos/asf/curator/blob/a08f3c55/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
index 6f664de..adf6dfb 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
@@ -81,6 +81,7 @@ public class CuratorEndpoint extends Endpoint
catch ( Exception e )
{
// TODO
+ e.printStackTrace();
}
MessageHandler handler = new MessageHandler.Whole<String>()
[05/10] git commit: wip
Posted by ra...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/0910d482
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/0910d482
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/0910d482
Branch: refs/heads/websockets
Commit: 0910d4821f09bf1d965c8547ef82b1516976a6b8
Parents: 20d8c06
Author: randgalt <ra...@apache.org>
Authored: Fri Jan 10 18:44:13 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jan 10 18:44:13 2014 -0500
----------------------------------------------------------------------
curator-x-websockets/pom.xml | 71 ++++++++++++
.../curator/x/websockets/ClientCreator.java | 27 +++++
.../x/websockets/CuratorWebsocketsConfig.java | 68 ++++++++++++
.../x/websockets/CuratorWebsocketsServer.java | 77 +++++++++++++
.../x/websockets/DefaultClientCreator.java | 45 ++++++++
.../curator/x/websockets/api/ApiCommand.java | 30 ++++++
.../x/websockets/api/CommandManager.java | 49 +++++++++
.../curator/x/websockets/api/JsonUtils.java | 56 ++++++++++
.../x/websockets/api/zookeeper/Create.java | 46 ++++++++
.../x/websockets/details/CuratorEndpoint.java | 107 +++++++++++++++++++
.../details/CuratorWebsocketsSession.java | 53 +++++++++
.../x/websockets/details/SessionManager.java | 64 +++++++++++
.../apache/curator/x/websockets/TestServer.java | 30 ++++++
pom.xml | 1 +
14 files changed, 724 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/pom.xml
----------------------------------------------------------------------
diff --git a/curator-x-websockets/pom.xml b/curator-x-websockets/pom.xml
new file mode 100644
index 0000000..95803c1
--- /dev/null
+++ b/curator-x-websockets/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?><!--~
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>apache-curator</artifactId>
+ <groupId>org.apache.curator</groupId>
+ <version>2.3.2-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>curator-x-websockets</artifactId>
+ <version>2.3.2-SNAPSHOT</version>
+
+ <properties>
+ <tyrus.version>1.3.3</tyrus.version>
+
+ <osgi.import.package>
+ *
+ </osgi.import.package>
+ <osgi.export.package>
+ org.apache.curator.x.websockets*;version="${project.version}";-noimport:=true
+ </osgi.export.package>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.tyrus</groupId>
+ <artifactId>tyrus-server</artifactId>
+ <version>${tyrus.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.glassfish.tyrus</groupId>
+ <artifactId>tyrus-container-grizzly-server</artifactId>
+ <version>${tyrus.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.2</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/ClientCreator.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/ClientCreator.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/ClientCreator.java
new file mode 100644
index 0000000..65b8fe0
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/ClientCreator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Factory abstraction for {@link CuratorFramework} instances. CuratorWebsocketsServer receives
+ * an implementation and hands it to its session manager.
+ */
+public interface ClientCreator
+{
+    /**
+     * Creates a client instance.
+     *
+     * @return the new client
+     * @throws Exception if the client cannot be created
+     */
+    CuratorFramework newClient() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsConfig.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsConfig.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsConfig.java
new file mode 100644
index 0000000..c7227c4
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsConfig.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+/**
+ * Mutable settings holder for the websockets server: bind host, port, root path and websocket
+ * path. Every setting has a default and can be overridden through its setter.
+ */
+public class CuratorWebsocketsConfig
+{
+    private static final String DEFAULT_BIND_HOST = "localhost";
+    private static final int DEFAULT_PORT = 8080;
+    private static final String DEFAULT_ROOT_PATH = "/websockets";
+    private static final String DEFAULT_WEBSOCKET_PATH = "/curator";
+
+    private String bindHost = DEFAULT_BIND_HOST;
+    private int port = DEFAULT_PORT;
+    private String rootPath = DEFAULT_ROOT_PATH;
+    private String websocketPath = DEFAULT_WEBSOCKET_PATH;
+
+    /** @return the bind host (default {@code "localhost"}) */
+    public String getBindHost()
+    {
+        return bindHost;
+    }
+
+    /** @param bindHost the bind host */
+    public void setBindHost(String bindHost)
+    {
+        this.bindHost = bindHost;
+    }
+
+    /** @return the port (default {@code 8080}) */
+    public int getPort()
+    {
+        return port;
+    }
+
+    /** @param port the port */
+    public void setPort(int port)
+    {
+        this.port = port;
+    }
+
+    /** @return the root path (default {@code "/websockets"}) */
+    public String getRootPath()
+    {
+        return rootPath;
+    }
+
+    /** @param rootPath the root path */
+    public void setRootPath(String rootPath)
+    {
+        this.rootPath = rootPath;
+    }
+
+    /** @return the websocket path (default {@code "/curator"}) */
+    public String getWebsocketPath()
+    {
+        return websocketPath;
+    }
+
+    /** @param websocketPath the websocket path */
+    public void setWebsocketPath(String websocketPath)
+    {
+        this.websocketPath = websocketPath;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsServer.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsServer.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsServer.java
new file mode 100644
index 0000000..3bc8ff2
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/CuratorWebsocketsServer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.x.websockets.api.CommandManager;
+import org.apache.curator.x.websockets.details.CuratorEndpoint;
+import org.apache.curator.x.websockets.details.SessionManager;
+import org.glassfish.tyrus.spi.ServerContainer;
+import org.glassfish.tyrus.spi.ServerContainerFactory;
+import javax.websocket.server.ServerEndpointConfig;
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * Websocket server exposing a single {@link CuratorEndpoint}. The constructor creates the
+ * server container and registers the endpoint; {@link #start()} starts the container on the
+ * configured root path and port, and {@link #close()} stops it.
+ *
+ * NOTE(review): only the root path, port and websocket path are read from the config here;
+ * the config's bind host is not used by this class.
+ */
+public class CuratorWebsocketsServer implements Closeable
+{
+    private final ServerContainer serverContainer;
+    private final String rootPath;
+    private final int port;
+    private final CommandManager commandManager = new CommandManager();
+
+    /**
+     * @param config source of the root path, port and websocket path
+     * @param clientCreator passed to the session manager that backs every endpoint instance
+     * @throws Exception if the container or the endpoint registration fails
+     */
+    public CuratorWebsocketsServer(CuratorWebsocketsConfig config, ClientCreator clientCreator) throws Exception
+    {
+        rootPath = config.getRootPath();
+        port = config.getPort();
+
+        serverContainer = ServerContainerFactory.createServerContainer(null);
+
+        SessionManager sessionManager = new SessionManager(clientCreator, commandManager);
+        ServerEndpointConfig.Configurator endpointFactory = newConfigurator(sessionManager);
+        ServerEndpointConfig endpointConfig = ServerEndpointConfig.Builder
+            .create(CuratorEndpoint.class, config.getWebsocketPath())
+            .configurator(endpointFactory)
+            .build();
+        serverContainer.addEndpoint(endpointConfig);
+    }
+
+    /**
+     * Starts the container on the root path and port captured at construction.
+     *
+     * @throws Exception if the container fails to start
+     */
+    public void start() throws Exception
+    {
+        serverContainer.start(rootPath, port);
+    }
+
+    /** Stops the container. */
+    @Override
+    public void close()
+    {
+        serverContainer.stop();
+    }
+
+    /**
+     * Builds the configurator that supplies endpoint instances sharing one session manager.
+     * Sub-protocol negotiation is left to the inherited default.
+     */
+    private static ServerEndpointConfig.Configurator newConfigurator(final SessionManager sessionManager)
+    {
+        return new ServerEndpointConfig.Configurator()
+        {
+            @Override
+            public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException
+            {
+                Preconditions.checkArgument(endpointClass.equals(CuratorEndpoint.class), "Expected CuratorEndpoint: " + endpointClass.getName());
+                //noinspection unchecked
+                return (T)new CuratorEndpoint(sessionManager);
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/DefaultClientCreator.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/DefaultClientCreator.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/DefaultClientCreator.java
new file mode 100644
index 0000000..619445c
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/DefaultClientCreator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+/**
+ * {@link ClientCreator} that builds a new {@link CuratorFramework} from a fixed
+ * connection string and retry policy each time {@link #newClient()} is called.
+ */
+public class DefaultClientCreator implements ClientCreator
+{
+    /** Connection string used when the caller does not supply one. */
+    public static final String DEFAULT_CONNECT_STRING = "localhost:2181";
+
+    private final String connectString;
+    private final ExponentialBackoffRetry retryPolicy;
+
+    /**
+     * Uses {@link #DEFAULT_CONNECT_STRING} and {@code new ExponentialBackoffRetry(100, 3)}.
+     */
+    public DefaultClientCreator()
+    {
+        this(new ExponentialBackoffRetry(100, 3));
+    }
+
+    /**
+     * Uses {@link #DEFAULT_CONNECT_STRING} with the given retry policy.
+     *
+     * @param retryPolicy retry policy passed to every client that is created
+     */
+    public DefaultClientCreator(ExponentialBackoffRetry retryPolicy)
+    {
+        this(DEFAULT_CONNECT_STRING, retryPolicy);
+    }
+
+    /**
+     * @param connectString connection string passed to every client that is created
+     * @param retryPolicy retry policy passed to every client that is created
+     */
+    public DefaultClientCreator(String connectString, ExponentialBackoffRetry retryPolicy)
+    {
+        this.connectString = connectString;
+        this.retryPolicy = retryPolicy;
+    }
+
+    /**
+     * Creates a new client via {@link CuratorFrameworkFactory#newClient(String, org.apache.curator.RetryPolicy)}.
+     * This method does not call {@code start()} on the client; that is left to the caller.
+     *
+     * @return a new client instance
+     * @throws Exception declared by {@link ClientCreator#newClient()}
+     */
+    @Override
+    public CuratorFramework newClient() throws Exception
+    {
+        return CuratorFrameworkFactory.newClient(connectString, retryPolicy);
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
new file mode 100644
index 0000000..438e1a6
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.api;
+
+import org.apache.curator.x.websockets.details.CuratorWebsocketsSession;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+/**
+ * A single named operation that can be invoked through a websocket message.
+ * Implementations are instantiated reflectively by {@code CommandManager}
+ * via {@code Class.newInstance()}, so they need an accessible no-argument constructor.
+ */
+public interface ApiCommand
+{
+    /**
+     * Executes the command.
+     *
+     * @param input the parsed JSON message that requested this command
+     * @param session the Curator/websocket session the message arrived on
+     * @param reader JSON reader made available to the command
+     * @param writer JSON writer made available to the command
+     * @return a string result; NOTE(review): presumably the reply for the peer, but no caller uses it yet - confirm
+     * @throws Exception on any failure while processing the command
+     */
+    String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/CommandManager.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/CommandManager.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/CommandManager.java
new file mode 100644
index 0000000..c16e927
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/CommandManager.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.api;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.x.websockets.api.zookeeper.Create;
+import java.io.FileNotFoundException;
+import java.util.Map;
+
+/**
+ * Immutable registry that maps command names to {@link ApiCommand} implementation classes
+ * and creates a fresh command instance for each lookup.
+ */
+public class CommandManager
+{
+    private final Map<String, Class<? extends ApiCommand>> commands;
+
+    /**
+     * Registers the built-in commands.
+     */
+    public CommandManager()
+    {
+        commands = ImmutableMap.<String, Class<? extends ApiCommand>>builder()
+            .put("zookeeper/create", Create.class)
+            .build();
+    }
+
+    /**
+     * Instantiates the command registered under {@code name} through its no-argument constructor.
+     *
+     * @param name registered command name, e.g. {@code "zookeeper/create"}
+     * @return a new instance of the matching command
+     * @throws FileNotFoundException if no command is registered under {@code name}
+     * @throws Exception if the command class cannot be instantiated
+     */
+    public ApiCommand newCommand(String name) throws Exception
+    {
+        Class<? extends ApiCommand> commandClass = commands.get(name);
+        if ( commandClass != null )
+        {
+            return commandClass.newInstance();
+        }
+        throw new FileNotFoundException(name);
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
new file mode 100644
index 0000000..8c18d7c
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.api;
+
+import org.codehaus.jackson.JsonNode;
+
+/**
+ * Static helpers for reading typed field values out of a Jackson {@link JsonNode}.
+ * A field whose value is an explicit JSON {@code null} is treated exactly like a field
+ * that is absent; without that, {@code asText()} on the null-node would leak the
+ * literal string {@code "null"} to callers.
+ */
+public class JsonUtils
+{
+    /**
+     * Reads a mandatory field as text.
+     *
+     * @param node object node to read from
+     * @param name field name
+     * @return the field value as text
+     * @throws Exception if the field is absent or is an explicit JSON {@code null}
+     */
+    public static String getRequiredString(JsonNode node, String name) throws Exception
+    {
+        JsonNode value = getValueNode(node, name);
+        if ( value == null )
+        {
+            throw new Exception("Required field is missing: " + name);
+        }
+        return value.asText();
+    }
+
+    /**
+     * Reads an optional field as text.
+     *
+     * @param node object node to read from
+     * @param name field name
+     * @return the field value as text, or {@code null} if the field is absent or JSON {@code null}
+     */
+    public static String getOptionalString(JsonNode node, String name)
+    {
+        return getOptionalString(node, name, null);
+    }
+
+    /**
+     * Reads an optional field as text, falling back to a default.
+     *
+     * @param node object node to read from
+     * @param name field name
+     * @param defaultValue value returned when the field is absent or JSON {@code null}
+     * @return the field value as text, or {@code defaultValue}
+     */
+    public static String getOptionalString(JsonNode node, String name, String defaultValue)
+    {
+        JsonNode value = getValueNode(node, name);
+        return (value != null) ? value.asText() : defaultValue;
+    }
+
+    /**
+     * Reads an optional field as a boolean.
+     *
+     * @param node object node to read from
+     * @param name field name
+     * @return the result of {@code asBoolean()} on the field, or {@code false} if the field
+     *         is absent or JSON {@code null}
+     */
+    public static boolean getOptionalBoolean(JsonNode node, String name)
+    {
+        JsonNode value = getValueNode(node, name);
+        return (value != null) && value.asBoolean();
+    }
+
+    /**
+     * Looks up a field, mapping both "absent" and "explicit JSON null" to Java {@code null}.
+     */
+    private static JsonNode getValueNode(JsonNode node, String name)
+    {
+        JsonNode value = node.get(name);
+        return ((value == null) || value.isNull()) ? null : value;
+    }
+
+    private JsonUtils()
+    {
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
new file mode 100644
index 0000000..3cd148d
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.api.zookeeper;
+
+import org.apache.curator.x.websockets.api.ApiCommand;
+import org.apache.curator.x.websockets.api.JsonUtils;
+import org.apache.curator.x.websockets.details.CuratorWebsocketsSession;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+/**
+ * Work-in-progress command registered as "zookeeper/create". At this stage it only
+ * extracts its arguments from the request; nothing is sent to ZooKeeper yet and the
+ * result is always {@code null}.
+ */
+public class Create implements ApiCommand
+{
+ /**
+ * Parses the create arguments from {@code input}.
+ *
+ * @param input request JSON; must contain "path", may contain the optional fields read below
+ * @param session not used yet
+ * @param reader not used yet
+ * @param writer not used yet
+ * @return always {@code null} in the current implementation
+ * @throws Exception if the required "path" field is missing
+ */
+ @Override
+ public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
+ {
+ // "path" is the only mandatory argument; the lookup throws when it is absent
+ String path = JsonUtils.getRequiredString(input, "path");
+ // optional flags: a missing field reads as false
+ boolean withProtection = JsonUtils.getOptionalBoolean(input, "withProtection");
+ boolean creatingParentsIfNeeded = JsonUtils.getOptionalBoolean(input, "creatingParentsIfNeeded");
+ boolean compressed = JsonUtils.getOptionalBoolean(input, "compressed");
+ boolean async = JsonUtils.getOptionalBoolean(input, "async");
+ // mode falls back to "persistent" when not supplied
+ String mode = JsonUtils.getOptionalString(input, "mode", "persistent");
+
+ // raw JSON node for the payload; null when the field is absent
+ JsonNode payload = input.get("payload");
+
+ // TODO ACL
+ // none of the values parsed above are acted on yet
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
new file mode 100644
index 0000000..576f475
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.details;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.x.websockets.api.ApiCommand;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+import javax.websocket.CloseReason;
+import javax.websocket.Endpoint;
+import javax.websocket.EndpointConfig;
+import javax.websocket.MessageHandler;
+import javax.websocket.Session;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Websocket endpoint that gives every websocket {@link Session} its own
+ * {@link CuratorFramework} client and dispatches incoming JSON text messages
+ * to {@link ApiCommand} instances looked up by the message's "command" field.
+ */
+public class CuratorEndpoint extends Endpoint
+{
+    private static final Logger log = Logger.getLogger(CuratorEndpoint.class.getName());
+
+    private final SessionManager sessionManager;
+    private final ObjectReader reader;
+    private final ObjectWriter writer;
+
+    /**
+     * @param sessionManager registry of active sessions; also supplies the client creator and command manager
+     */
+    public CuratorEndpoint(SessionManager sessionManager)
+    {
+        this.sessionManager = sessionManager;
+
+        // one mapper is enough to derive both the reader and the writer
+        ObjectMapper mapper = new ObjectMapper();
+        reader = mapper.reader();
+        writer = mapper.writer();
+    }
+
+    /**
+     * Creates, registers and starts a Curator client for the new websocket session and then
+     * installs the text message handler. If the client cannot be created or started, the
+     * failure is logged, anything already registered is released and the websocket session
+     * is closed; no message handler is installed in that case.
+     */
+    @Override
+    public void onOpen(final Session session, EndpointConfig config)
+    {
+        try
+        {
+            CuratorFramework client = sessionManager.getClientCreator().newClient();
+            sessionManager.put(session, new CuratorWebsocketsSession(client, session));
+
+            client.start();
+        }
+        catch ( Exception e )
+        {
+            log.log(Level.SEVERE, "Could not create the Curator client for websocket session: " + session.getId(), e);
+            abandon(session);
+            return;
+        }
+
+        MessageHandler handler = new MessageHandler.Whole<String>()
+        {
+            @Override
+            public void onMessage(String message)
+            {
+                processMessage(session, message);
+            }
+        };
+        session.addMessageHandler(handler);
+    }
+
+    /**
+     * Releases the Curator session that belongs to the closed websocket session, if any.
+     */
+    @Override
+    public void onClose(Session session, CloseReason closeReason)
+    {
+        releaseCuratorSession(session);
+    }
+
+    /**
+     * Parses one text message as JSON, looks up the command named by its "command" field
+     * and runs it. Failures are logged and otherwise do not affect the websocket session.
+     */
+    private void processMessage(Session session, String message)
+    {
+        try
+        {
+            CuratorWebsocketsSession curatorWebsocketsSession = sessionManager.get(session);
+            if ( curatorWebsocketsSession == null )
+            {
+                throw new Exception("No session found for sessionId: " + session.getId());
+            }
+
+            JsonNode jsonNode = reader.readTree(message);
+            JsonNode command = jsonNode.get("command");
+            if ( command == null )
+            {
+                throw new Exception("Missing field: \"command\"");
+            }
+            String commandName = command.asText();
+            ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(commandName);
+            // TODO the command's String result is not yet sent back to the peer
+            apiCommand.process(jsonNode, curatorWebsocketsSession, reader, writer);
+        }
+        catch ( Exception e )
+        {
+            // TODO report the failure to the peer; for now make sure it is at least visible in the log
+            log.log(Level.WARNING, "Could not process message on websocket session: " + session.getId(), e);
+        }
+    }
+
+    /**
+     * Removes the Curator session registered for {@code session}, if there is one, and closes it.
+     */
+    private void releaseCuratorSession(Session session)
+    {
+        CuratorWebsocketsSession curatorWebsocketsSession = sessionManager.remove(session);
+        if ( curatorWebsocketsSession != null )
+        {
+            curatorWebsocketsSession.close();
+        }
+    }
+
+    /**
+     * Cleans up after a failed {@link #onOpen}: releases whatever was registered and closes the
+     * websocket session so the peer is not left with a connection nothing will ever service.
+     */
+    private void abandon(Session session)
+    {
+        releaseCuratorSession(session);
+        try
+        {
+            // fixed, short phrase: close reason phrases are limited in length by the websocket API
+            session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Could not create ZooKeeper client"));
+        }
+        catch ( IOException e )
+        {
+            log.log(Level.WARNING, "Could not close websocket session: " + session.getId(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorWebsocketsSession.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorWebsocketsSession.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorWebsocketsSession.java
new file mode 100644
index 0000000..9df8b68
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorWebsocketsSession.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.details;
+
+import com.google.common.io.Closeables;
+import org.apache.curator.framework.CuratorFramework;
+import javax.websocket.Session;
+import java.io.Closeable;
+
+/**
+ * Pairs a {@link CuratorFramework} client with the websocket {@link Session} it serves.
+ * Closing this object closes the Curator client only; the websocket session is left untouched.
+ */
+public class CuratorWebsocketsSession implements Closeable
+{
+    private final CuratorFramework client;
+    private final Session session;
+
+    /**
+     * @param client Curator client dedicated to the websocket session
+     * @param session the websocket session the client belongs to
+     */
+    public CuratorWebsocketsSession(CuratorFramework client, Session session)
+    {
+        this.session = session;
+        this.client = client;
+    }
+
+    /**
+     * @return the Curator client of this session
+     */
+    public CuratorFramework getClient()
+    {
+        return client;
+    }
+
+    /**
+     * @return the websocket session
+     */
+    public Session getSession()
+    {
+        return session;
+    }
+
+    /**
+     * Closes the Curator client, suppressing any error raised while closing.
+     */
+    @Override
+    public void close()
+    {
+        Closeables.closeQuietly(client);
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/SessionManager.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/SessionManager.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/SessionManager.java
new file mode 100644
index 0000000..5c37ac3
--- /dev/null
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/SessionManager.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets.details;
+
+import com.google.common.collect.Maps;
+import org.apache.curator.x.websockets.ClientCreator;
+import org.apache.curator.x.websockets.api.CommandManager;
+import javax.websocket.Session;
+import java.util.Map;
+
+/**
+ * Registry of the active {@link CuratorWebsocketsSession}s, keyed by the id of their
+ * websocket {@link Session} and backed by a concurrent map. It also carries the
+ * {@link ClientCreator} and {@link CommandManager} shared by all endpoints.
+ */
+public class SessionManager
+{
+    private final ClientCreator clientCreator;
+    private final CommandManager commandManager;
+    private final Map<String, CuratorWebsocketsSession> sessions = Maps.newConcurrentMap();
+
+    /**
+     * @param clientCreator factory for per-session Curator clients
+     * @param commandManager lookup for API commands
+     */
+    public SessionManager(ClientCreator clientCreator, CommandManager commandManager)
+    {
+        this.clientCreator = clientCreator;
+        this.commandManager = commandManager;
+    }
+
+    /**
+     * @return the factory for per-session Curator clients
+     */
+    public ClientCreator getClientCreator()
+    {
+        return clientCreator;
+    }
+
+    /**
+     * @return the lookup for API commands
+     */
+    public CommandManager getCommandManager()
+    {
+        return commandManager;
+    }
+
+    /**
+     * Registers {@code curatorWebsocketsSession} under the id of {@code session},
+     * replacing any entry already stored for that id.
+     */
+    public void put(Session session, CuratorWebsocketsSession curatorWebsocketsSession)
+    {
+        sessions.put(keyOf(session), curatorWebsocketsSession);
+    }
+
+    /**
+     * @return the entry registered for {@code session}, or {@code null} if there is none
+     */
+    public CuratorWebsocketsSession get(Session session)
+    {
+        return sessions.get(keyOf(session));
+    }
+
+    /**
+     * Unregisters the entry for {@code session}.
+     *
+     * @return the removed entry, or {@code null} if there was none
+     */
+    public CuratorWebsocketsSession remove(Session session)
+    {
+        return sessions.remove(keyOf(session));
+    }
+
+    /**
+     * Map key for a websocket session: its id.
+     */
+    private static String keyOf(Session session)
+    {
+        return session.getId();
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/curator-x-websockets/src/test/java/org/apache/curator/x/websockets/TestServer.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/test/java/org/apache/curator/x/websockets/TestServer.java b/curator-x-websockets/src/test/java/org/apache/curator/x/websockets/TestServer.java
new file mode 100644
index 0000000..f9eef23
--- /dev/null
+++ b/curator-x-websockets/src/test/java/org/apache/curator/x/websockets/TestServer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.websockets;
+
+/**
+ * Manual launcher: starts a {@link CuratorWebsocketsServer} with the default
+ * configuration and default client creator, then keeps the JVM alive.
+ */
+public class TestServer
+{
+    public static void main(String[] args) throws Exception
+    {
+        CuratorWebsocketsConfig defaultConfig = new CuratorWebsocketsConfig();
+        ClientCreator defaultCreator = new DefaultClientCreator();
+
+        CuratorWebsocketsServer server = new CuratorWebsocketsServer(defaultConfig, defaultCreator);
+        server.start();
+
+        // a thread joining itself never returns, which parks main until the process is killed or interrupted
+        Thread mainThread = Thread.currentThread();
+        mainThread.join();
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/0910d482/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af548c2..91e899f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,6 +224,7 @@
<module>curator-examples</module>
<module>curator-x-discovery</module>
<module>curator-x-discovery-server</module>
+ <module>curator-x-websockets</module>
</modules>
<dependencyManagement>
[02/10] git commit: wip
Posted by ra...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d30a2664
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d30a2664
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d30a2664
Branch: refs/heads/rest
Commit: d30a2664596dbc4abc94929e3d98b3301a12b2e7
Parents: 91fb388
Author: randgalt <ra...@apache.org>
Authored: Wed Jan 8 15:41:38 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jan 8 15:41:38 2014 -0500
----------------------------------------------------------------------
.../curator/x/rest/ConnectionResource.java | 13 +-
.../curator/x/rest/LeaderRecipeResource.java | 132 -------------------
.../curator/x/rest/PathCacheRecipeResource.java | 132 +++++++++++++++++++
.../x/rest/entity/ConnectionStateEntity.java | 60 +++++++++
.../curator/x/rest/system/Connection.java | 37 +++++-
.../apache/curator/x/rest/system/ThingType.java | 12 +-
6 files changed, 236 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
index 88fd202..6d3b559 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -19,7 +19,6 @@
package org.apache.curator.x.rest;
-import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.x.rest.system.Connection;
import org.apache.curator.x.rest.system.ConnectionsManager;
import org.apache.curator.x.rest.system.ThingKey;
@@ -30,6 +29,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
@@ -49,8 +49,8 @@ public class ConnectionResource
}
@POST
- @Path("{id}/connection-state-change")
- public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id)
+ @Path("{id}/block-on-state-change")
+ public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id, @QueryParam("state-count") String currentStateCountArg)
{
final Connection connection = connectionsManager.get(id);
if ( connection == null )
@@ -59,6 +59,7 @@ public class ConnectionResource
return;
}
+ final long currentStateCount = (currentStateCountArg != null) ? Long.parseLong(currentStateCountArg) : -1;
Future<?> future = connectionsManager.getExecutorService().submit(new Runnable()
{
@Override
@@ -66,8 +67,8 @@ public class ConnectionResource
{
try
{
- ConnectionState state = connection.blockingPopStateChange();
- asyncResponse.resume(Response.ok(state.name()).build());
+ connection.blockUntilStateChange(currentStateCount);
+ asyncResponse.resume(Response.ok().build());
}
catch ( InterruptedException e )
{
@@ -90,7 +91,7 @@ public class ConnectionResource
return Response.status(Response.Status.NOT_FOUND).build();
}
- return Response.ok(connection.getClient().getState().name()).build();
+ return Response.ok(connection.getState()).build();
}
@POST
http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
deleted file mode 100644
index 868fb33..0000000
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.curator.x.rest;
-
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.rest.entity.LockRequestEntity;
-import org.apache.curator.x.rest.system.Connection;
-import org.apache.curator.x.rest.system.ConnectionsManager;
-import org.apache.curator.x.rest.system.ThingKey;
-import org.apache.curator.x.rest.system.ThingType;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.ext.ContextResolver;
-import java.util.concurrent.TimeUnit;
-
-@Path("zookeeper/recipes/leader/{connectionId}")
-public class LeaderRecipeResource
-{
- private final ConnectionsManager connectionsManager;
-
- public LeaderRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
- {
- connectionsManager = contextResolver.getContext(ConnectionsManager.class);
- }
-
- @POST
- @Path("{path:.*}")
- @Produces(MediaType.APPLICATION_JSON)
- public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
- {
- Connection connection = connectionsManager.get(connectionId);
- if ( connection == null )
- {
- return Response.status(Response.Status.NOT_FOUND).build();
- }
-
- InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
- ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
- connection.putThing(key, mutex);
-
- return Response.ok(key.getId()).build();
- }
-
- @DELETE
- @Path("{id}")
- public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
- {
- Connection connection = connectionsManager.get(connectionId);
- if ( connection == null )
- {
- return Response.status(Response.Status.NOT_FOUND).build();
- }
-
- InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
- if ( mutex == null )
- {
- return Response.status(Response.Status.NOT_FOUND).build();
- }
-
- if ( mutex.isAcquiredInThisProcess() )
- {
- mutex.release();
- }
-
- return Response.ok().build();
- }
-
- @POST
- @Consumes(MediaType.APPLICATION_JSON)
- public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
- {
- Connection connection = connectionsManager.get(connectionId);
- if ( connection == null )
- {
- asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
- return;
- }
-
- final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
- if ( mutex == null )
- {
- asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
- return;
- }
-
- connectionsManager.getExecutorService().submit
- (
- new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
- asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
- }
- catch ( Exception e )
- {
- asyncResponse.resume(e);
- }
- }
- }
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
new file mode 100644
index 0000000..1fc7b3d
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest;
+
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.x.rest.entity.LockRequestEntity;
+import org.apache.curator.x.rest.system.Connection;
+import org.apache.curator.x.rest.system.ConnectionsManager;
+import org.apache.curator.x.rest.system.ThingKey;
+import org.apache.curator.x.rest.system.ThingType;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ContextResolver;
+import java.util.concurrent.TimeUnit;
+
+@Path("zookeeper/recipes/path-cache/{connectionId}")
+public class PathCacheRecipeResource
+{ // JAX-RS resource rooted at zookeeper/recipes/path-cache/{connectionId}. NOTE(review): every method below allocates, removes or acquires an InterProcessSemaphoreMutex rather than a path cache - looks like a work-in-progress copy of the lock resource; confirm.
+ private final ConnectionsManager connectionsManager;
+ // Obtains the ConnectionsManager from the injected JAX-RS context resolver.
+ public PathCacheRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver)
+ {
+ connectionsManager = contextResolver.getContext(ConnectionsManager.class);
+ }
+ // POST {path}: creates a mutex for the path on the connection's client, stores it under a new MUTEX key and returns that key's id; NOT_FOUND when the connection id is unknown.
+ @POST
+ @Path("{path:.*}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception
+ {
+ Connection connection = connectionsManager.get(connectionId);
+ if ( connection == null )
+ {
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+
+ InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path);
+ ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX);
+ connection.putThing(key, mutex);
+
+ return Response.ok(key.getId()).build();
+ }
+ // DELETE {id}: removes the mutex registered under the id and releases it when isAcquiredInThisProcess() reports it held; NOT_FOUND when the connection or the mutex is unknown.
+ @DELETE
+ @Path("{id}")
+ public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception
+ {
+ Connection connection = connectionsManager.get(connectionId);
+ if ( connection == null )
+ {
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+
+ InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX));
+ if ( mutex == null )
+ {
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+
+ if ( mutex.isAcquiredInThisProcess() )
+ {
+ mutex.release();
+ }
+
+ return Response.ok().build();
+ }
+ // POST with a JSON LockRequestEntity: looks up the mutex named by getLockId() and acquires it on the manager's executor, resuming the suspended response with OK, REQUEST_TIMEOUT (acquire returned false) or the thrown exception.
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception
+ {
+ Connection connection = connectionsManager.get(connectionId);
+ if ( connection == null )
+ {
+ asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+ return;
+ }
+
+ final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX));
+ if ( mutex == null )
+ {
+ asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build());
+ return;
+ }
+ // The wait of up to getMaxWaitMs() milliseconds happens inside the submitted task, not in this method.
+ connectionsManager.getExecutorService().submit
+ (
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS);
+ asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build());
+ }
+ catch ( Exception e )
+ {
+ asyncResponse.resume(e);
+ }
+ }
+ }
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
new file mode 100644
index 0000000..797c26e
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/ConnectionStateEntity.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class ConnectionStateEntity
+{ // Mutable bean pairing a state name with a state counter; carries the JAXB @XmlRootElement annotation.
+ private String state;
+ private long stateCount;
+ // No-arg constructor: defaults to an empty state name and a count of -1.
+ public ConnectionStateEntity()
+ {
+ this("", -1);
+ }
+ // Stores both values as given; no validation is performed.
+ public ConnectionStateEntity(String state, long stateCount)
+ {
+ this.state = state;
+ this.stateCount = stateCount;
+ }
+
+ public String getState()
+ {
+ return state;
+ }
+
+ public void setState(String state)
+ {
+ this.state = state;
+ }
+
+ public long getStateCount()
+ {
+ return stateCount;
+ }
+
+ public void setStateCount(long stateCount)
+ {
+ this.stateCount = stateCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
index 7f46de1..7ef36dd 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java
@@ -19,14 +19,14 @@
package org.apache.curator.x.rest.system;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.x.rest.entity.ConnectionStateEntity;
import java.io.Closeable;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
public class Connection implements Closeable, ConnectionStateListener
@@ -34,7 +34,8 @@ public class Connection implements Closeable, ConnectionStateListener
private final CuratorFramework client;
private final AtomicLong lastUseMs = new AtomicLong(System.currentTimeMillis());
private final Map<ThingKey, Object> things = Maps.newConcurrentMap();
- private final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue();
+ private final Object stateLock = new Object();
+ private long stateCount = 0; // guarded by stateLock
public Connection(CuratorFramework client)
{
@@ -51,12 +52,35 @@ public class Connection implements Closeable, ConnectionStateListener
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- states.add(newState);
+ synchronized(stateLock)
+ {
+ ++stateCount;
+ stateLock.notifyAll();
+ }
+ }
+
+ public ConnectionStateEntity getState()
+ { // Returns the client's current state name together with the state-change counter. NOTE(review): client.getState() may be the framework's own state rather than the ConnectionState delivered to stateChanged - confirm.
+ synchronized(stateLock) // stateCount is guarded by stateLock
+ {
+ return new ConnectionStateEntity(client.getState().name(), stateCount);
+ }
}
- public ConnectionState blockingPopStateChange() throws InterruptedException
+ public void blockUntilStateChange(long expectedStateCount) throws InterruptedException
{
- return states.take();
+ synchronized(stateLock)
+ {
+ if ( expectedStateCount < 0 )
+ {
+ expectedStateCount = stateCount;
+ }
+
+ while ( stateCount == expectedStateCount )
+ {
+ stateLock.wait();
+ }
+ }
}
public void updateUse()
@@ -81,6 +105,7 @@ public class Connection implements Closeable, ConnectionStateListener
public <T> void putThing(ThingKey<T> key, T thing)
{
+ thing = Preconditions.checkNotNull(thing, "thing cannot be null");
things.put(key, thing);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/d30a2664/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
index d09f0cc..13448dd 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java
@@ -20,7 +20,7 @@
package org.apache.curator.x.rest.system;
import com.google.common.io.Closeables;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import java.util.concurrent.Future;
@@ -41,18 +41,18 @@ public interface ThingType<T>
}
};
- public static ThingType<LeaderLatch> LEADER = new ThingType<LeaderLatch>()
+ public static ThingType<PathChildrenCache> PATH_CACHE = new ThingType<PathChildrenCache>()
{
@Override
- public Class<LeaderLatch> getThingClass()
+ public Class<PathChildrenCache> getThingClass()
{
- return LeaderLatch.class;
+ return PathChildrenCache.class;
}
@Override
- public void closeFor(LeaderLatch latch)
+ public void closeFor(PathChildrenCache cache)
{
- Closeables.closeQuietly(latch);
+ Closeables.closeQuietly(cache);
}
};
[04/10] git commit: wip
Posted by ra...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2fa263e9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2fa263e9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2fa263e9
Branch: refs/heads/rest
Commit: 2fa263e94044a813d6fff47268626fd0bfa69df7
Parents: dd1fe96
Author: randgalt <ra...@apache.org>
Authored: Wed Jan 8 21:00:07 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Jan 8 21:00:07 2014 -0500
----------------------------------------------------------------------
.../curator/x/rest/ConnectionResource.java | 2 +-
.../curator/x/rest/PathCacheRecipeResource.java | 121 +++++++++++++++----
.../entity/PathChildrenCacheDataEntity.java | 72 +++++++++++
.../entity/PathChildrenCacheEventEntity.java | 34 +-----
4 files changed, 173 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa263e9/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
index 4f70111..cc05369 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java
@@ -49,7 +49,7 @@ public class ConnectionResource
}
@GET
- @Path("{id}/block-on-state-change")
+ @Path("block-on-state-change/{id}")
public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id, @QueryParam("state-count") String currentStateCountArg)
{
final Connection connection = connectionsManager.get(id);
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa263e9/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
index d953bd6..2c44b88 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/PathCacheRecipeResource.java
@@ -22,9 +22,11 @@ package org.apache.curator.x.rest;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.x.rest.entity.PathChildrenCacheDataEntity;
import org.apache.curator.x.rest.entity.PathChildrenCacheEntity;
import org.apache.curator.x.rest.entity.PathChildrenCacheEventEntity;
import org.apache.curator.x.rest.entity.StatEntity;
@@ -105,7 +107,8 @@ public class PathCacheRecipeResource
}
@GET
- @Path("{id}/block-on-events")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("block-on-events/{id}")
public void getEvents(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, @PathParam("id") String cacheId)
{
Connection connection = connectionsManager.get(connectionId);
@@ -147,39 +150,105 @@ public class PathCacheRecipeResource
connection.putThing(new ThingKey<Future>(ThingType.FUTURE), future);
}
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}")
+ public Response getCurrentData(@PathParam("connectionId") String connectionId, @PathParam("id") String cacheId)
+ { // GET {id}: returns every entry currently held by the cache as a JSON list; NOT_FOUND when the connection or the cache id is unknown.
+ Connection connection = connectionsManager.get(connectionId);
+ if ( connection == null )
+ {
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+ // Look the cache up without removing it: a read must leave the cache registered so later requests still find it.
+ PathChildrenCacheThing cacheThing = connection.getThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+ if ( cacheThing == null )
+ {
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+ // Convert each cached child to its REST entity via toEntity(ChildData).
+ List<ChildData> currentData = cacheThing.getCache().getCurrentData();
+ List<PathChildrenCacheDataEntity> transformed = Lists.transform(currentData, new Function<ChildData, PathChildrenCacheDataEntity>()
+ {
+ @Override
+ public PathChildrenCacheDataEntity apply(ChildData data)
+ {
+ return toEntity(data);
+ }
+ });
+ // The anonymous GenericEntity subclass keeps the list's element type available to the JAX-RS runtime.
+ GenericEntity<List<PathChildrenCacheDataEntity>> entity = new GenericEntity<List<PathChildrenCacheDataEntity>>(transformed){};
+ return Response.ok(entity).build();
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/{path:.*}")
+ public Response getCurrentData(@PathParam("connectionId") String connectionId, @PathParam("id") String cacheId, @PathParam("path") String fullPath)
+ { // GET {id}/{path}: returns the cached entry for one path as JSON; NOT_FOUND when the connection, the cache id or the path is unknown.
+ Connection connection = connectionsManager.get(connectionId);
+ if ( connection == null )
+ {
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+ // Look the cache up without removing it: a read must leave the cache registered so later requests still find it.
+ PathChildrenCacheThing cacheThing = connection.getThing(new ThingKey<PathChildrenCacheThing>(cacheId, ThingType.PATH_CACHE));
+ if ( cacheThing == null )
+ {
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+ // NOTE(review): fullPath is passed to the cache exactly as captured from the URL template - confirm it carries the leading "/" the cache lookup presumably expects.
+ ChildData currentData = cacheThing.getCache().getCurrentData(fullPath);
+ if ( currentData == null )
+ {
+ return Response.status(Response.Status.NOT_FOUND).build();
+ }
+
+ return Response.ok(toEntity(currentData)).build();
+ }
+
private static final Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity> toEntity = new Function<PathChildrenCacheEvent, PathChildrenCacheEventEntity>()
{
@Override
public PathChildrenCacheEventEntity apply(PathChildrenCacheEvent event)
{
- String path = (event.getData() != null) ? event.getData().getPath() : null;
- String data = ((event.getData() != null) && (event.getData().getData() != null)) ? new String((event.getData().getData())) : null;
- StatEntity stat = ((event.getData() != null) && (event.getData().getStat() != null))
- ? new StatEntity
- (
- event.getData().getStat().getCzxid(),
- event.getData().getStat().getMzxid(),
- event.getData().getStat().getCtime(),
- event.getData().getStat().getMtime(),
- event.getData().getStat().getVersion(),
- event.getData().getStat().getCversion(),
- event.getData().getStat().getAversion(),
- event.getData().getStat().getEphemeralOwner(),
- event.getData().getStat().getDataLength(),
- event.getData().getStat().getNumChildren(),
- event.getData().getStat().getPzxid()
- )
- : null;
- return new PathChildrenCacheEventEntity
- (
- event.getType().name(),
- path,
- data,
- stat
- );
+ PathChildrenCacheDataEntity data = null;
+ ChildData childData = event.getData();
+ if ( childData != null )
+ {
+ data = toEntity(childData);
+ }
+ return new PathChildrenCacheEventEntity(event.getType().name(), data);
}
};
+ private static PathChildrenCacheDataEntity toEntity(ChildData childData)
+ { // Converts one cached child (path, payload bytes, Stat) into its REST entity; childData must not be null.
+ StatEntity stat = (childData.getStat() != null) // the Stat is copied field by field; the entity's stat stays null when the child has none
+ ? new StatEntity
+ (
+ childData.getStat().getCzxid(),
+ childData.getStat().getMzxid(),
+ childData.getStat().getCtime(),
+ childData.getStat().getMtime(),
+ childData.getStat().getVersion(),
+ childData.getStat().getCversion(),
+ childData.getStat().getAversion(),
+ childData.getStat().getEphemeralOwner(),
+ childData.getStat().getDataLength(),
+ childData.getStat().getNumChildren(),
+ childData.getStat().getPzxid()
+ )
+ : null;
+ String text = (childData.getData() != null) ? new String(childData.getData(), java.nio.charset.Charset.forName("UTF-8")) : null; // explicit UTF-8 so the result does not depend on the server's platform default charset
+ return new PathChildrenCacheDataEntity
+ (
+ childData.getPath(),
+ text,
+ stat
+ );
+ }
+
private static class LocalListener implements PathChildrenCacheListener
{
private final PathChildrenCacheThing cacheThing;
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa263e9/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheDataEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheDataEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheDataEntity.java
new file mode 100644
index 0000000..7b302ff
--- /dev/null
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheDataEntity.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.x.rest.entity;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+public class PathChildrenCacheDataEntity
+{ // Mutable bean holding a path, its data as a String, and a StatEntity; carries the JAXB @XmlRootElement annotation.
+ private String path;
+ private String data;
+ private StatEntity stat;
+ // No-arg constructor: defaults to empty path and data strings and a default-constructed StatEntity.
+ public PathChildrenCacheDataEntity()
+ {
+ this("", "", new StatEntity());
+ }
+ // Stores all three values as given; no validation or copying is performed.
+ public PathChildrenCacheDataEntity(String path, String data, StatEntity stat)
+ {
+ this.path = path;
+ this.data = data;
+ this.stat = stat;
+ }
+
+ public String getPath()
+ {
+ return path;
+ }
+
+ public void setPath(String path)
+ {
+ this.path = path;
+ }
+
+ public String getData()
+ {
+ return data;
+ }
+
+ public void setData(String data)
+ {
+ this.data = data;
+ }
+
+ public StatEntity getStat()
+ {
+ return stat;
+ }
+
+ public void setStat(StatEntity stat)
+ {
+ this.stat = stat;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/2fa263e9/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
----------------------------------------------------------------------
diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
index 9b08d32..a730d48 100644
--- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
+++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/PathChildrenCacheEventEntity.java
@@ -25,21 +25,17 @@ import javax.xml.bind.annotation.XmlRootElement;
public class PathChildrenCacheEventEntity
{
private String eventType;
- private String path;
- private String data;
- private StatEntity stat;
+ private PathChildrenCacheDataEntity data;
public PathChildrenCacheEventEntity()
{
- this("", "", "", new StatEntity());
+ this("", new PathChildrenCacheDataEntity());
}
- public PathChildrenCacheEventEntity(String eventType, String path, String data, StatEntity stat)
+ public PathChildrenCacheEventEntity(String eventType, PathChildrenCacheDataEntity data)
{
this.eventType = eventType;
- this.path = path;
this.data = data;
- this.stat = stat;
}
public String getEventType()
@@ -52,33 +48,13 @@ public class PathChildrenCacheEventEntity
this.eventType = eventType;
}
- public String getPath()
- {
- return path;
- }
-
- public void setPath(String path)
- {
- this.path = path;
- }
-
- public String getData()
+ public PathChildrenCacheDataEntity getData()
{
return data;
}
- public void setData(String data)
+ public void setData(PathChildrenCacheDataEntity data)
{
this.data = data;
}
-
- public StatEntity getStat()
- {
- return stat;
- }
-
- public void setStat(StatEntity stat)
- {
- this.stat = stat;
- }
}
[06/10] git commit: wip
Posted by ra...@apache.org.
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f68a785c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f68a785c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f68a785c
Branch: refs/heads/websockets
Commit: f68a785c9e6b6fb90ee586716dd9e8a3dade92ff
Parents: 0910d48
Author: randgalt <ra...@apache.org>
Authored: Sat Jan 11 09:50:52 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jan 11 09:50:52 2014 -0500
----------------------------------------------------------------------
.../curator/x/websockets/api/ApiCommand.java | 2 +-
.../curator/x/websockets/api/JsonUtils.java | 25 ++++++++++
.../x/websockets/api/zookeeper/Create.java | 51 ++++++++++++++++++--
.../x/websockets/details/CuratorEndpoint.java | 43 +++++++++++++----
4 files changed, 105 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
index 438e1a6..0e67395 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
@@ -26,5 +26,5 @@ import org.codehaus.jackson.map.ObjectWriter;
public interface ApiCommand
{
- public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
+ public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
index 8c18d7c..bf659ad 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
@@ -20,9 +20,34 @@
package org.apache.curator.x.websockets.api;
import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.node.ObjectNode;
+import java.io.IOException;
+import java.util.UUID;
public class JsonUtils
{
+ public static final String FIELD_TYPE = "type";
+ public static final String FIELD_ID = "id";
+ public static final String FIELD_VALUE = "value";
+
+ public static final String SYSTEM_TYPE_CONNECTION_STATE_CHANGE = "system/connection-state-change";
+
+ public static String newMessage(ObjectMapper mapper, ObjectWriter writer, String type, ObjectNode value) throws IOException
+ { // Convenience overload: delegates to the five-argument newMessage, using a freshly generated random UUID string as the message id.
+ return newMessage(mapper, writer, type, UUID.randomUUID().toString(), value);
+ }
+
+ public static String newMessage(ObjectMapper mapper, ObjectWriter writer, String type, String id, ObjectNode value) throws IOException
+ { // Builds an envelope object with the "type", "id" and "value" fields and returns it serialized to a String by writer.
+ ObjectNode node = mapper.createObjectNode();
+ node.put(FIELD_TYPE, type);
+ node.put(FIELD_ID, id);
+ node.put(FIELD_VALUE, value);
+ return writer.writeValueAsString(node);
+ }
+
public static String getRequiredString(JsonNode node, String name) throws Exception
{
JsonNode jsonNode = node.get(name);
http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
index 3cd148d..24cb076 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
@@ -19,9 +19,14 @@
package org.apache.curator.x.websockets.api.zookeeper;
+import org.apache.curator.framework.api.Compressible;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.x.websockets.api.ApiCommand;
import org.apache.curator.x.websockets.api.JsonUtils;
import org.apache.curator.x.websockets.details.CuratorWebsocketsSession;
+import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.ObjectWriter;
@@ -29,18 +34,54 @@ import org.codehaus.jackson.map.ObjectWriter;
public class Create implements ApiCommand
{
@Override
- public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
+ public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
{
String path = JsonUtils.getRequiredString(input, "path");
boolean withProtection = JsonUtils.getOptionalBoolean(input, "withProtection");
boolean creatingParentsIfNeeded = JsonUtils.getOptionalBoolean(input, "creatingParentsIfNeeded");
boolean compressed = JsonUtils.getOptionalBoolean(input, "compressed");
- boolean async = JsonUtils.getOptionalBoolean(input, "async");
- String mode = JsonUtils.getOptionalString(input, "mode", "persistent");
+ String mode = JsonUtils.getOptionalString(input, "mode");
JsonNode payload = input.get("payload");
- // TODO ACL
- return null;
+ Object builder = session.getClient().create(); // held as Object: each fluent call below may return a different builder type, so every step casts to the interface it needs
+ Object result;
+ try
+ {
+ if ( withProtection )
+ {
+ builder = ((CreateBuilder)builder).withProtection();
+ }
+ if ( creatingParentsIfNeeded )
+ {
+ builder = ((CreateBuilder)builder).creatingParentsIfNeeded();
+ }
+ if ( compressed )
+ {
+ builder = ((Compressible)builder).compressed();
+ }
+ // Locale.ROOT keeps the upper-casing locale-independent (a Turkish default locale would otherwise turn "i" into a dotted capital and break valueOf).
+ if ( mode != null )
+ {
+ CreateMode createMode = CreateMode.valueOf(mode.toUpperCase(java.util.Locale.ROOT));
+ builder = ((CreateModable)builder).withMode(createMode);
+ }
+ // The payload is stored as its JSON text, encoded explicitly as UTF-8 rather than with the platform default charset.
+ if ( payload != null )
+ {
+ String payloadStr = writer.writeValueAsString(payload);
+ result = ((PathAndBytesable)builder).forPath(path, payloadStr.getBytes(java.nio.charset.Charset.forName("UTF-8")));
+ }
+ else
+ {
+ result = ((PathAndBytesable)builder).forPath(path);
+ }
+ }
+ catch ( ClassCastException e )
+ {
+ throw new Exception("Bad combination of arguments to create()", e); // keep the failed cast as the cause so the offending builder step can be diagnosed
+ }
+
+ // TODO ACL, result
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
index 576f475..6f664de 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
@@ -20,11 +20,15 @@
package org.apache.curator.x.websockets.details;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.x.websockets.api.ApiCommand;
+import org.apache.curator.x.websockets.api.JsonUtils;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.node.ObjectNode;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
@@ -35,8 +39,9 @@ import java.io.IOException;
public class CuratorEndpoint extends Endpoint
{
private final SessionManager sessionManager;
- private final ObjectReader reader = new ObjectMapper().reader();
- private final ObjectWriter writer = new ObjectMapper().writer();
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final ObjectReader reader = mapper.reader();
+ private final ObjectWriter writer = mapper.writer();
public CuratorEndpoint(SessionManager sessionManager)
{
@@ -51,6 +56,26 @@ public class CuratorEndpoint extends Endpoint
CuratorFramework client = sessionManager.getClientCreator().newClient();
sessionManager.put(session, new CuratorWebsocketsSession(client, session));
+ ConnectionStateListener listener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ try
+ {
+ ObjectNode node = mapper.createObjectNode();
+ node.put("newState", newState.name());
+ String message = JsonUtils.newMessage(mapper, writer, JsonUtils.SYSTEM_TYPE_CONNECTION_STATE_CHANGE, node);
+ session.getAsyncRemote().sendText(message);
+ }
+ catch ( Exception e )
+ {
+ // TODO
+ }
+ }
+ };
+ client.getConnectionStateListenable().addListener(listener);
+
client.start();
}
catch ( Exception e )
@@ -90,14 +115,12 @@ public class CuratorEndpoint extends Endpoint
}
JsonNode jsonNode = reader.readTree(message);
- JsonNode command = jsonNode.get("command");
- if ( command == null )
- {
- throw new Exception("Missing field: \"command\"");
- }
- String commandName = command.asText();
- ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(commandName);
- apiCommand.process(jsonNode, curatorWebsocketsSession, reader, writer);
+ String command = JsonUtils.getRequiredString(jsonNode, JsonUtils.FIELD_TYPE);
+ String id = JsonUtils.getRequiredString(jsonNode, JsonUtils.FIELD_ID);
+ JsonNode value = jsonNode.get(JsonUtils.FIELD_VALUE);
+
+ ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(command);
+ apiCommand.process(id, value, curatorWebsocketsSession, reader, writer);
}
catch ( Exception e )
{