You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by fm...@apache.org on 2011/07/09 07:17:20 UTC

svn commit: r1144596 - /tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/

Author: fmoga
Date: Sat Jul  9 05:17:19 2011
New Revision: 1144596

URL: http://svn.apache.org/viewvc?rev=1144596&view=rev
Log:
Add websocket message multiplexing via persistent connections.

Added:
    tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java
    tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java
    tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java
    tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java
Removed:
    tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java
Modified:
    tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java
    tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java
    tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java
    tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java

Modified: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java?rev=1144596&r1=1144595&r2=1144596&view=diff
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java (original)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java Sat Jul  9 05:17:19 2011
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
 package org.apache.tuscany.sca.binding.websocket.runtime;
 
 import java.util.ArrayList;
@@ -111,17 +129,20 @@ public class JSONUtil {
         return builder.toString();
     }
 
-    /**
-     * Decode JSON to a given Java type.
-     * 
-     * @param responseJSON
-     *            the json to convert
-     * @param returnType
-     *            the return type to convert to
-     * @return the converted object
-     */
-    public static Object decodeResponse(String responseJSON, Class<?> returnType) {
-        return gson.fromJson(responseJSON, returnType);
+    public static String encodeRequest(WebSocketBindingRequest request) { // request envelope -> JSON text
+        return gson.toJson(request);
+    }
+
+    public static WebSocketBindingRequest decodeRequest(String jsonRequest) { // JSON text -> request envelope
+        return gson.fromJson(jsonRequest, WebSocketBindingRequest.class);
+    }
+
+    public static WebSocketBindingResponse decodeResponse(String operationResponse) { // JSON text -> response envelope
+        return gson.fromJson(operationResponse, WebSocketBindingResponse.class);
+    }
+
+    public static Object decodeResponsePayload(String payload, Class<?> returnType) { // payload JSON -> instance of returnType
+        return gson.fromJson(payload, returnType);
     }
 
 }

Added: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java?rev=1144596&view=auto
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java (added)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java Sat Jul  9 05:17:19 2011
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+public class WebSocketBindingRequest { // envelope for one operation call; converted to and from JSON by JSONUtil
+
+    private String requestId; // WebsocketReferenceInvoker sets this to a random UUID string
+    private String uri; // "component/service/operation" key of the operation to invoke
+    private String payload; // JSON-encoded operation arguments
+
+    public WebSocketBindingRequest(String requestId, String uri, String payload) {
+        this.requestId = requestId;
+        this.uri = uri;
+        this.payload = payload;
+    }
+
+    public String getRequestId() {
+        return requestId;
+    }
+
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public String getPayload() {
+        return payload;
+    }
+
+    public void setPayload(String payload) {
+        this.payload = payload;
+    }
+
+}

Added: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java?rev=1144596&view=auto
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java (added)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java Sat Jul  9 05:17:19 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+public class WebSocketBindingResponse { // envelope for one operation result; decoded from JSON by JSONUtil.decodeResponse
+
+    private String uri; // NOTE(review): WebSocketRequestHandler passes the request id here, not a uri - confirm intent
+    private String payload; // JSON-encoded return value of the invoked operation
+
+    public WebSocketBindingResponse(String uri, String payload) {
+        this.uri = uri;
+        this.payload = payload;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public String getPayload() {
+        return payload;
+    }
+
+    public void setPayload(String payload) {
+        this.payload = payload;
+    }
+
+}

Added: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java?rev=1144596&view=auto
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java (added)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java Sat Jul  9 05:17:19 2011
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.websocket.ServerWebSocket;
+import org.apache.websocket.WebSocket;
+import org.apache.websocket.WebSocketApplication;
+import org.apache.websocket.WebSocketException;
+
+/**
+ * Websocket application that keeps a registry of operations keyed by URI and
+ * starts one {@link WebSocketRequestHandler} thread per accepted connection.
+ * <p>
+ * Handler threads look operations up while another thread may still be adding
+ * or clearing them, so the registry is held in concurrent maps.
+ */
+public class WebSocketOperationDispatcher implements WebSocketApplication<SocketChannel> {
+
+    private final ServerWebSocket server;
+    private final Map<String, RuntimeEndpoint> endpoints = new ConcurrentHashMap<String, RuntimeEndpoint>();
+    private final Map<String, Operation> operations = new ConcurrentHashMap<String, Operation>();
+
+    /**
+     * @param server
+     *            the websocket server that {@link #shutdown()} closes
+     */
+    public WebSocketOperationDispatcher(ServerWebSocket server) {
+        this.server = server;
+    }
+
+    /**
+     * Registers the endpoint and operation that serve requests for a URI. A
+     * later registration for the same URI replaces the earlier one. None of
+     * the arguments may be null.
+     */
+    public void addOperation(String uri, RuntimeEndpoint endpoint, Operation operation) {
+        endpoints.put(uri, endpoint);
+        operations.put(uri, operation);
+    }
+
+    /**
+     * @return the operation registered for the URI, or null if there is none
+     *         or the URI is null
+     */
+    public Operation getOperation(String uri) {
+        // ConcurrentHashMap rejects null keys; keep the "unknown -> null" contract
+        return uri == null ? null : operations.get(uri);
+    }
+
+    /**
+     * @return the endpoint registered for the URI, or null if there is none or
+     *         the URI is null
+     */
+    public RuntimeEndpoint getEndpoint(String uri) {
+        return uri == null ? null : endpoints.get(uri);
+    }
+
+    @Override
+    public void onConnection(WebSocket<SocketChannel> socket) {
+        // release server thread
+        new Thread(new WebSocketRequestHandler(socket, this)).start();
+    }
+
+    @Override
+    public void onHandshakeError(IOException e) {
+        throw new RuntimeException(e);
+    }
+
+    @Override
+    public String acceptProtocol(String protocol) {
+        // don't accept any subprotocols
+        return null;
+    }
+
+    @Override
+    public boolean acceptOrigin(String origin) {
+        // accept all clients
+        return true;
+    }
+
+    @Override
+    public Map<String, String> acceptExtensions(Map<String, String> headers) throws WebSocketException {
+        // don't accept any extensions
+        return null;
+    }
+
+    /**
+     * Closes the server and empties the registry. The registry is emptied even
+     * when closing fails, in which case the failure is rethrown unchecked.
+     */
+    public void shutdown() {
+        try {
+            server.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            endpoints.clear();
+            operations.clear();
+        }
+    }
+
+}
\ No newline at end of file

Added: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java?rev=1144596&view=auto
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java (added)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java Sat Jul  9 05:17:19 2011
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.websocket.WebSocket;
+
+/**
+ * Serves one websocket connection: repeatedly reads a JSON request, invokes
+ * the operation it addresses and writes the JSON response back on the same
+ * connection.
+ */
+public class WebSocketRequestHandler implements Runnable {
+
+    private WebSocket<SocketChannel> websocket;
+    private WebSocketOperationDispatcher dispatcher;
+
+    public WebSocketRequestHandler(WebSocket<SocketChannel> socket, WebSocketOperationDispatcher dispatcher) {
+        this.websocket = socket;
+        this.dispatcher = dispatcher;
+    }
+
+    /**
+     * Request loop. It ends normally when an I/O error is raised on a
+     * connection that is no longer open; every other failure propagates as a
+     * runtime exception. The connection is closed on all exit paths so that a
+     * failed handler does not leave the socket open with nobody reading it.
+     */
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                try {
+                    // TODO use Java NIO selectors on websockets
+                    String request = websocket.receiveText();
+                    String response = handleRequest(request);
+                    websocket.sendText(response);
+                } catch (IOException e) {
+                    if (!websocket.isOpen()) {
+                        System.out.println("Client disconnected. Stopping WebSocketRequestHandler.");
+                        break;
+                    } else {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        } finally {
+            closeConnection();
+        }
+    }
+
+    /**
+     * Closes the connection if it is still open. A failure to close is only
+     * reported so that it cannot mask the exception that ended the loop.
+     */
+    private void closeConnection() {
+        if (websocket.isOpen()) {
+            try {
+                websocket.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // TODO handle request asynchronously in a background thread
+    /**
+     * Decodes one JSON request, invokes the addressed operation and returns the
+     * JSON-encoded response.
+     *
+     * @throws IllegalArgumentException
+     *             if the request cannot be decoded or no operation is
+     *             registered for its URI
+     */
+    private String handleRequest(String jsonRequest) {
+        WebSocketBindingRequest request = JSONUtil.decodeRequest(jsonRequest);
+        if (request == null) {
+            throw new IllegalArgumentException("Unable to decode websocket request: " + jsonRequest);
+        }
+        RuntimeEndpoint wire = dispatcher.getEndpoint(request.getUri());
+        Operation operation = dispatcher.getOperation(request.getUri());
+        System.out.println("handleRequest - " + request.getUri() + " - " + wire + " - " + operation);
+        if (wire == null || operation == null) {
+            // fail with a meaningful message instead of a NullPointerException
+            throw new IllegalArgumentException("No operation registered for uri " + request.getUri());
+        }
+        String jsonParams = request.getPayload();
+        Object[] args = JSONUtil.decodeJsonParamsForOperation(jsonParams, operation);
+        try {
+            Object operationResponse = wire.invoke(operation, args);
+            String payload = JSONUtil.encodeResponse(operationResponse);
+            // NOTE(review): the first constructor parameter of the response is
+            // named uri, yet the request id is what is sent back - confirm
+            WebSocketBindingResponse response = new WebSocketBindingResponse(request.getRequestId(), payload);
+            return JSONUtil.encodeResponse(response);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

Modified: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java?rev=1144596&r1=1144595&r2=1144596&view=diff
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java (original)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java Sat Jul  9 05:17:19 2011
@@ -41,6 +41,7 @@ public class WebsocketReferenceBindingPr
     }
 
     public void stop() {
+        WebsocketReferenceInvoker.shutdown(); // static: closes the cached connections of every reference, not only this one
     }
 
     public InterfaceContract getBindingInterfaceContract() {

Modified: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java?rev=1144596&r1=1144595&r2=1144596&view=diff
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java (original)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java Sat Jul  9 05:17:19 2011
@@ -23,6 +23,9 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.channels.SocketChannel;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.tuscany.sca.assembly.EndpointReference;
 import org.apache.tuscany.sca.interfacedef.Operation;
@@ -33,6 +36,9 @@ import org.apache.websocket.WebSocketCon
 
 public class WebsocketReferenceInvoker implements Invoker {
 
+    // TODO add timeout mechanism for persistent connections
+    private static ConcurrentMap<String, WebSocket<SocketChannel>> persistentWebsockets = new ConcurrentHashMap<String, WebSocket<SocketChannel>>();
+
     protected Operation operation;
     protected EndpointReference endpoint;
 
@@ -43,41 +49,77 @@ public class WebsocketReferenceInvoker i
 
     public Message invoke(Message msg) {
         try {
-            return doInvoke(msg);
+            WebSocket<SocketChannel> websocket = initWebsocketConnection(endpoint.getBinding().getURI());
+            return doInvoke(msg, websocket);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    public Message doInvoke(Message msg) {
+    /**
+     * Returns the cached connection for the server URI, connecting first when
+     * there is none yet or the cached one is no longer open.
+     */
+    private WebSocket<SocketChannel> initWebsocketConnection(String uri) throws IOException, URISyntaxException {
+        WebSocket<SocketChannel> websocket = null;
+        synchronized (persistentWebsockets) {
+            websocket = persistentWebsockets.get(uri);
+            // a cached connection that has been closed can never serve another
+            // request, so it is replaced instead of being reused forever
+            if (websocket == null || !websocket.isOpen()) {
+                WebSocketConnector connector = new WebSocketConnector();
+                websocket = connector.connect(new URI(uri), null, "apache-tuscany", null);
+                persistentWebsockets.put(uri, websocket);
+            }
+        }
+        return websocket;
+    }
+
+    /**
+     * Wraps the message body in a {@link WebSocketBindingRequest}, sends it over
+     * the given connection and replaces the message body with the decoded
+     * response payload.
+     */
+    public Message doInvoke(Message msg, WebSocket<SocketChannel> websocket) throws IOException {
         String componentName = endpoint.getTargetEndpoint().getComponent().getName();
         String serviceName = endpoint.getTargetEndpoint().getService().getName();
         String operationName = operation.getName();
-        String uri = endpoint.getBinding().getURI() + "/" + componentName + "/" + serviceName + "/" + operationName;
-        String jsonParams = JSONUtil.encodeRequestParams((Object[]) msg.getBody());
-        String responseJSON = invokeWebSocketRequest(uri, jsonParams);
+        String uri = componentName + "/" + serviceName + "/" + operationName;
+        String payload = JSONUtil.encodeRequestParams((Object[]) msg.getBody());
+        WebSocketBindingRequest request = new WebSocketBindingRequest(UUID.randomUUID().toString(), uri, payload);
+
+        String operationResponse = invokeViaWebsocket(websocket, JSONUtil.encodeRequest(request));
+
+        WebSocketBindingResponse response = JSONUtil.decodeResponse(operationResponse);
         Class<?> returnType = operation.getOutputType().getLogical().get(0).getPhysical();
-        Object response = JSONUtil.decodeResponse(responseJSON, returnType);
-        msg.setBody(response);
+        Object invocationResponse = JSONUtil.decodeResponsePayload(response.getPayload(), returnType);
+        msg.setBody(invocationResponse);
         return msg;
     }
 
-    private String invokeWebSocketRequest(String uri, String jsonParams) {
-        try {
-            return doInvokeWebSocketRequest(uri, jsonParams);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } catch (URISyntaxException e) {
-            throw new RuntimeException(e);
-        }
+    /**
+     * Sends the request and waits for the next text message on the connection,
+     * which is taken to be its response.
+     */
+    private String invokeViaWebsocket(WebSocket<SocketChannel> websocket, String request) throws IOException {
+        // the connection is shared by all invokers of the same server URI, so the
+        // send/receive pair must not interleave with another thread's exchange
+        synchronized (websocket) {
+            websocket.sendText(request);
+            return websocket.receiveText();
+        }
     }
 
-    private String doInvokeWebSocketRequest(String uri, String jsonParams) throws IOException, URISyntaxException {
-        WebSocketConnector connector = new WebSocketConnector();
-        WebSocket<SocketChannel> websocket = connector.connect(new URI(uri), null, "apache-tuscany", null);
-        websocket.sendText(jsonParams);
-        String jsonResponse = websocket.receiveText();
-        websocket.close();
-        return jsonResponse;
+    /** Closes and forgets every cached connection. */
+    public static void shutdown() {
+        for (WebSocket<SocketChannel> websocket : persistentWebsockets.values()) {
+            try {
+                websocket.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        persistentWebsockets.clear();
     }
+
 }

Modified: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java?rev=1144596&r1=1144595&r2=1144596&view=diff
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java (original)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java Sat Jul  9 05:17:19 2011
@@ -22,24 +22,20 @@ package org.apache.tuscany.sca.binding.w
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.tuscany.sca.interfacedef.InterfaceContract;
 import org.apache.tuscany.sca.interfacedef.Operation;
 import org.apache.tuscany.sca.provider.ServiceBindingProvider;
 import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
 import org.apache.websocket.ServerWebSocket;
-import org.apache.websocket.WebSocket;
-import org.apache.websocket.WebSocketApplication;
-import org.apache.websocket.WebSocketException;
 
 public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
 
+    private static Map<String, WebSocketOperationDispatcher> dispatchers = new HashMap<String, WebSocketOperationDispatcher>();
+
     private RuntimeEndpoint endpoint;
-    private static ConcurrentMap<String, ServerWebSocket> websocketServers = new ConcurrentHashMap<String, ServerWebSocket>();
 
     public WebsocketServiceBindingProvider(RuntimeEndpoint endpoint) {
         this.endpoint = endpoint;
@@ -47,24 +43,26 @@ public class WebsocketServiceBindingProv
 
     public void start() {
         String uri = endpoint.getBinding().getURI();
-        ServerWebSocket server = initWebSocketServerForURI(uri);
+        WebSocketOperationDispatcher dispatcher = initDispatcherForURI(uri);
         String component = endpoint.getComponent().getName();
         String service = endpoint.getService().getName();
         for (Operation op : getBindingInterfaceContract().getInterface().getOperations()) {
             String operation = op.getName();
-            server.register("/" + component + "/" + service + "/" + operation, new WebSocketEndpoint(endpoint, op));
-            System.out.println("Registered websocket endpoint for /" + component + "/" + service + "/" + operation);
+            dispatcher.addOperation(component + "/" + service + "/" + operation, endpoint, op);
         }
 
     }
 
-    private ServerWebSocket initWebSocketServerForURI(String uri) {
-        ServerWebSocket server = websocketServers.get(uri);
-        if (server == null) {
+    private WebSocketOperationDispatcher initDispatcherForURI(String uri) {
+        WebSocketOperationDispatcher dispatcher = dispatchers.get(uri);
+        if (dispatcher == null) {
             try {
-                server = new ServerWebSocket(new URI(uri));
-                websocketServers.put(uri, server);
-                System.out.println("Starting websocket server at " + uri + "...");
+                ServerWebSocket server = new ServerWebSocket(new URI(uri));
+                System.out.println("Starting websocket server " + server + " at " + uri + "...");
+                dispatcher = new WebSocketOperationDispatcher(server);
+                System.out.println("Created new dispatcher for " + uri + " " + dispatcher);
+                dispatchers.put(uri, dispatcher);
+                server.register("/", dispatcher);
                 new Thread(server).start();
             } catch (IOException e) {
                 throw new RuntimeException(e);
@@ -72,20 +70,14 @@ public class WebsocketServiceBindingProv
                 throw new RuntimeException(e);
             }
         }
-        return server;
+        return dispatcher;
     }
 
     public void stop() {
-        if (websocketServers != null) {
-            for (ServerWebSocket server : websocketServers.values()) {
-                try {
-                    server.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-            websocketServers.clear();
+        for (WebSocketOperationDispatcher dispatcher : dispatchers.values()) { // dispatchers is static: covers every provider's server
+            dispatcher.shutdown(); // rethrows a close failure unchecked, which ends this loop early
         }
+        dispatchers.clear();
     }
 
     public InterfaceContract getBindingInterfaceContract() {
@@ -96,47 +88,4 @@ public class WebsocketServiceBindingProv
         return false;
     }
 
-    public class WebSocketEndpoint implements WebSocketApplication<SocketChannel> {
-
-        private RuntimeEndpoint endpoint;
-        private Operation operation;
-
-        public WebSocketEndpoint(RuntimeEndpoint endpoint, Operation operation) {
-            this.endpoint = endpoint;
-            this.operation = operation;
-        }
-
-        @Override
-        public void onConnection(WebSocket<SocketChannel> socket) {
-            // handle request in a non-blocking fashion releasing the server
-            // thread
-            new Thread(new WebsocketServiceInvoker(endpoint, operation, socket)).start();
-        }
-
-        @Override
-        public void onHandshakeError(IOException e) {
-            System.out.println("Handshake error!\n");
-            e.printStackTrace();
-        }
-
-        @Override
-        public String acceptProtocol(String protocol) {
-            // don't accept any subprotocols
-            return null;
-        }
-
-        @Override
-        public boolean acceptOrigin(String origin) {
-            // accept all clients
-            return true;
-        }
-
-        @Override
-        public Map<String, String> acceptExtensions(Map<String, String> headers) throws WebSocketException {
-            // don't accept any extensions
-            return null;
-        }
-
-    }
-
 }