You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by fm...@apache.org on 2011/07/09 07:17:20 UTC
svn commit: r1144596 -
/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/
Author: fmoga
Date: Sat Jul 9 05:17:19 2011
New Revision: 1144596
URL: http://svn.apache.org/viewvc?rev=1144596&view=rev
Log:
Add websocket message multiplexing via persistent connections.
Added:
tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java
tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java
tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java
tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java
Removed:
tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java
Modified:
tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java
tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java
tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java
tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
Modified: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java?rev=1144596&r1=1144595&r2=1144596&view=diff
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java (original)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java Sat Jul 9 05:17:19 2011
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.tuscany.sca.binding.websocket.runtime;
import java.util.ArrayList;
@@ -111,17 +129,20 @@ public class JSONUtil {
return builder.toString();
}
- /**
- * Decode JSON to a given Java type.
- *
- * @param responseJSON
- * the json to convert
- * @param returnType
- * the return type to convert to
- * @return the converted object
- */
- public static Object decodeResponse(String responseJSON, Class<?> returnType) {
- return gson.fromJson(responseJSON, returnType);
+    /**
+     * Serializes a binding request to JSON.
+     *
+     * @param request
+     *            the request to encode
+     * @return the JSON text produced for the request
+     */
+    public static String encodeRequest(WebSocketBindingRequest request) {
+        String jsonRequest = gson.toJson(request);
+        return jsonRequest;
+    }
+
+    /**
+     * Parses JSON text into a binding request.
+     *
+     * @param jsonRequest
+     *            the JSON text to decode
+     * @return the decoded request
+     */
+    public static WebSocketBindingRequest decodeRequest(String jsonRequest) {
+        WebSocketBindingRequest decoded = gson.fromJson(jsonRequest, WebSocketBindingRequest.class);
+        return decoded;
+    }
+
+    /**
+     * Parses JSON text into a binding response.
+     *
+     * @param operationResponse
+     *            the JSON text to decode
+     * @return the decoded response
+     */
+    public static WebSocketBindingResponse decodeResponse(String operationResponse) {
+        WebSocketBindingResponse decoded = gson.fromJson(operationResponse, WebSocketBindingResponse.class);
+        return decoded;
+    }
+
+    /**
+     * Decodes a JSON payload to a given Java type.
+     *
+     * @param payload
+     *            the JSON text to convert
+     * @param returnType
+     *            the type to convert to
+     * @return the converted object
+     */
+    public static Object decodeResponsePayload(String payload, Class<?> returnType) {
+        Object converted = gson.fromJson(payload, returnType);
+        return converted;
     }
}
Added: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java?rev=1144596&view=auto
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java (added)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java Sat Jul 9 05:17:19 2011
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+/**
+ * Envelope for one operation invocation sent over a websocket connection. It
+ * carries a request id, the uri that names the target operation and the
+ * payload string handed to that operation.
+ */
+public class WebSocketBindingRequest {
+
+    // Declaration order is left untouched; a reflection-based serializer may
+    // emit the fields in this order.
+    private String requestId;
+    private String uri;
+    private String payload;
+
+    /**
+     * Creates a fully populated request.
+     *
+     * @param requestId
+     *            identifier of this request
+     * @param uri
+     *            uri naming the operation to invoke
+     * @param payload
+     *            payload string for the operation
+     */
+    public WebSocketBindingRequest(String requestId, String uri, String payload) {
+        this.payload = payload;
+        this.uri = uri;
+        this.requestId = requestId;
+    }
+
+    // ---- accessors ----
+
+    /** @return the identifier of this request */
+    public String getRequestId() {
+        return this.requestId;
+    }
+
+    /** @return the uri naming the operation to invoke */
+    public String getUri() {
+        return this.uri;
+    }
+
+    /** @return the payload string for the operation */
+    public String getPayload() {
+        return this.payload;
+    }
+
+    // ---- mutators ----
+
+    /** @param requestId the new request identifier */
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
+    /** @param uri the new operation uri */
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    /** @param payload the new payload string */
+    public void setPayload(String payload) {
+        this.payload = payload;
+    }
+
+}
Added: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java?rev=1144596&view=auto
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java (added)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java Sat Jul 9 05:17:19 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+/**
+ * Envelope for the result of one operation invocation sent back over a
+ * websocket connection.
+ *
+ * NOTE(review): WebSocketRequestHandler passes the request id as the first
+ * constructor argument, so the "uri" field appears to carry a request id in
+ * practice - confirm the intended meaning of this field.
+ */
+public class WebSocketBindingResponse {
+
+    // Declaration order is left untouched; a reflection-based serializer may
+    // emit the fields in this order.
+    private String uri;
+    private String payload;
+
+    /**
+     * Creates a fully populated response.
+     *
+     * @param uri
+     *            value stored in the uri field
+     * @param payload
+     *            payload string holding the invocation result
+     */
+    public WebSocketBindingResponse(String uri, String payload) {
+        this.payload = payload;
+        this.uri = uri;
+    }
+
+    // ---- accessors ----
+
+    /** @return the value of the uri field */
+    public String getUri() {
+        return this.uri;
+    }
+
+    /** @return the payload string holding the invocation result */
+    public String getPayload() {
+        return this.payload;
+    }
+
+    // ---- mutators ----
+
+    /** @param uri the new value of the uri field */
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    /** @param payload the new payload string */
+    public void setPayload(String payload) {
+        this.payload = payload;
+    }
+
+}
Added: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java?rev=1144596&view=auto
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java (added)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java Sat Jul 9 05:17:19 2011
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.websocket.ServerWebSocket;
+import org.apache.websocket.WebSocket;
+import org.apache.websocket.WebSocketApplication;
+import org.apache.websocket.WebSocketException;
+
+/**
+ * Websocket application that keeps a registry of endpoints and operations
+ * keyed by uri and hands every accepted connection to a
+ * {@link WebSocketRequestHandler} running on its own thread.
+ */
+public class WebSocketOperationDispatcher implements WebSocketApplication<SocketChannel> {
+
+    private ServerWebSocket server;
+
+    // Guards both registries. Operations are added while handler threads
+    // started by onConnection() may already be performing lookups, and plain
+    // HashMaps are not safe for concurrent read/write access.
+    private final Object registryLock = new Object();
+    private Map<String, RuntimeEndpoint> endpoints = new HashMap<String, RuntimeEndpoint>();
+    private Map<String, Operation> operations = new HashMap<String, Operation>();
+
+    /**
+     * @param server
+     *            the websocket server closed by {@link #shutdown()}
+     */
+    public WebSocketOperationDispatcher(ServerWebSocket server) {
+        this.server = server;
+    }
+
+    /**
+     * Registers an operation and the endpoint serving it under the given uri.
+     * Both entries are stored under one lock so a lookup never observes only
+     * one of them.
+     *
+     * @param uri
+     *            key used by requests to address the operation
+     * @param endpoint
+     *            endpoint the operation is invoked on
+     * @param operation
+     *            operation to invoke
+     */
+    public void addOperation(String uri, RuntimeEndpoint endpoint, Operation operation) {
+        synchronized (registryLock) {
+            endpoints.put(uri, endpoint);
+            operations.put(uri, operation);
+        }
+    }
+
+    /**
+     * @param uri
+     *            key the operation was registered under
+     * @return the registered operation, or null when none is registered
+     */
+    public Operation getOperation(String uri) {
+        synchronized (registryLock) {
+            return operations.get(uri);
+        }
+    }
+
+    /**
+     * @param uri
+     *            key the endpoint was registered under
+     * @return the registered endpoint, or null when none is registered
+     */
+    public RuntimeEndpoint getEndpoint(String uri) {
+        synchronized (registryLock) {
+            return endpoints.get(uri);
+        }
+    }
+
+    @Override
+    public void onConnection(WebSocket<SocketChannel> socket) {
+        // release server thread
+        new Thread(new WebSocketRequestHandler(socket, this)).start();
+    }
+
+    @Override
+    public void onHandshakeError(IOException e) {
+        throw new RuntimeException(e);
+    }
+
+    @Override
+    public String acceptProtocol(String protocol) {
+        // don't accept any subprotocols
+        return null;
+    }
+
+    @Override
+    public boolean acceptOrigin(String origin) {
+        // accept all clients
+        return true;
+    }
+
+    @Override
+    public Map<String, String> acceptExtensions(Map<String, String> headers) throws WebSocketException {
+        // don't accept any extensions
+        return null;
+    }
+
+    /**
+     * Closes the server and empties the registries.
+     *
+     * @throws RuntimeException
+     *             wrapping the IOException raised while closing the server
+     */
+    public void shutdown() {
+        try {
+            server.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            // Drop the registrations even when closing the server fails.
+            synchronized (registryLock) {
+                endpoints.clear();
+                operations.clear();
+            }
+        }
+    }
+
+}
\ No newline at end of file
Added: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java?rev=1144596&view=auto
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java (added)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java Sat Jul 9 05:17:19 2011
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.websocket.WebSocket;
+
+/**
+ * Serves one websocket connection: reads text requests in a loop, invokes the
+ * operation addressed by each request and sends the JSON-encoded response
+ * back on the same socket.
+ */
+public class WebSocketRequestHandler implements Runnable {
+
+    private WebSocket<SocketChannel> websocket;
+    private WebSocketOperationDispatcher dispatcher;
+
+    /**
+     * @param socket
+     *            the connection to serve
+     * @param dispatcher
+     *            registry used to look up the endpoint and operation of a request
+     */
+    public WebSocketRequestHandler(WebSocket<SocketChannel> socket, WebSocketOperationDispatcher dispatcher) {
+        this.websocket = socket;
+        this.dispatcher = dispatcher;
+    }
+
+    /**
+     * Processes requests until the client disconnects. Any other failure ends
+     * the loop with a RuntimeException; in every case the socket is closed on
+     * the way out so it is not left open with nobody serving it.
+     */
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                try {
+                    // TODO use Java NIO selectors on websockets
+                    String request = websocket.receiveText();
+                    String response = handleRequest(request);
+                    websocket.sendText(response);
+                } catch (IOException e) {
+                    if (!websocket.isOpen()) {
+                        System.out.println("Client disconnected. Stopping WebSocketRequestHandler.");
+                        break;
+                    } else {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        } finally {
+            closeIfOpen();
+        }
+    }
+
+    // Best-effort close used when the handler stops; a failure here must not
+    // mask the exception that ended the loop.
+    private void closeIfOpen() {
+        if (websocket.isOpen()) {
+            try {
+                websocket.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Decodes a request, invokes the addressed operation and encodes the result.
+     *
+     * @param jsonRequest
+     *            JSON text of a {@link WebSocketBindingRequest}
+     * @return JSON text of the {@link WebSocketBindingResponse}
+     * @throws IllegalArgumentException
+     *             when the request cannot be decoded or no operation is
+     *             registered for its uri
+     * @throws RuntimeException
+     *             wrapping an InvocationTargetException raised by the operation
+     */
+    // TODO handle request asynchronously in a background thread
+    private String handleRequest(String jsonRequest) {
+        WebSocketBindingRequest request = JSONUtil.decodeRequest(jsonRequest);
+        if (request == null) {
+            throw new IllegalArgumentException("Unable to decode websocket request: " + jsonRequest);
+        }
+        RuntimeEndpoint wire = dispatcher.getEndpoint(request.getUri());
+        Operation operation = dispatcher.getOperation(request.getUri());
+        System.out.println("handleRequest - " + request.getUri() + " - " + wire + " - " + operation);
+        if (wire == null || operation == null) {
+            // Fail with a descriptive error instead of a NullPointerException further down.
+            throw new IllegalArgumentException("No operation registered for uri " + request.getUri());
+        }
+        String jsonParams = request.getPayload();
+        Object[] args = JSONUtil.decodeJsonParamsForOperation(jsonParams, operation);
+        try {
+            Object operationResponse = wire.invoke(operation, args);
+            String payload = JSONUtil.encodeResponse(operationResponse);
+            // NOTE(review): the request id is passed where WebSocketBindingResponse
+            // declares its "uri" parameter - confirm this is the intended field.
+            WebSocketBindingResponse response = new WebSocketBindingResponse(request.getRequestId(), payload);
+            return JSONUtil.encodeResponse(response);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
Modified: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java?rev=1144596&r1=1144595&r2=1144596&view=diff
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java (original)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java Sat Jul 9 05:17:19 2011
@@ -41,6 +41,7 @@ public class WebsocketReferenceBindingPr
}
public void stop() {
+ WebsocketReferenceInvoker.shutdown();
}
public InterfaceContract getBindingInterfaceContract() {
Modified: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java?rev=1144596&r1=1144595&r2=1144596&view=diff
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java (original)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java Sat Jul 9 05:17:19 2011
@@ -23,6 +23,9 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SocketChannel;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.interfacedef.Operation;
@@ -33,6 +36,9 @@ import org.apache.websocket.WebSocketCon
public class WebsocketReferenceInvoker implements Invoker {
+ // TODO add timeout mechanism for persistent connections
+ private static ConcurrentMap<String, WebSocket<SocketChannel>> persistentWebsockets = new ConcurrentHashMap<String, WebSocket<SocketChannel>>();
+
protected Operation operation;
protected EndpointReference endpoint;
@@ -43,41 +49,57 @@ public class WebsocketReferenceInvoker i
public Message invoke(Message msg) {
try {
- return doInvoke(msg);
+ WebSocket<SocketChannel> websocket = initWebsocketConnection(endpoint.getBinding().getURI());
+ return doInvoke(msg, websocket);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- public Message doInvoke(Message msg) {
+    /**
+     * Returns the persistent websocket for the given uri, opening a new
+     * connection when none is cached or the cached one is no longer open.
+     *
+     * @param uri
+     *            the binding uri to connect to
+     * @return an open websocket cached under the uri
+     * @throws IOException
+     *             when connecting fails
+     * @throws URISyntaxException
+     *             when the uri cannot be parsed
+     */
+    private WebSocket<SocketChannel> initWebsocketConnection(String uri) throws IOException, URISyntaxException {
+        synchronized (persistentWebsockets) {
+            WebSocket<SocketChannel> websocket = persistentWebsockets.get(uri);
+            // A cached socket that has been closed would make every later
+            // invocation fail, so replace it with a fresh connection.
+            if (websocket == null || !websocket.isOpen()) {
+                WebSocketConnector connector = new WebSocketConnector();
+                websocket = connector.connect(new URI(uri), null, "apache-tuscany", null);
+                persistentWebsockets.put(uri, websocket);
+            }
+            return websocket;
+        }
+    }
+
+ public Message doInvoke(Message msg, WebSocket<SocketChannel> websocket) throws IOException {
String componentName = endpoint.getTargetEndpoint().getComponent().getName();
String serviceName = endpoint.getTargetEndpoint().getService().getName();
String operationName = operation.getName();
- String uri = endpoint.getBinding().getURI() + "/" + componentName + "/" + serviceName + "/" + operationName;
- String jsonParams = JSONUtil.encodeRequestParams((Object[]) msg.getBody());
- String responseJSON = invokeWebSocketRequest(uri, jsonParams);
+ String uri = componentName + "/" + serviceName + "/" + operationName;
+ String payload = JSONUtil.encodeRequestParams((Object[]) msg.getBody());
+ WebSocketBindingRequest request = new WebSocketBindingRequest(UUID.randomUUID().toString(), uri, payload);
+
+ String operationResponse = invokeViaWebsocket(websocket, JSONUtil.encodeRequest(request));
+
+ WebSocketBindingResponse response = JSONUtil.decodeResponse(operationResponse);
Class<?> returnType = operation.getOutputType().getLogical().get(0).getPhysical();
- Object response = JSONUtil.decodeResponse(responseJSON, returnType);
- msg.setBody(response);
+ Object invocationResponse = JSONUtil.decodeResponsePayload(response.getPayload(), returnType);
+ msg.setBody(invocationResponse);
return msg;
}
- private String invokeWebSocketRequest(String uri, String jsonParams) {
- try {
- return doInvokeWebSocketRequest(uri, jsonParams);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
+    /**
+     * Sends a request over the websocket and returns the next text message
+     * received on it.
+     *
+     * @param websocket
+     *            the connection to use
+     * @param request
+     *            the JSON-encoded request
+     * @return the text received in reply
+     * @throws IOException
+     *             when sending or receiving fails
+     */
+    private String invokeViaWebsocket(WebSocket<SocketChannel> websocket, String request) throws IOException {
+        // The socket comes from a static cache shared by all invokers and the
+        // reply is not matched against the request id, so the send/receive
+        // pair must not interleave with another caller's pair.
+        synchronized (websocket) {
+            websocket.sendText(request);
+            return websocket.receiveText();
+        }
     }
- private String doInvokeWebSocketRequest(String uri, String jsonParams) throws IOException, URISyntaxException {
- WebSocketConnector connector = new WebSocketConnector();
- WebSocket<SocketChannel> websocket = connector.connect(new URI(uri), null, "apache-tuscany", null);
- websocket.sendText(jsonParams);
- String jsonResponse = websocket.receiveText();
- websocket.close();
- return jsonResponse;
+    // Closes one cached socket; a failure is printed and otherwise ignored so
+    // the remaining sockets still get closed.
+    private static void closeAndReport(WebSocket<SocketChannel> openSocket) {
+        try {
+            openSocket.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Closes every cached persistent websocket and empties the cache.
+     */
+    public static void shutdown() {
+        for (WebSocket<SocketChannel> openSocket : persistentWebsockets.values()) {
+            closeAndReport(openSocket);
+        }
+        persistentWebsockets.clear();
     }
+
}
Modified: tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java?rev=1144596&r1=1144595&r2=1144596&view=diff
==============================================================================
--- tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java (original)
+++ tuscany/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java Sat Jul 9 05:17:19 2011
@@ -22,24 +22,20 @@ package org.apache.tuscany.sca.binding.w
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.websocket.ServerWebSocket;
-import org.apache.websocket.WebSocket;
-import org.apache.websocket.WebSocketApplication;
-import org.apache.websocket.WebSocketException;
public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
+ private static Map<String, WebSocketOperationDispatcher> dispatchers = new HashMap<String, WebSocketOperationDispatcher>();
+
private RuntimeEndpoint endpoint;
- private static ConcurrentMap<String, ServerWebSocket> websocketServers = new ConcurrentHashMap<String, ServerWebSocket>();
public WebsocketServiceBindingProvider(RuntimeEndpoint endpoint) {
this.endpoint = endpoint;
@@ -47,24 +43,26 @@ public class WebsocketServiceBindingProv
public void start() {
String uri = endpoint.getBinding().getURI();
- ServerWebSocket server = initWebSocketServerForURI(uri);
+ WebSocketOperationDispatcher dispatcher = initDispatcherForURI(uri);
String component = endpoint.getComponent().getName();
String service = endpoint.getService().getName();
for (Operation op : getBindingInterfaceContract().getInterface().getOperations()) {
String operation = op.getName();
- server.register("/" + component + "/" + service + "/" + operation, new WebSocketEndpoint(endpoint, op));
- System.out.println("Registered websocket endpoint for /" + component + "/" + service + "/" + operation);
+ dispatcher.addOperation(component + "/" + service + "/" + operation, endpoint, op);
}
}
- private ServerWebSocket initWebSocketServerForURI(String uri) {
- ServerWebSocket server = websocketServers.get(uri);
- if (server == null) {
+ private WebSocketOperationDispatcher initDispatcherForURI(String uri) {
+ WebSocketOperationDispatcher dispatcher = dispatchers.get(uri);
+ if (dispatcher == null) {
try {
- server = new ServerWebSocket(new URI(uri));
- websocketServers.put(uri, server);
- System.out.println("Starting websocket server at " + uri + "...");
+ ServerWebSocket server = new ServerWebSocket(new URI(uri));
+ System.out.println("Starting websocket server " + server + " at " + uri + "...");
+ dispatcher = new WebSocketOperationDispatcher(server);
+ System.out.println("Created new dispatcher for " + uri + " " + dispatcher);
+ dispatchers.put(uri, dispatcher);
+ server.register("/", dispatcher);
new Thread(server).start();
} catch (IOException e) {
throw new RuntimeException(e);
@@ -72,20 +70,14 @@ public class WebsocketServiceBindingProv
throw new RuntimeException(e);
}
}
- return server;
+ return dispatcher;
}
public void stop() {
- if (websocketServers != null) {
- for (ServerWebSocket server : websocketServers.values()) {
- try {
- server.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- websocketServers.clear();
+ for (WebSocketOperationDispatcher dispatcher : dispatchers.values()) {
+ dispatcher.shutdown();
}
+ dispatchers.clear();
}
public InterfaceContract getBindingInterfaceContract() {
@@ -96,47 +88,4 @@ public class WebsocketServiceBindingProv
return false;
}
- public class WebSocketEndpoint implements WebSocketApplication<SocketChannel> {
-
- private RuntimeEndpoint endpoint;
- private Operation operation;
-
- public WebSocketEndpoint(RuntimeEndpoint endpoint, Operation operation) {
- this.endpoint = endpoint;
- this.operation = operation;
- }
-
- @Override
- public void onConnection(WebSocket<SocketChannel> socket) {
- // handle request in a non-blocking fashion releasing the server
- // thread
- new Thread(new WebsocketServiceInvoker(endpoint, operation, socket)).start();
- }
-
- @Override
- public void onHandshakeError(IOException e) {
- System.out.println("Handshake error!\n");
- e.printStackTrace();
- }
-
- @Override
- public String acceptProtocol(String protocol) {
- // don't accept any subprotocols
- return null;
- }
-
- @Override
- public boolean acceptOrigin(String origin) {
- // accept all clients
- return true;
- }
-
- @Override
- public Map<String, String> acceptExtensions(Map<String, String> headers) throws WebSocketException {
- // don't accept any extensions
- return null;
- }
-
- }
-
}