You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/03/22 14:35:21 UTC

[06/10] git commit: wip

wip


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f68a785c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f68a785c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f68a785c

Branch: refs/heads/websockets
Commit: f68a785c9e6b6fb90ee586716dd9e8a3dade92ff
Parents: 0910d48
Author: randgalt <ra...@apache.org>
Authored: Sat Jan 11 09:50:52 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jan 11 09:50:52 2014 -0500

----------------------------------------------------------------------
 .../curator/x/websockets/api/ApiCommand.java    |  2 +-
 .../curator/x/websockets/api/JsonUtils.java     | 25 ++++++++++
 .../x/websockets/api/zookeeper/Create.java      | 51 ++++++++++++++++++--
 .../x/websockets/details/CuratorEndpoint.java   | 43 +++++++++++++----
 4 files changed, 105 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
index 438e1a6..0e67395 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/ApiCommand.java
@@ -26,5 +26,5 @@ import org.codehaus.jackson.map.ObjectWriter;
 
 public interface ApiCommand
 {
-    public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
+    public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
index 8c18d7c..bf659ad 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/JsonUtils.java
@@ -20,9 +20,34 @@
 package org.apache.curator.x.websockets.api;
 
 import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.node.ObjectNode;
+import java.io.IOException;
+import java.util.UUID;
 
 public class JsonUtils
 {
+    public static final String FIELD_TYPE = "type";
+    public static final String FIELD_ID = "id";
+    public static final String FIELD_VALUE = "value";
+
+    public static final String SYSTEM_TYPE_CONNECTION_STATE_CHANGE = "system/connection-state-change";
+
+    public static String newMessage(ObjectMapper mapper, ObjectWriter writer, String type, ObjectNode value) throws IOException
+    {
+        return newMessage(mapper, writer, type, UUID.randomUUID().toString(), value);
+    }
+
+    public static String newMessage(ObjectMapper mapper, ObjectWriter writer, String type, String id, ObjectNode value) throws IOException
+    {
+        ObjectNode node = mapper.createObjectNode();
+        node.put(FIELD_TYPE, type);
+        node.put(FIELD_ID, id);
+        node.put(FIELD_VALUE, value);
+        return writer.writeValueAsString(node);
+    }
+
     public static String getRequiredString(JsonNode node, String name) throws Exception
     {
         JsonNode jsonNode = node.get(name);

http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
index 3cd148d..24cb076 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/api/zookeeper/Create.java
@@ -19,9 +19,14 @@
 
 package org.apache.curator.x.websockets.api.zookeeper;
 
+import org.apache.curator.framework.api.Compressible;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.CreateModable;
+import org.apache.curator.framework.api.PathAndBytesable;
 import org.apache.curator.x.websockets.api.ApiCommand;
 import org.apache.curator.x.websockets.api.JsonUtils;
 import org.apache.curator.x.websockets.details.CuratorWebsocketsSession;
+import org.apache.zookeeper.CreateMode;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectReader;
 import org.codehaus.jackson.map.ObjectWriter;
@@ -29,18 +34,54 @@ import org.codehaus.jackson.map.ObjectWriter;
 public class Create implements ApiCommand
 {
     @Override
-    public String process(JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
+    public void process(String id, JsonNode input, CuratorWebsocketsSession session, ObjectReader reader, ObjectWriter writer) throws Exception
     {
         String path = JsonUtils.getRequiredString(input, "path");
         boolean withProtection = JsonUtils.getOptionalBoolean(input, "withProtection");
         boolean creatingParentsIfNeeded = JsonUtils.getOptionalBoolean(input, "creatingParentsIfNeeded");
         boolean compressed = JsonUtils.getOptionalBoolean(input, "compressed");
-        boolean async = JsonUtils.getOptionalBoolean(input, "async");
-        String mode = JsonUtils.getOptionalString(input, "mode", "persistent");
+        String mode = JsonUtils.getOptionalString(input, "mode");
 
         JsonNode payload = input.get("payload");
 
-        // TODO ACL
-        return null;
+        Object builder = session.getClient().create();
+        Object result;
+        try
+        {
+            if ( withProtection )
+            {
+                builder = ((CreateBuilder)builder).withProtection();
+            }
+            if ( creatingParentsIfNeeded )
+            {
+                builder = ((CreateBuilder)builder).creatingParentsIfNeeded();
+            }
+            if ( compressed )
+            {
+                builder = ((Compressible)builder).compressed();
+            }
+
+            if ( mode != null )
+            {
+                CreateMode createMode = CreateMode.valueOf(mode.toUpperCase());
+                builder = ((CreateModable)builder).withMode(createMode);
+            }
+
+            if ( payload != null )
+            {
+                String payloadStr = writer.writeValueAsString(payload);
+                result = ((PathAndBytesable)builder).forPath(path, payloadStr.getBytes());
+            }
+            else
+            {
+                result = ((PathAndBytesable)builder).forPath(path);
+            }
+        }
+        catch ( ClassCastException e )
+        {
+            throw new Exception("Bad combination of arguments to create()");
+        }
+
+        // TODO ACL, result
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/f68a785c/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
----------------------------------------------------------------------
diff --git a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
index 576f475..6f664de 100644
--- a/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
+++ b/curator-x-websockets/src/main/java/org/apache/curator/x/websockets/details/CuratorEndpoint.java
@@ -20,11 +20,15 @@
 package org.apache.curator.x.websockets.details;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.x.websockets.api.ApiCommand;
+import org.apache.curator.x.websockets.api.JsonUtils;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
 import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.node.ObjectNode;
 import javax.websocket.CloseReason;
 import javax.websocket.Endpoint;
 import javax.websocket.EndpointConfig;
@@ -35,8 +39,9 @@ import java.io.IOException;
 public class CuratorEndpoint extends Endpoint
 {
     private final SessionManager sessionManager;
-    private final ObjectReader reader = new ObjectMapper().reader();
-    private final ObjectWriter writer = new ObjectMapper().writer();
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final ObjectReader reader = mapper.reader();
+    private final ObjectWriter writer = mapper.writer();
 
     public CuratorEndpoint(SessionManager sessionManager)
     {
@@ -51,6 +56,26 @@ public class CuratorEndpoint extends Endpoint
             CuratorFramework client = sessionManager.getClientCreator().newClient();
             sessionManager.put(session, new CuratorWebsocketsSession(client, session));
 
+            ConnectionStateListener listener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    try
+                    {
+                        ObjectNode node = mapper.createObjectNode();
+                        node.put("newState", newState.name());
+                        String message = JsonUtils.newMessage(mapper, writer, JsonUtils.SYSTEM_TYPE_CONNECTION_STATE_CHANGE, node);
+                        session.getAsyncRemote().sendText(message);
+                    }
+                    catch ( Exception e )
+                    {
+                        // TODO
+                    }
+                }
+            };
+            client.getConnectionStateListenable().addListener(listener);
+
             client.start();
         }
         catch ( Exception e )
@@ -90,14 +115,12 @@ public class CuratorEndpoint extends Endpoint
             }
 
             JsonNode jsonNode = reader.readTree(message);
-            JsonNode command = jsonNode.get("command");
-            if ( command == null )
-            {
-                throw new Exception("Missing field: \"command\"");
-            }
-            String commandName = command.asText();
-            ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(commandName);
-            apiCommand.process(jsonNode, curatorWebsocketsSession, reader, writer);
+            String command = JsonUtils.getRequiredString(jsonNode, JsonUtils.FIELD_TYPE);
+            String id = JsonUtils.getRequiredString(jsonNode, JsonUtils.FIELD_ID);
+            JsonNode value = jsonNode.get(JsonUtils.FIELD_VALUE);
+
+            ApiCommand apiCommand = sessionManager.getCommandManager().newCommand(command);
+            apiCommand.process(id, value, curatorWebsocketsSession, reader, writer);
         }
         catch ( Exception e )
         {