You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2015/03/18 16:24:05 UTC
cxf git commit: [CXF-6232] Reactor CXF's Atmosphere based WebSocket
transport (adjusted for 3.0.x)
Repository: cxf
Updated Branches:
refs/heads/3.0.x-fixes 467d32055 -> 08fc8aadb
[CXF-6232] Reactor CXF's Atmosphere based WebSocket transport (adjusted for 3.0.x)
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/08fc8aad
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/08fc8aad
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/08fc8aad
Branch: refs/heads/3.0.x-fixes
Commit: 08fc8aadb9e886323533b657604912b33246efe9
Parents: 467d320
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Wed Feb 4 23:47:20 2015 +0100
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Wed Mar 18 16:23:58 2015 +0100
----------------------------------------------------------------------
.../websocket/WebSocketServletHolder.java | 2 +-
.../cxf/transport/websocket/WebSocketUtils.java | 91 ++--
.../WebSocketVirtualServletRequest.java | 2 +-
.../WebSocketVirtualServletResponse.java | 4 +-
.../atmosphere/AtmosphereWebSocketHandler.java | 3 +-
.../AtmosphereWebSocketServletDestination.java | 74 ++-
.../AtmosphereWebSocketStreamHandler.java | 3 +-
.../atmosphere/DefaultProtocolInterceptor.java | 254 +++++++++
.../websocket/jetty/JettyWebSocket.java | 3 -
.../websocket/jetty/WebSocketServletHolder.java | 59 +++
.../jetty/WebSocketVirtualServletRequest.java | 527 +++++++++++++++++++
.../jetty/WebSocketVirtualServletResponse.java | 367 +++++++++++++
.../jaxrs/websocket/WebSocketTestClient.java | 10 +-
13 files changed, 1324 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
index 8385fa8..6bcb72a 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
@@ -28,7 +28,7 @@ import javax.servlet.DispatcherType;
import javax.servlet.ServletContext;
/**
- *
+ * @deprecated This class is only used by jetty, it has been moved to org.apache.cxf.transport.websocket.jetty
*/
public interface WebSocketServletHolder {
String getAuthType();
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
index a55639c..5dbb930 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
@@ -30,14 +30,13 @@ import java.util.TreeMap;
*
*/
public final class WebSocketUtils {
- static final String URI_KEY = "$uri";
- static final String METHOD_KEY = "$method";
- static final String SC_KEY = "$sc";
- static final String SM_KEY = "$sm";
- static final String FLUSHED_KEY = "$flushed";
+ public static final String URI_KEY = "$uri";
+ public static final String METHOD_KEY = "$method";
+ public static final String SC_KEY = "$sc";
+ public static final String FLUSHED_KEY = "$flushed";
+
private static final byte[] CRLF = "\r\n".getBytes();
private static final byte[] COLSP = ": ".getBytes();
- private static final String DEFAULT_SC = "200";
private WebSocketUtils() {
}
@@ -116,6 +115,15 @@ public final class WebSocketUtils {
return buffer.toString();
}
+ public static byte[] readBody(InputStream in) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] buf = new byte[8192];
+ for (int n = in.read(buf); n > -1; n = in.read(buf)) {
+ baos.write(buf, 0, n);
+ }
+ return baos.toByteArray();
+ }
+
/**
* Build response bytes with the status and type information specified in the headers.
*
@@ -128,14 +136,15 @@ public final class WebSocketUtils {
public static byte[] buildResponse(Map<String, String> headers, byte[] data, int offset, int length) {
ByteArrayBuilder sb = new ByteArrayBuilder();
String v = headers.get(SC_KEY);
- sb.append(v == null ? DEFAULT_SC : v).append(CRLF);
- appendHeaders(headers, sb);
+ if (v != null) {
+ sb.append(v).append(CRLF);
+ }
+ sb.append(headers);
- byte[] longdata = sb.toByteArray();
if (data != null && length > 0) {
- longdata = buildResponse(longdata, data, offset, length);
+ sb.append(CRLF).append(data, offset, length);
}
- return longdata;
+ return sb.toByteArray();
}
/**
@@ -154,9 +163,10 @@ public final class WebSocketUtils {
if (hlen > 0) {
System.arraycopy(headers, 0, longdata, 0, hlen);
}
- longdata[hlen] = 0x0d;
- longdata[hlen + 1] = 0x0a;
- System.arraycopy(data, offset, longdata, hlen + 2, length);
+ if (data != null && length > 0) {
+ System.arraycopy(CRLF, 0, longdata, hlen, CRLF.length);
+ System.arraycopy(data, offset, longdata, hlen + CRLF.length, length);
+ }
return longdata;
}
@@ -172,8 +182,9 @@ public final class WebSocketUtils {
public static byte[] buildResponse(byte[] data, int offset, int length) {
return buildResponse((byte[])null, data, offset, length);
}
-
- static byte[] buildHeaderLine(String name, String value) {
+
+ //FIXME (consolidate the response building code)
+ public static byte[] buildHeaderLine(String name, String value) {
byte[] hl = new byte[name.length() + COLSP.length + value.length() + CRLF.length];
System.arraycopy(name.getBytes(), 0, hl, 0, name.length());
System.arraycopy(COLSP, 0, hl, name.length(), COLSP.length);
@@ -181,7 +192,7 @@ public final class WebSocketUtils {
System.arraycopy(CRLF, 0, hl, name.length() + COLSP.length + value.length(), CRLF.length);
return hl;
}
-
+
/**
* Build request bytes with the specified method, url, headers, and content entity.
*
@@ -196,34 +207,20 @@ public final class WebSocketUtils {
public static byte[] buildRequest(String method, String url, Map<String, String> headers,
byte[] data, int offset, int length) {
ByteArrayBuilder sb = new ByteArrayBuilder();
- sb.append(method).append(' ').append(url).append(CRLF);
- appendHeaders(headers, sb);
- sb.append(CRLF);
+ sb.append(method).append(' ').append(url).append(CRLF).append(headers);
- byte[] longdata = sb.toByteArray();
if (data != null && length > 0) {
- final byte[] hb = longdata;
- longdata = new byte[hb.length + length];
- System.arraycopy(hb, 0, longdata, 0, hb.length);
- System.arraycopy(data, offset, longdata, hb.length, length);
+ sb.append(CRLF).append(data, offset, length);
}
- return longdata;
+ return sb.toByteArray();
}
- private static void appendHeaders(Map<String, String> headers, ByteArrayBuilder sb) {
- for (Entry<String, String> header : headers.entrySet()) {
- if (!header.getKey().startsWith("$")) {
- sb.append(header.getKey()).append(COLSP).append(header.getValue()).append(CRLF);
- }
- }
- }
-
private static class ByteArrayBuilder {
private ByteArrayOutputStream baos;
public ByteArrayBuilder() {
baos = new ByteArrayOutputStream();
}
-
+
public ByteArrayBuilder append(byte[] b) {
try {
baos.write(b);
@@ -232,21 +229,35 @@ public final class WebSocketUtils {
}
return this;
}
-
+
+ public ByteArrayBuilder append(byte[] b, int offset, int length) {
+ baos.write(b, offset, length);
+ return this;
+ }
+
public ByteArrayBuilder append(String s) {
try {
- baos.write(s.getBytes());
+ baos.write(s.getBytes("utf-8"));
} catch (IOException e) {
// ignore
}
return this;
}
-
- public ByteArrayBuilder append(char c) {
+
+ public ByteArrayBuilder append(int c) {
baos.write(c);
return this;
}
-
+
+ public ByteArrayBuilder append(Map<String, String> map) {
+ for (Entry<String, String> m : map.entrySet()) {
+ if (!m.getKey().startsWith("$")) {
+ append(m.getKey()).append(COLSP).append(m.getValue()).append(CRLF);
+ }
+ }
+ return this;
+ }
+
public byte[] toByteArray() {
return baos.toByteArray();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
index 9109aed..137b74b 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
@@ -52,7 +52,7 @@ import javax.servlet.http.Part;
import org.apache.cxf.common.logging.LogUtils;
/**
- *
+ * @deprecated This class is only used by jetty, it has been moved to org.apache.cxf.transport.websocket.jetty
*/
public class WebSocketVirtualServletRequest implements HttpServletRequest {
private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletRequest.class);
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
index 149e377..026f31f 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
@@ -36,7 +36,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.cxf.common.logging.LogUtils;
/**
- *
+ * @deprecated This class is only used by jetty, it has been moved to org.apache.cxf.transport.websocket.jetty
*/
public class WebSocketVirtualServletResponse implements HttpServletResponse {
private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletResponse.class);
@@ -257,7 +257,6 @@ public class WebSocketVirtualServletResponse implements HttpServletResponse {
LOG.log(Level.FINE, "sendError({0}, {1})", new Object[]{sc, msg});
}
responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
- responseHeaders.put(WebSocketUtils.SM_KEY, msg);
}
@Override
@@ -304,7 +303,6 @@ public class WebSocketVirtualServletResponse implements HttpServletResponse {
LOG.log(Level.FINE, "setStatus({0}, {1})", new Object[]{sc, sm});
}
responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
- responseHeaders.put(WebSocketUtils.SM_KEY, sm);
}
private ServletOutputStream createOutputStream() {
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
index 38e6599..1cf1124 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
@@ -50,7 +50,8 @@ import org.atmosphere.websocket.WebSocketProcessor.WebSocketException;
import org.atmosphere.websocket.WebSocketProtocol;
/**
- *
+ * @deprecated No longer used as the protocol handling is done by Atmosphere's protocol intercepter
+ * such as org.apache.cxf.transport.websocket.atmosphere.DefaultProtocolInterceptor.
*/
public class AtmosphereWebSocketHandler implements WebSocketProtocol {
private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketHandler.class);
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
index c8e5fae..7aa4cd3 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
@@ -21,15 +21,18 @@ package org.apache.cxf.transport.websocket.atmosphere;
import java.io.IOException;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse;
import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.DestinationRegistry;
import org.apache.cxf.transport.servlet.ServletDestination;
@@ -37,16 +40,20 @@ import org.apache.cxf.transport.websocket.WebSocketDestinationService;
import org.apache.cxf.workqueue.WorkQueueManager;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereInterceptor;
import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
import org.atmosphere.util.Utils;
-import org.atmosphere.websocket.WebSocketProtocol;
/**
*
*/
public class AtmosphereWebSocketServletDestination extends ServletDestination implements
WebSocketDestinationService {
+ private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketServletDestination.class);
+
private AtmosphereFramework framework;
private Executor executor;
@@ -54,19 +61,14 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
String path) throws IOException {
super(bus, registry, ei, path);
this.framework = new AtmosphereFramework(false, true);
-
framework.setUseNativeImplementation(false);
+ framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
+ framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true");
framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
- //TODO provide a way to switch between the non-stream handler and the stream handler
- framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL,
- AtmosphereWebSocketHandler.class.getName());
+ framework.interceptor(getInterceptor(bus));
+ framework.addAtmosphereHandler("/", new DestinationHandler());
framework.init();
- WebSocketProtocol wsp = framework.getWebSocketProtocol();
- if (wsp instanceof AtmosphereWebSocketHandler) {
- ((AtmosphereWebSocketHandler)wsp).setDestination(this);
- }
-
// the executor for decoupling the service invocation from websocket's onMessage call which is
// synchronously blocked
executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
@@ -77,7 +79,7 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
HttpServletResponse resp) throws IOException {
if (Utils.webSocketEnabled(req)) {
try {
- framework.doCometSupport(AtmosphereRequest.wrap(new HttpServletRequestFilter(req)),
+ framework.doCometSupport(AtmosphereRequest.wrap(req),
AtmosphereResponse.wrap(resp));
} catch (ServletException e) {
throw new IOException(e);
@@ -96,20 +98,44 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
Executor getExecutor() {
return executor;
}
+
+ private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
+
+ @Override
+ public void onRequest(final AtmosphereResource resource) throws IOException {
+ LOG.fine("onRequest");
+ executeHandlerTask(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ invokeInternal(null,
+ resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse());
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to invoke service", e);
+ }
+ }
+ });
+ }
+ }
- private static class HttpServletRequestFilter extends HttpServletRequestWrapper {
- private static final String TRANSPORT_ADDRESS
- = "org.apache.cxf.transport.endpoint.address";
- private String transportAddress;
- public HttpServletRequestFilter(HttpServletRequest request) {
- super(request);
- transportAddress = (String)request.getAttribute(TRANSPORT_ADDRESS);
+ private void executeHandlerTask(Runnable r) {
+ try {
+ executor.execute(r);
+ } catch (RejectedExecutionException e) {
+ LOG.warning(
+ "Executor queue is full, run the service invocation task in caller thread."
+ + " Users can specify a larger executor queue to avoid this.");
+ r.run();
}
-
- @Override
- public Object getAttribute(String name) {
- return TRANSPORT_ADDRESS.equals(name) ? transportAddress : super.getAttribute(name);
+ }
+
+ //FIXME a temporary workaround until we decide how to customize atmosphere using cxf's destination configuration
+ private AtmosphereInterceptor getInterceptor(Bus bus) {
+ AtmosphereInterceptor ai = (AtmosphereInterceptor)bus.getProperty("atmosphere.interceptor");
+ if (ai == null) {
+ ai = new DefaultProtocolInterceptor();
}
-
+ LOG.info("AtmosphereInterceptor: " + ai);
+ return ai;
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
index 1f4cc00..ac14b0a 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
@@ -30,7 +30,8 @@ import org.atmosphere.websocket.WebSocket;
import org.atmosphere.websocket.WebSocketProtocolStream;
/**
- *
+ * @deprecated No longer used as the protocol handling is done by Atmosphere's protocol intercepter
+ * such as org.apache.cxf.transport.websocket.atmosphere.DefaultProtocolInterceptor.
*/
public class AtmosphereWebSocketStreamHandler extends AtmosphereWebSocketHandler implements
WebSocketProtocolStream {
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
new file mode 100644
index 0000000..7c4c6e5
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+import org.atmosphere.config.service.AtmosphereInterceptorService;
+import org.atmosphere.cpr.Action;
+import org.atmosphere.cpr.AsyncIOInterceptor;
+import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
+import org.atmosphere.cpr.AsyncIOWriter;
+import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereInterceptorAdapter;
+import org.atmosphere.cpr.AtmosphereInterceptorWriter;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.cpr.FrameworkConfig;
+
+/**
+ *
+ */
+@AtmosphereInterceptorService
+public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
+ private static final Logger LOG = LogUtils.getL7dLogger(DefaultProtocolInterceptor.class);
+
+ private static final String REQUEST_DISPATCHED = "request.dispatched";
+ private static final String RESPONSE_PARENT = "response.parent";
+
+ private final AsyncIOInterceptor interceptor = new Interceptor();
+
+ @Override
+ public Action inspect(final AtmosphereResource r) {
+ LOG.log(Level.FINE, "inspect");
+ AtmosphereRequest request = r.getRequest();
+
+ if (request.getAttribute(REQUEST_DISPATCHED) == null) {
+ AtmosphereResponse response = new WrappedAtmosphereResponse(r.getResponse(), request);
+
+ AtmosphereFramework framework = r.getAtmosphereConfig().framework();
+ try {
+ byte[] data = WebSocketUtils.readBody(request.getInputStream());
+ if (data.length == 0) {
+ return Action.CANCELLED;
+ }
+
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "inspecting data {0}", new String(data));
+ }
+ try {
+ AtmosphereRequest ar = createAtmosphereRequest(request, data);
+ ar.setAttribute(REQUEST_DISPATCHED, "true");
+ String refid = ar.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
+ if (refid != null) {
+ ar.setAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY, refid);
+ }
+ // This is a new request, we must clean the Websocket AtmosphereResource.
+ request.removeAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE);
+ response.request(ar);
+ attachWriter(r);
+
+ Action action = framework.doCometSupport(ar, response);
+ if (action.type() == Action.TYPE.SUSPEND) {
+ ar.destroyable(false);
+ response.destroyable(false);
+ }
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Error during request dispatching", e);
+ if (e instanceof InvalidPathException) {
+ response.setStatus(400);
+ } else {
+ response.setStatus(500);
+ }
+ response.getOutputStream().write(createResponse(response, null, true));
+ }
+ return Action.CANCELLED;
+ } catch (IOException e) {
+ LOG.log(Level.WARNING, "Error during protocol processing", e);
+ return Action.CONTINUE;
+ }
+ } else {
+ request.setAttribute(REQUEST_DISPATCHED, null);
+ request.setAttribute(RESPONSE_PARENT, null);
+ request.destroyable(false);
+ }
+ return Action.CONTINUE;
+ }
+
+ private void attachWriter(final AtmosphereResource r) {
+ AtmosphereResponse res = r.getResponse();
+ AsyncIOWriter writer = res.getAsyncIOWriter();
+
+ if (writer instanceof AtmosphereInterceptorWriter) {
+ //REVIST need a better way to add a custom filter at the first entry and not at the last as
+ // e.g. interceptor(AsyncIOInterceptor interceptor, int position)
+ Deque<AsyncIOInterceptor> filters = AtmosphereInterceptorWriter.class.cast(writer).filters();
+ if (!filters.contains(interceptor)) {
+ filters.addFirst(interceptor);
+ }
+ }
+ }
+
+ private static AtmosphereRequest createAtmosphereRequest(AtmosphereRequest r, byte[] data) throws IOException {
+ AtmosphereRequest.Builder b = new AtmosphereRequest.Builder();
+ ByteArrayInputStream in = new ByteArrayInputStream(data);
+ Map<String, String> hdrs = WebSocketUtils.readHeaders(in);
+ String path = hdrs.get(WebSocketUtils.URI_KEY);
+ String origin = r.getRequestURI();
+ if (!path.startsWith(origin)) {
+ LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin});
+ throw new InvalidPathException();
+ }
+
+ String requestURI = path;
+ String requestURL = r.getRequestURL() + requestURI.substring(r.getRequestURI().length());
+ String contentType = hdrs.get("Content-Type");
+
+ String method = hdrs.get(WebSocketUtils.METHOD_KEY);
+ b.pathInfo(path)
+ .contentType(contentType)
+ .headers(hdrs)
+ .method(method)
+ .requestURI(requestURI)
+ .requestURL(requestURL)
+ .request(r);
+ // add the body only if it is present
+ byte[] body = WebSocketUtils.readBody(in);
+ if (body.length > 0) {
+ b.body(body);
+ }
+ return b.build();
+ }
+
+ private final class Interceptor extends AsyncIOInterceptorAdapter {
+
+ @Override
+ public byte[] transformPayload(AtmosphereResponse response, byte[] responseDraft, byte[] data)
+ throws IOException {
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "transformPayload with draft={0}", new String(responseDraft));
+ }
+ AtmosphereRequest request = response.request();
+ if (request.getAttribute(RESPONSE_PARENT) == null) {
+ request.setAttribute(RESPONSE_PARENT, "true");
+ return createResponse(response, responseDraft, true);
+ } else {
+ return createResponse(response, responseDraft, false);
+ }
+ }
+
+ @Override
+ public byte[] error(AtmosphereResponse response, int statusCode, String reasonPhrase) {
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "status={0}", statusCode);
+ }
+ response.setStatus(statusCode, reasonPhrase);
+ return createResponse(response, null, true);
+ }
+ }
+
+ private static byte[] createResponse(AtmosphereResponse response, byte[] payload, boolean parent) {
+ AtmosphereRequest request = response.request();
+ String refid = (String)request.getAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
+
+ Map<String, String> headers = new HashMap<String, String>();
+ if (refid != null) {
+ response.addHeader(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
+ headers.put(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
+ }
+ if (parent) {
+ headers.put(WebSocketUtils.SC_KEY, Integer.toString(response.getStatus()));
+ if (payload != null && payload.length > 0) {
+ headers.put("Content-Type", response.getContentType());
+ }
+ }
+ return WebSocketUtils.buildResponse(headers, payload, 0, payload == null ? 0 : payload.length);
+ }
+
+ // a workaround to flush the header data upon close when no write operation occurs
+ private static class WrappedAtmosphereResponse extends AtmosphereResponse {
+ public WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req) {
+ super((HttpServletResponse)resp.getResponse(), resp.getAsyncIOWriter(), req, resp.isDestroyable());
+ }
+
+ @Override
+ public ServletOutputStream getOutputStream() throws IOException {
+ final ServletOutputStream delegate = super.getOutputStream();
+ return new ServletOutputStream() {
+ private boolean written;
+
+ @Override
+ public void write(int i) throws IOException {
+ written = true;
+ delegate.write(i);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!written) {
+ delegate.write(createResponse(WrappedAtmosphereResponse.this, null, true));
+ }
+ delegate.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ delegate.flush();
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ written = true;
+ delegate.write(b, off, len);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ written = true;
+ delegate.write(b);
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
index 6ae3c9f..f48efb5 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
@@ -41,9 +41,6 @@ import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.transport.websocket.InvalidPathException;
import org.apache.cxf.transport.websocket.WebSocketConstants;
-import org.apache.cxf.transport.websocket.WebSocketServletHolder;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse;
import org.eclipse.jetty.websocket.WebSocket;
class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessage {
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java
new file mode 100644
index 0000000..44eb7b8
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.jetty;
+
+import java.io.IOException;
+import java.security.Principal;
+import java.util.Enumeration;
+import java.util.Locale;
+
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletContext;
+
+/**
+ *
+ */
+interface WebSocketServletHolder {
+ String getAuthType();
+ String getContextPath();
+ String getLocalAddr();
+ String getLocalName();
+ int getLocalPort();
+ Locale getLocale();
+ Enumeration<Locale> getLocales();
+ String getProtocol();
+ String getRemoteAddr();
+ String getRemoteHost();
+ int getRemotePort();
+ String getRequestURI();
+ StringBuffer getRequestURL();
+ DispatcherType getDispatcherType();
+ boolean isSecure();
+ String getPathInfo();
+ String getPathTranslated();
+ String getScheme();
+ String getServerName();
+ String getServletPath();
+ ServletContext getServletContext();
+ int getServerPort();
+ Principal getUserPrincipal();
+ Object getAttribute(String name);
+ void write(byte[] data, int offset, int length) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java
new file mode 100644
index 0000000..e2b3c33
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.jetty;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.DispatcherType;
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.Part;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+
+/**
+ *
+ */
+class WebSocketVirtualServletRequest implements HttpServletRequest {
+ private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletRequest.class);
+
+ private WebSocketServletHolder webSocketHolder;
+ private InputStream in;
+ private Map<String, String> requestHeaders;
+ private Map<String, Object> attributes;
+
+ public WebSocketVirtualServletRequest(WebSocketServletHolder websocket, InputStream in)
+ throws IOException {
+ this.webSocketHolder = websocket;
+ this.in = in;
+
+ this.requestHeaders = WebSocketUtils.readHeaders(in);
+ String path = requestHeaders.get(WebSocketUtils.URI_KEY);
+ String origin = websocket.getRequestURI();
+ if (!path.startsWith(origin)) {
+ LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin});
+ throw new InvalidPathException();
+ }
+ this.attributes = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
+ Object v = websocket.getAttribute("org.apache.cxf.transport.endpoint.address");
+ if (v != null) {
+ attributes.put("org.apache.cxf.transport.endpoint.address", v);
+ }
+ }
+
+ @Override
+ public AsyncContext getAsyncContext() {
+ return null;
+ }
+
+ @Override
+ public Object getAttribute(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getAttribute({0}) -> {1}", new Object[] {name , attributes.get(name)});
+ }
+ return attributes.get(name);
+ }
+
+ @Override
+ public Enumeration<String> getAttributeNames() {
+ LOG.log(Level.FINE, "getAttributeNames()");
+ return Collections.enumeration(attributes.keySet());
+ }
+
+ @Override
+ public String getCharacterEncoding() {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getCharacterEncoding()");
+ return null;
+ }
+
+ @Override
+ public int getContentLength() {
+ LOG.log(Level.FINE, "getContentLength()");
+ return 0;
+ }
+
+ @Override
+ public String getContentType() {
+ LOG.log(Level.FINE, "getContentType()");
+ return requestHeaders.get("Content-Type");
+ }
+
+ @Override
+ public DispatcherType getDispatcherType() {
+ LOG.log(Level.FINE, "getDispatcherType()");
+ return webSocketHolder.getDispatcherType();
+ }
+
+ @Override
+ public ServletInputStream getInputStream() throws IOException {
+ return new ServletInputStream() {
+ @Override
+ public int read() throws IOException {
+ return in.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+ };
+ }
+
+ @Override
+ public String getLocalAddr() {
+ LOG.log(Level.FINE, "getLocalAddr()");
+ return webSocketHolder.getLocalAddr();
+ }
+
+ @Override
+ public String getLocalName() {
+ LOG.log(Level.FINE, "getLocalName()");
+ return webSocketHolder.getLocalName();
+ }
+
+ @Override
+ public int getLocalPort() {
+ LOG.log(Level.FINE, "getLocalPort()");
+ return webSocketHolder.getLocalPort();
+ }
+
+ @Override
+ public Locale getLocale() {
+ LOG.log(Level.FINE, "getLocale()");
+ return webSocketHolder.getLocale();
+ }
+
+ @Override
+ public Enumeration<Locale> getLocales() {
+ LOG.log(Level.FINE, "getLocales()");
+ return webSocketHolder.getLocales();
+ }
+
+ @Override
+ public String getParameter(String name) {
+ // TODO Auto-generated method stub
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getParameter({0})", name);
+ }
+ return null;
+ }
+
+ @Override
+ public Map<String, String[]> getParameterMap() {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getParameterMap()");
+ return null;
+ }
+
+ @Override
+ public Enumeration<String> getParameterNames() {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getParameterNames()");
+ return null;
+ }
+
+ @Override
+ public String[] getParameterValues(String name) {
+ // TODO Auto-generated method stub
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getParameterValues({0})", name);
+ }
+ return null;
+ }
+
+ @Override
+ public String getProtocol() {
+ LOG.log(Level.FINE, "getProtocol");
+ return webSocketHolder.getProtocol();
+ }
+
+ @Override
+ public BufferedReader getReader() throws IOException {
+ LOG.log(Level.FINE, "getReader");
+ return new BufferedReader(new InputStreamReader(in, "utf-8"));
+ }
+
+ @Override
+ public String getRealPath(String path) {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getRealPath");
+ return null;
+ }
+
+ @Override
+ public String getRemoteAddr() {
+ LOG.log(Level.FINE, "getRemoteAddr");
+ return webSocketHolder.getRemoteAddr();
+ }
+
+ @Override
+ public String getRemoteHost() {
+ LOG.log(Level.FINE, "getRemoteHost");
+ return webSocketHolder.getRemoteHost();
+ }
+
+ @Override
+ public int getRemotePort() {
+ LOG.log(Level.FINE, "getRemotePort");
+ return webSocketHolder.getRemotePort();
+ }
+
+ @Override
+ public RequestDispatcher getRequestDispatcher(String path) {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getRequestDispatcher");
+ return null;
+ }
+
+ @Override
+ public String getScheme() {
+ LOG.log(Level.FINE, "getScheme");
+ return webSocketHolder.getScheme();
+ }
+
+ @Override
+ public String getServerName() {
+ return webSocketHolder.getServerName();
+ }
+
+ @Override
+ public int getServerPort() {
+ LOG.log(Level.FINE, "getServerPort");
+ return webSocketHolder.getServerPort();
+ }
+
+ @Override
+ public ServletContext getServletContext() {
+ LOG.log(Level.FINE, "getServletContext");
+ return webSocketHolder.getServletContext();
+ }
+
+ @Override
+ public boolean isAsyncStarted() {
+ LOG.log(Level.FINE, "isAsyncStarted");
+ return false;
+ }
+
+ @Override
+ public boolean isAsyncSupported() {
+ LOG.log(Level.FINE, "isAsyncSupported");
+ return false;
+ }
+
+ @Override
+ public boolean isSecure() {
+ LOG.log(Level.FINE, "isSecure");
+ return webSocketHolder.isSecure();
+ }
+
+ @Override
+ public void removeAttribute(String name) {
+ LOG.log(Level.FINE, "removeAttribute");
+ attributes.remove(name);
+ }
+
+ @Override
+ public void setAttribute(String name, Object o) {
+ LOG.log(Level.FINE, "setAttribute");
+ attributes.put(name, o);
+ }
+
+ @Override
+ public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+ LOG.log(Level.FINE, "setCharacterEncoding");
+ // ignore as we stick to utf-8.
+ }
+
+ @Override
+ public AsyncContext startAsync() {
+ LOG.log(Level.FINE, "startAsync");
+ return null;
+ }
+
+ @Override
+ public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "startAsync");
+ return null;
+ }
+
+ @Override
+ public boolean authenticate(HttpServletResponse servletResponse) throws IOException, ServletException {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "authenticate");
+ return false;
+ }
+
+ @Override
+ public String getAuthType() {
+ LOG.log(Level.FINE, "getAuthType");
+ return webSocketHolder.getAuthType();
+ }
+
+ @Override
+ public String getContextPath() {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getContextPath -> " + webSocketHolder.getContextPath());
+ }
+ return webSocketHolder.getContextPath();
+ }
+
+ @Override
+ public Cookie[] getCookies() {
+ LOG.log(Level.FINE, "getCookies");
+ return null;
+ }
+
+ @Override
+ public long getDateHeader(String name) {
+ LOG.log(Level.FINE, "getDateHeader");
+ return 0;
+ }
+
+ @Override
+ public String getHeader(String name) {
+ LOG.log(Level.FINE, "getHeader");
+ return requestHeaders.get(name);
+ }
+
+ @Override
+ public Enumeration<String> getHeaderNames() {
+ LOG.log(Level.FINE, "getHeaderNames");
+ return Collections.enumeration(requestHeaders.keySet());
+ }
+
+ @Override
+ public Enumeration<String> getHeaders(String name) {
+ LOG.log(Level.FINE, "getHeaders");
+ // our protocol assumes no multiple headers
+ return Collections.enumeration(Arrays.asList(requestHeaders.get(name)));
+ }
+
+ @Override
+ public int getIntHeader(String name) {
+ LOG.log(Level.FINE, "getIntHeader");
+ String v = requestHeaders.get(name);
+ return v == null ? -1 : Integer.parseInt(v);
+ }
+
+ @Override
+ public String getMethod() {
+ LOG.log(Level.FINE, "getMethod");
+ return requestHeaders.get(WebSocketUtils.METHOD_KEY);
+ }
+
+ @Override
+ public Part getPart(String name) throws IOException, ServletException {
+ LOG.log(Level.FINE, "getPart");
+ return null;
+ }
+
+ @Override
+ public Collection<Part> getParts() throws IOException, ServletException {
+ LOG.log(Level.FINE, "getParts");
+ return null;
+ }
+
+ @Override
+ public String getPathInfo() {
+ String uri = requestHeaders.get(WebSocketUtils.URI_KEY);
+ String servletpath = webSocketHolder.getServletPath();
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getPathInfo " + servletpath + " " + uri);
+ }
+ //TODO remove the query string part
+ //REVISIT may cache this value in requstHeaders?
+ return uri.substring(servletpath.length());
+ }
+
+ @Override
+ public String getPathTranslated() {
+ String path = getPathInfo();
+ String opathtrans = webSocketHolder.getPathTranslated();
+ // some container may choose not to return this value
+ if (opathtrans == null) {
+ return null;
+ }
+ String opathinfo = webSocketHolder.getPathInfo();
+ LOG.log(Level.FINE, "getPathTranslated " + path + " " + opathinfo);
+ int pos = opathtrans.indexOf(opathinfo);
+ //REVISIT may cache this value in requstHeaders?
+ return new StringBuilder().append(opathtrans.substring(0, pos)).append(path).toString();
+ }
+
+ @Override
+ public String getQueryString() {
+ LOG.log(Level.FINE, "getQueryString");
+ return null;
+ }
+
+ @Override
+ public String getRemoteUser() {
+ LOG.log(Level.FINE, "getRemoteUser");
+ return null;
+ }
+
+ @Override
+ public String getRequestURI() {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getRequestURI " + requestHeaders.get(WebSocketUtils.URI_KEY));
+ }
+ return requestHeaders.get(WebSocketUtils.URI_KEY);
+ }
+
+ @Override
+ public StringBuffer getRequestURL() {
+ StringBuffer sb = webSocketHolder.getRequestURL();
+ String ouri = webSocketHolder.getRequestURI();
+ String uri = getRequestURI();
+ sb.append(uri.substring(ouri.length()));
+ LOG.log(Level.FINE, "getRequestURL " + uri);
+ return sb;
+ }
+
+ @Override
+ public String getRequestedSessionId() {
+ LOG.log(Level.FINE, "getRequestedSessionId");
+ return null;
+ }
+
+ @Override
+ public String getServletPath() {
+ LOG.log(Level.FINE, "getServletPath " + webSocketHolder.getServletPath());
+ return webSocketHolder.getServletPath();
+ }
+
+ @Override
+ public HttpSession getSession() {
+ LOG.log(Level.FINE, "getSession");
+ return null;
+ }
+
+ @Override
+ public HttpSession getSession(boolean create) {
+ LOG.log(Level.FINE, "getSession");
+ return null;
+ }
+
+ @Override
+ public Principal getUserPrincipal() {
+ LOG.log(Level.FINE, "getUserPrincipal");
+ return webSocketHolder.getUserPrincipal();
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromCookie() {
+ LOG.log(Level.FINE, "isRequestedSessionIdFromCookie");
+ return false;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromURL() {
+ LOG.log(Level.FINE, "isRequestedSessionIdFromURL");
+ return false;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromUrl() {
+ LOG.log(Level.FINE, "isRequestedSessionIdFromUrl");
+ return false;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdValid() {
+ LOG.log(Level.FINE, "isRequestedSessionIdValid");
+ return false;
+ }
+
+ @Override
+ public boolean isUserInRole(String role) {
+ LOG.log(Level.FINE, "isUserInRole");
+ return false;
+ }
+
+ @Override
+ public void login(String username, String password) throws ServletException {
+ LOG.log(Level.FINE, "login");
+
+ }
+
+ @Override
+ public void logout() throws ServletException {
+ LOG.log(Level.FINE, "logout");
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java
new file mode 100644
index 0000000..c736861
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.jetty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+
+/**
+ *
+ */
+class WebSocketVirtualServletResponse implements HttpServletResponse {
+ private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletResponse.class);
+ private WebSocketServletHolder webSocketHolder;
+ private Map<String, String> responseHeaders;
+ private ServletOutputStream outputStream;
+
+ public WebSocketVirtualServletResponse(WebSocketServletHolder websocket) {
+ this.webSocketHolder = websocket;
+ this.responseHeaders = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+ this.outputStream = createOutputStream();
+ }
+
+ @Override
+ public void flushBuffer() throws IOException {
+ LOG.log(Level.FINE, "flushBuffer()");
+ outputStream.flush();
+ }
+
+ @Override
+ public int getBufferSize() {
+ LOG.log(Level.FINE, "getBufferSize()");
+ return 0;
+ }
+
+ @Override
+ public String getCharacterEncoding() {
+ LOG.log(Level.FINE, "getCharacterEncoding()");
+ return null;
+ }
+
+ @Override
+ public String getContentType() {
+ LOG.log(Level.FINE, "getContentType()");
+ return responseHeaders.get("Content-Type");
+ }
+
+ @Override
+ public Locale getLocale() {
+ LOG.log(Level.FINE, "getLocale");
+ return null;
+ }
+
+ @Override
+ public ServletOutputStream getOutputStream() throws IOException {
+ return outputStream;
+ }
+
+ @Override
+ public PrintWriter getWriter() throws IOException {
+ LOG.log(Level.FINE, "getWriter()");
+ return new PrintWriter(getOutputStream());
+ }
+
+ @Override
+ public boolean isCommitted() {
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void resetBuffer() {
+ LOG.log(Level.FINE, "resetBuffer()");
+ }
+
+ @Override
+ public void setBufferSize(int size) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setBufferSize({0})", size);
+ }
+ }
+
+ @Override
+ public void setCharacterEncoding(String charset) {
+ // TODO
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setCharacterEncoding({0})", charset);
+ }
+ }
+
+ @Override
+ public void setContentLength(int len) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setContentLength({0})", len);
+ }
+ responseHeaders.put("Content-Length", Integer.toString(len));
+ }
+
+ @Override
+ public void setContentType(String type) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setContentType({0})", type);
+ }
+ responseHeaders.put("Content-Type", type);
+ }
+
+ @Override
+ public void setLocale(Locale loc) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setLocale({0})", loc);
+ }
+ }
+
+ @Override
+ public void addCookie(Cookie cookie) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addCookie({0})", cookie);
+ }
+ }
+
+ @Override
+ public void addDateHeader(String name, long date) {
+ // TODO
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addDateHeader({0}, {1})", new Object[]{name, date});
+ }
+ }
+
+ @Override
+ public void addHeader(String name, String value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addHeader({0}, {1})", new Object[]{name, value});
+ }
+ responseHeaders.put(name, value);
+ }
+
+ @Override
+ public void addIntHeader(String name, int value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addIntHeader({0}, {1})", new Object[]{name, value});
+ }
+ responseHeaders.put(name, Integer.toString(value));
+ }
+
+ @Override
+ public boolean containsHeader(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "containsHeader({0})", name);
+ }
+ return responseHeaders.containsKey(name);
+ }
+
+ @Override
+ public String encodeRedirectURL(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeRedirectURL({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String encodeRedirectUrl(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeRedirectUrl({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String encodeURL(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeURL({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String encodeUrl(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeUrl({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String getHeader(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getHeader({0})", name);
+ }
+ return null;
+ }
+
+ @Override
+ public Collection<String> getHeaderNames() {
+ LOG.log(Level.FINE, "getHeaderNames()");
+ return null;
+ }
+
+ @Override
+ public Collection<String> getHeaders(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getHeaders({0})", name);
+ }
+ return null;
+ }
+
+ @Override
+ public int getStatus() {
+ LOG.log(Level.FINE, "getStatus()");
+ String v = responseHeaders.get(WebSocketUtils.SC_KEY);
+ return v == null ? 200 : Integer.parseInt(v);
+ }
+
+ @Override
+ public void sendError(int sc) throws IOException {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "sendError{0}", sc);
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ }
+
+ @Override
+ public void sendError(int sc, String msg) throws IOException {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "sendError({0}, {1})", new Object[]{sc, msg});
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ }
+
+ @Override
+ public void sendRedirect(String location) throws IOException {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "sendRedirect({0})", location);
+ }
+ }
+
+ @Override
+ public void setDateHeader(String name, long date) {
+ // ignore
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setDateHeader({0}, {1})", new Object[]{name, date});
+ }
+ }
+
+ @Override
+ public void setHeader(String name, String value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setHeader({0}, {1})", new Object[]{name, value});
+ }
+ responseHeaders.put(name, value);
+ }
+
+ @Override
+ public void setIntHeader(String name, int value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setIntHeader({0}, {1})", new Object[]{name, value});
+ }
+ }
+
+ @Override
+ public void setStatus(int sc) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setStatus({0})", sc);
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ }
+
+ @Override
+ public void setStatus(int sc, String sm) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setStatus({0}, {1})", new Object[]{sc, sm});
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ }
+
+ private ServletOutputStream createOutputStream() {
+ //REVISIT
+ // This output buffering is needed as the server side websocket does
+ // not support the fragment transmission mode when sending back a large data.
+ // And this buffering is only used for the response for the initial service innovation.
+ // For the subsequently pushed data to the socket are sent back
+ // unbuffered as individual websocket messages.
+ // the things to consider :
+ // - provide a size limit if we are use this buffering
+ // - add a chunking mode in the cxf websocket's binding.
+ return new ServletOutputStream() {
+ private InternalByteArrayOutputStream buffer = new InternalByteArrayOutputStream();
+
+ @Override
+ public void write(int b) throws IOException {
+ byte[] data = new byte[1];
+ data[0] = (byte)b;
+ write(data, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] data) throws IOException {
+ write(data, 0, data.length);
+ }
+
+ @Override
+ public void write(byte[] data, int offset, int length) throws IOException {
+ if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+ // buffer the data until it gets flushed for the first time
+ buffer.write(data, offset, length);
+ } else {
+ // unbuffered write to the socket
+ String respid = responseHeaders.get(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY);
+ byte[] headers = respid != null
+ ? WebSocketUtils.buildHeaderLine(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, respid) : null;
+ data = WebSocketUtils.buildResponse(headers, data, offset, length);
+ webSocketHolder.write(data, 0, data.length);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+ byte[] data = WebSocketUtils.buildResponse(responseHeaders, buffer.getBytes(), 0, buffer.size());
+ webSocketHolder.write(data, 0, data.length);
+ responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
+ }
+ super.close();
+ }
+ };
+ }
+
+ private static class InternalByteArrayOutputStream extends ByteArrayOutputStream {
+ public byte[] getBytes() {
+ return buf;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/08fc8aad/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
index 09e61c2..9795e1c 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
@@ -300,7 +300,15 @@ class WebSocketTestClient {
}
private int length(Object o) {
- return o instanceof char[] ? ((String)o).length() : (o instanceof byte[] ? ((byte[])o).length : 0);
+ if (o instanceof String) {
+ return ((String)o).length();
+ } else if (o instanceof char[]) {
+ return ((char[])o).length;
+ } else if (o instanceof byte[]) {
+ return ((byte[])o).length;
+ } else {
+ return 0;
+ }
}
private int getchar(Object o, int p) {