You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/22 14:35:21 UTC
[06/10] git commit: wip
wip
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f68a785c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f68a785c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f68a785c
Branch: refs/heads/websockets
Commit: f68a785c9e6b6fb90ee586716dd9e8a3dade92ff
Parents: 0910d48
Author: randgalt <ra...@apache.org>
Authored: Sat Jan 11 09:50:52 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jan 11 09:50:52 2014 -0500
----------------------------------------------------------------------
.../curator/x/websockets/api/ApiCommand.java | 2 +-
.../curator/x/websockets/api/JsonUtils.java | 25 ++++++++++
.../x/websockets/api/zookeeper/Create.java | 51 ++++++++++++++++++--
.../x/websockets/details/CuratorEndpoint.java | 43 +++++++++++++----
4 files changed, 105 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
index 438e1a6..0e67395 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
@@ -26,5 +26,5 @@ import org.codehaus.jackson.map.ObjectWriter;
public interface ApiCommand
{
- public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
+ public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
index 8c18d7c..bf659ad 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
@@ -20,9 +20,34 @@
package org.apache.curator.x.websockets.api;
import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.node.ObjectNode;
+import java.io.IOException;
+import java.util.UUID;
public class JsonUtils
{
+ public static final String FIELD_TYPE = "type";
+ public static final String FIELD_ID = "id";
+ public static final String FIELD_VALUE = "value";
+
+ public static final String SYSTEM_TYPE_CONNECTION_STATE_CHANGE = "system/connection-state-change";
+
+ public static String newMessage(ObjectMapper mapper, ObjectWriter writer, String type, ObjectNode value) throws IOException
+ {
+ return newMessage(mapper, writer, type, UUID.randomUUID().toString(), value);
+ }
+
+ public static String newMessage(ObjectMapper mapper, ObjectWriter writer, String type, String id, ObjectNode value) throws IOException
+ {
+ ObjectNode node = mapper.createObjectNode();
+ node.put(FIELD_TYPE, type);
+ node.put(FIELD_ID, id);
+ node.put(FIELD_VALUE, value);
+ return writer.writeValueAsString(node);
+ }
+
public static String getRequiredString(JsonNode node, String name) throws Exception
{
JsonNode jsonNode = node.get(name);
http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
index 3cd148d..24cb076 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
@@ -19,9 +19,14 @@
package org.apache.curator.x.websockets.api.zookeeper;
+import org.apache.curator.framework.api.Compressible;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.x.websockets.api.ApiCommand;
import org.apache.curator.x.websockets.api.JsonUtils;
import org.apache.curator.x.websockets.details.CuratorWebsocketsSession;
+import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.ObjectWriter;
@@ -29,18 +34,54 @@ import org.codehaus.jackson.map.ObjectWriter;
public class Create implements ApiCommand
{
@Override
- public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
+ public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
{
String path = JsonUtils.getRequiredString(input, "path");
boolean withProtection = JsonUtils.getOptionalBoolean(input, "withProtection");
boolean creatingParentsIfNeeded = JsonUtils.getOptionalBoolean(input, "creatingParentsIfNeeded");
boolean compressed = JsonUtils.getOptionalBoolean(input, "compressed");
- boolean async = JsonUtils.getOptionalBoolean(input, "async");
- String mode = JsonUtils.getOptionalString(input, "mode", "persistent");
+ String mode = JsonUtils.getOptionalString(input, "mode");
JsonNode payload = input.get("payload");
- // TODO ACL
- return null;
+ Object builder = session.getClient().create();
+ Object result;
+ try
+ {
+ if ( withProtection )
+ {
+ builder = ((CreateBuilder)builder).withProtection();
+ }
+ if ( creatingParentsIfNeeded )
+ {
+ builder = ((CreateBuilder)builder).creatingParentsIfNeeded();
+ }
+ if ( compressed )
+ {
+ builder = ((Compressible)builder).compressed();
+ }
+
+ if ( mode != null )
+ {
+ CreateMode createMode = CreateMode.valueOf(mode.toUpperCase());
+ builder = ((CreateModable)builder).withMode(createMode);
+ }
+
+ if ( payload != null )
+ {
+ String payloadStr = writer.writeValueAsString(payload);
+ result = ((PathAndBytesable)builder).forPath(path, payloadStr.getBytes());
+ }
+ else
+ {
+ result = ((PathAndBytesable)builder).forPath(path);
+ }
+ }
+ catch ( ClassCastException e )
+ {
+ throw new Exception("Bad combination of arguments to create()");
+ }
+
+ // TODO ACL, result
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
index 576f475..6f664de 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
@@ -20,11 +20,15 @@
package org.apache.curator.x.websockets.details;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.x.websockets.api.ApiCommand;
+import org.apache.curator.x.websockets.api.JsonUtils;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.node.ObjectNode;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
@@ -35,8 +39,9 @@ import java.io.IOException;
public class CuratorEndpoint extends Endpoint
{
private final SessionManager sessionManager;
- private final ObjectReader reader = new ObjectMapper().reader();
- private final ObjectWriter writer = new ObjectMapper().writer();
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final ObjectReader reader = mapper.reader();
+ private final ObjectWriter writer = mapper.writer();
public CuratorEndpoint(SessionManager sessionManager)
{
@@ -51,6 +56,26 @@ public class CuratorEndpoint extends Endpoint
CuratorFramework client = sessionManager.getClientCreator().newClient();
sessionManager.put(session, new CuratorWebsocketsSession(client, session));
+ ConnectionStateListener listener = new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ try
+ {
+ ObjectNode node = mapper.createObjectNode();
+ node.put("newState", newState.name());
+ String message = JsonUtils.newMessage(mapper, writer, JsonUtils.SYSTEM_TYPE_CONNECTION_STATE_CHANGE, node);
+ session.getAsyncRemote().sendText(message);
+ }
+ catch ( Exception e )
+ {
+ // TODO
+ }
+ }
+ };
+ client.getConnectionStateListenable().addListener(listener);
+
client.start();
}
catch ( Exception e )
@@ -90,14 +115,12 @@ public class CuratorEndpoint extends Endpoint
}
JsonNode jsonNode = reader.readTree(message);
- JsonNode command = jsonNode.get("command");
- if ( command == null )
- {
- throw new Exception("Missing field: \"command\"");
- }
- String commandName = command.asText();
- ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(commandName);
- apiCommand.process(jsonNode, curatorWebsocketsSession, reader, writer);
+ String command = JsonUtils.getRequiredString(jsonNode, JsonUtils.FIELD_TYPE);
+ String id = JsonUtils.getRequiredString(jsonNode, JsonUtils.FIELD_ID);
+ JsonNode value = jsonNode.get(JsonUtils.FIELD_VALUE);
+
+ ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(command);
+ apiCommand.process(id, value, curatorWebsocketsSession, reader, writer);
}
catch ( Exception e )
{