You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ff...@apache.org on 2017/04/21 02:04:43 UTC
[2/8] cxf git commit: add websocket undertow transport
add websocket undertow transport
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6755b06c
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6755b06c
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6755b06c
Branch: refs/heads/master
Commit: 6755b06ca3eede0132d17633783800f3e3f07484
Parents: 156fd30
Author: Freeman Fang <fr...@gmail.com>
Authored: Thu Apr 13 14:04:55 2017 +0800
Committer: Freeman Fang <fr...@gmail.com>
Committed: Thu Apr 13 14:04:55 2017 +0800
----------------------------------------------------------------------
.../http_undertow/UndertowHTTPDestination.java | 7 +-
.../http_undertow/UndertowHTTPHandler.java | 6 +
rt/transports/websocket/pom.xml | 6 +
.../websocket/WebSocketDestinationFactory.java | 87 ++-
.../AtmosphereWebSocketUndertowDestination.java | 302 ++++++++++
.../undertow/ByteBufferInputStream.java | 51 ++
.../WebSocketUndertowServletRequest.java | 589 +++++++++++++++++++
.../WebSocketUndertowServletResponse.java | 392 ++++++++++++
systests/transport-undertow/pom.xml | 61 ++
.../systest/http_undertow/websocket/Book.java | 123 ++++
.../websocket/BookNotFoundDetails.java | 36 ++
.../websocket/BookNotFoundFault.java | 41 ++
.../websocket/BookServerWebSocket.java | 83 +++
.../websocket/BookStorePerRequest.java | 129 ++++
.../websocket/BookStoreWebSocket.java | 186 ++++++
.../http_undertow/websocket/Chapter.java | 106 ++++
.../JAXRSClientServerWebSocketTest.java | 482 +++++++++++++++
.../JAXRSClientServerWebSocketTest.java.bak | 438 ++++++++++++++
.../http_undertow/websocket/SuperBook.java | 45 ++
.../websocket/SuperBookInterface.java | 23 +
.../websocket/WebSocketTestClient.java | 329 +++++++++++
21 files changed, 3498 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java b/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java
index 81a4e59..b96c1b9 100644
--- a/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java
+++ b/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPDestination.java
@@ -80,9 +80,6 @@ public class UndertowHTTPDestination extends ServletDestination {
//Add the default port if the address is missing it
super(bus, registry, ei, getAddressValue(ei, true).getAddress(), true);
this.serverEngineFactory = serverEngineFactory;
- if (serverEngineFactory != null) {
- nurl = new URL(getAddress(endpointInfo));
- }
loader = bus.getExtension(ClassLoader.class);
}
@@ -101,6 +98,8 @@ public class UndertowHTTPDestination extends ServletDestination {
IOException {
if (serverEngineFactory == null) {
return;
+ } else {
+ nurl = new URL(getAddress(endpointInfo));
}
engine =
serverEngineFactory.retrieveUndertowHTTPServerEngine(nurl.getPort());
@@ -239,7 +238,7 @@ public class UndertowHTTPDestination extends ServletDestination {
}
- protected final String getAddress(EndpointInfo endpointInfo) {
+ protected String getAddress(EndpointInfo endpointInfo) {
return endpointInfo.getAddress();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java b/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java
index 2c1914a..ff05c97 100644
--- a/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java
+++ b/rt/transports/http-undertow/src/main/java/org/apache/cxf/transport/http_undertow/UndertowHTTPHandler.java
@@ -66,6 +66,10 @@ public class UndertowHTTPHandler implements HttpHandler {
}
}
+ public ServletContext getServletContext() {
+ return this.servletContext;
+ }
+
public void setName(String name) {
urlName = name;
}
@@ -87,6 +91,8 @@ public class UndertowHTTPHandler implements HttpHandler {
undertowExchange.dispatch(this);
return;
}
+
+
HttpServletResponseImpl response = new HttpServletResponseImpl(undertowExchange,
(ServletContextImpl)servletContext);
HttpServletRequestImpl request = new HttpServletRequestImpl(undertowExchange,
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/pom.xml
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/pom.xml b/rt/transports/websocket/pom.xml
index e14afb1..e10afe7 100644
--- a/rt/transports/websocket/pom.xml
+++ b/rt/transports/websocket/pom.xml
@@ -94,6 +94,12 @@
<optional>true</optional>
</dependency>
<dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-undertow</artifactId>
+ <version>${project.version}</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<optional>true</optional>
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
index 686b89a..bbd6f5a 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketDestinationFactory.java
@@ -32,19 +32,25 @@ import org.apache.cxf.transport.http.DestinationRegistry;
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.apache.cxf.transport.http.HttpDestinationFactory;
import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory;
+import org.apache.cxf.transport.http_undertow.UndertowHTTPServerEngineFactory;
import org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketServletDestination;
//import org.apache.cxf.transport.websocket.jetty.JettyWebSocketServletDestination;
@NoJSR250Annotations()
public class WebSocketDestinationFactory implements HttpDestinationFactory {
private static final boolean ATMOSPHERE_AVAILABLE = probeClass("org.atmosphere.cpr.ApplicationConfig");
- private static final Constructor<?> JETTY9_WEBSOCKET_DESTINATION_CTR =
+ private static final boolean JETTY_AVAILABLE = probeClass("org.eclipse.jetty.server.Server");
+ private static final boolean UNDERTOW_AVAILABLE = probeClass("io.undertow.websockets.core.WebSockets");
+ private static final Constructor<?> JETTY9_WEBSOCKET_DESTINATION_CTR =
probeConstructor("org.apache.cxf.transport.websocket.jetty9.Jetty9WebSocketDestination");
- private static final Constructor<?> ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR =
+ private static final Constructor<?> ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR =
probeConstructor("org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketJettyDestination");
+ private static final Constructor<?> ATMOSPHERE_WEBSOCKET_UNDERTOW_DESTINATION_CTR =
+ probeUndertowConstructor(
+ "org.apache.cxf.transport.websocket.atmosphere.AtmosphereWebSocketUndertowDestination");
- private final boolean atmosphereDisabled = Boolean.valueOf(
- SystemPropertyAction.getPropertyOrNull("org.apache.cxf.transport.websocket.atmosphere.disabled"));
+ private final boolean atmosphereDisabled = Boolean.valueOf(SystemPropertyAction
+ .getPropertyOrNull("org.apache.cxf.transport.websocket.atmosphere.disabled"));
private static boolean probeClass(String name) {
try {
@@ -58,8 +64,18 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory {
private static Constructor<?> probeConstructor(String name) {
try {
Class<?> clz = Class.forName(name, true, WebSocketDestinationFactory.class.getClassLoader());
- return clz.getConstructor(Bus.class, DestinationRegistry.class,
- EndpointInfo.class, JettyHTTPServerEngineFactory.class);
+ return clz.getConstructor(Bus.class, DestinationRegistry.class, EndpointInfo.class,
+ JettyHTTPServerEngineFactory.class);
+ } catch (Throwable t) {
+ return null;
+ }
+ }
+
+ private static Constructor<?> probeUndertowConstructor(String name) {
+ try {
+ Class<?> clz = Class.forName(name, true, WebSocketDestinationFactory.class.getClassLoader());
+ return clz.getConstructor(Bus.class, DestinationRegistry.class, EndpointInfo.class,
+ UndertowHTTPServerEngineFactory.class);
} catch (Throwable t) {
return null;
}
@@ -68,30 +84,44 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory {
public AbstractHTTPDestination createDestination(EndpointInfo endpointInfo, Bus bus,
DestinationRegistry registry) throws IOException {
if (endpointInfo.getAddress().startsWith("ws")) {
- // for the embedded mode, we stick to jetty
- JettyHTTPServerEngineFactory serverEngineFactory = bus
- .getExtension(JettyHTTPServerEngineFactory.class);
+
if (ATMOSPHERE_AVAILABLE && !atmosphereDisabled) {
// use atmosphere if available
- return createJettyHTTPDestination(ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR,
- bus, registry, endpointInfo, serverEngineFactory);
+ if (JETTY_AVAILABLE) {
+ // for the embedded mode, we stick to jetty
+ JettyHTTPServerEngineFactory serverEngineFactory = bus
+ .getExtension(JettyHTTPServerEngineFactory.class);
+ return createJettyHTTPDestination(ATMOSPHERE_WEBSOCKET_JETTY_DESTINATION_CTR, bus,
+ registry, endpointInfo, serverEngineFactory);
+ } else if (UNDERTOW_AVAILABLE) {
+ // use AtmosphereWebSocketUndertowDestination
+ UndertowHTTPServerEngineFactory undertowServerEngineFactory = bus
+ .getExtension(UndertowHTTPServerEngineFactory.class);
+ return createUndertowHTTPDestination(ATMOSPHERE_WEBSOCKET_UNDERTOW_DESTINATION_CTR, bus,
+ registry, endpointInfo, undertowServerEngineFactory);
+ }
+ return null;
} else {
- return createJettyHTTPDestination(JETTY9_WEBSOCKET_DESTINATION_CTR,
- bus, registry, endpointInfo, serverEngineFactory);
+ // for the embedded mode, we stick to jetty
+ JettyHTTPServerEngineFactory serverEngineFactory = bus
+ .getExtension(JettyHTTPServerEngineFactory.class);
+ return createJettyHTTPDestination(JETTY9_WEBSOCKET_DESTINATION_CTR, bus, registry,
+ endpointInfo, serverEngineFactory);
}
} else {
- //REVISIT other way of getting the registry of http so that the plain cxf servlet finds the destination?
+ // REVISIT other way of getting the registry of http so that the plain cxf servlet finds the
+ // destination?
registry = getDestinationRegistry(bus);
// choose atmosphere if available, otherwise assume jetty is available
if (ATMOSPHERE_AVAILABLE && !atmosphereDisabled) {
// use atmosphere if available
- return new AtmosphereWebSocketServletDestination(bus, registry,
- endpointInfo, endpointInfo.getAddress());
+ return new AtmosphereWebSocketServletDestination(bus, registry, endpointInfo,
+ endpointInfo.getAddress());
} else {
// use jetty-websocket
- return createJettyHTTPDestination(JETTY9_WEBSOCKET_DESTINATION_CTR,
- bus, registry, endpointInfo, null);
+ return createJettyHTTPDestination(JETTY9_WEBSOCKET_DESTINATION_CTR, bus, registry,
+ endpointInfo, null);
}
}
}
@@ -113,10 +143,27 @@ public class WebSocketDestinationFactory implements HttpDestinationFactory {
private AbstractHTTPDestination createJettyHTTPDestination(Constructor<?> ctr, Bus bus,
DestinationRegistry registry, EndpointInfo ei,
- JettyHTTPServerEngineFactory jhsef) throws IOException {
+ JettyHTTPServerEngineFactory jhsef)
+ throws IOException {
+ if (ctr != null) {
+ try {
+ return (AbstractHTTPDestination)ctr.newInstance(bus, registry, ei, jhsef);
+ } catch (Throwable t) {
+ // log
+ t.printStackTrace();
+ }
+ }
+ return null;
+ }
+
+ private AbstractHTTPDestination createUndertowHTTPDestination(Constructor<?> ctr, Bus bus,
+ DestinationRegistry registry,
+ EndpointInfo ei,
+ UndertowHTTPServerEngineFactory jhsef)
+ throws IOException {
if (ctr != null) {
try {
- return (AbstractHTTPDestination) ctr.newInstance(bus, registry, ei, jhsef);
+ return (AbstractHTTPDestination)ctr.newInstance(bus, registry, ei, jhsef);
} catch (Throwable t) {
// log
t.printStackTrace();
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketUndertowDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketUndertowDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketUndertowDestination.java
new file mode 100644
index 0000000..4d1f427
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketUndertowDestination.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.DestinationRegistry;
+import org.apache.cxf.transport.http_undertow.UndertowHTTPDestination;
+import org.apache.cxf.transport.http_undertow.UndertowHTTPHandler;
+import org.apache.cxf.transport.http_undertow.UndertowHTTPServerEngineFactory;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
+import org.apache.cxf.transport.websocket.WebSocketDestinationService;
+import org.apache.cxf.transport.websocket.undertow.WebSocketUndertowServletRequest;
+import org.apache.cxf.transport.websocket.undertow.WebSocketUndertowServletResponse;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.atmosphere.cpr.ApplicationConfig;
+import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereRequestImpl;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResponseImpl;
+import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
+import org.xnio.StreamConnection;
+
+import io.undertow.server.HttpServerExchange;
+import io.undertow.server.HttpUpgradeListener;
+import io.undertow.servlet.handlers.ServletRequestContext;
+import io.undertow.servlet.spec.HttpServletRequestImpl;
+import io.undertow.servlet.spec.HttpServletResponseImpl;
+import io.undertow.servlet.spec.ServletContextImpl;
+import io.undertow.util.Methods;
+import io.undertow.websockets.core.AbstractReceiveListener;
+import io.undertow.websockets.core.BufferedBinaryMessage;
+import io.undertow.websockets.core.BufferedTextMessage;
+import io.undertow.websockets.core.WebSocketChannel;
+import io.undertow.websockets.core.protocol.Handshake;
+import io.undertow.websockets.core.protocol.version07.Hybi07Handshake;
+import io.undertow.websockets.core.protocol.version08.Hybi08Handshake;
+import io.undertow.websockets.core.protocol.version13.Hybi13Handshake;
+import io.undertow.websockets.spi.AsyncWebSocketHttpServerExchange;
+
+/**
+ * Undertow-hosted destination that serves plain HTTP requests and websocket messages
+ * by routing both through an embedded {@link AtmosphereFramework}.
+ */
+public class AtmosphereWebSocketUndertowDestination extends UndertowHTTPDestination
+    implements WebSocketDestinationService {
+    private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketUndertowDestination.class);
+    private final Executor executor;
+    private AtmosphereFramework framework;
+
+    /** Initialises Atmosphere with a {@code DestinationHandler} mapped at "/" and picks the message executor. */
+    public AtmosphereWebSocketUndertowDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei,
+                                                  UndertowHTTPServerEngineFactory serverEngineFactory)
+        throws IOException {
+        super(bus, registry, ei, serverEngineFactory);
+        framework = new AtmosphereFramework(false, true);
+        framework.setUseNativeImplementation(false);
+        framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
+        framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true");
+        framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
+        framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL_EXECUTION, "true");
+        // workaround for atmosphere's jsr356 initialization requiring servletConfig
+        framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPRESS_JSR356, "true");
+        AtmosphereUtils.addInterceptors(framework, bus);
+        framework.addAtmosphereHandler("/", new DestinationHandler());
+        framework.init();
+        executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
+    }
+
+    /** Delegates to the superclass {@code invoke} with the given servlet objects. */
+    @Override
+    public void invokeInternal(ServletConfig config, ServletContext context, HttpServletRequest req,
+                               HttpServletResponse resp) throws IOException {
+        super.invoke(config, context, req, resp);
+    }
+
+    /** Returns the endpoint address with a leading "ws" replaced by "http" (so "wss" becomes "https"). */
+    private static String getNonWSAddress(EndpointInfo endpointInfo) {
+        String address = endpointInfo.getAddress();
+        if (address.startsWith("ws")) {
+            address = "http" + address.substring(2);
+        }
+        return address;
+    }
+
+    @Override
+    protected String getAddress(EndpointInfo endpointInfo) {
+        return getNonWSAddress(endpointInfo);
+    }
+
+    /** Returns the path of the http(s) form of the address, or "" when the endpoint has no address. */
+    @Override
+    protected String getBasePath(String contextPath) throws IOException {
+        if (StringUtils.isEmpty(endpointInfo.getAddress())) {
+            return "";
+        }
+        return new URL(getAddress(endpointInfo)).getPath();
+    }
+
+    @Override
+    protected UndertowHTTPHandler createUndertowHTTPHandler(UndertowHTTPDestination jhd, boolean cmExact) {
+        return new AtmosphereUndertowWebSocketHandler(jhd, cmExact);
+    }
+
+    /** Destroys the Atmosphere framework on a best-effort basis, then always shuts down the superclass. */
+    @Override
+    public void shutdown() {
+        try {
+            framework.destroy();
+        } catch (Exception e) {
+            // best effort: a failed teardown must not prevent super.shutdown(), but leave a trace
+            LOG.log(Level.FINE, "Failed to destroy the Atmosphere framework", e);
+        } finally {
+            super.shutdown();
+        }
+    }
+
+    /**
+     * Undertow handler that upgrades websocket handshake requests and passes everything
+     * else, as well as each received websocket message, to the Atmosphere framework.
+     */
+    private class AtmosphereUndertowWebSocketHandler extends UndertowHTTPHandler {
+        private final Set<Handshake> handshakes;
+        private final Set<WebSocketChannel> peerConnections = Collections
+            .newSetFromMap(new ConcurrentHashMap<WebSocketChannel, Boolean>());
+
+        AtmosphereUndertowWebSocketHandler(UndertowHTTPDestination jhd, boolean cmExact) {
+            super(jhd, cmExact);
+            handshakes = new HashSet<>();
+            handshakes.add(new Hybi13Handshake());
+            handshakes.add(new Hybi08Handshake());
+            handshakes.add(new Hybi07Handshake());
+        }
+
+        /**
+         * Re-dispatches when called on the IO thread; otherwise upgrades GET requests that match
+         * a known handshake to websocket and handles every other exchange as a normal request.
+         */
+        @Override
+        public void handleRequest(HttpServerExchange undertowExchange) throws Exception {
+            if (undertowExchange.isInIoThread()) {
+                undertowExchange.dispatch(this);
+                return;
+            }
+            if (!undertowExchange.getRequestMethod().equals(Methods.GET)) {
+                // Only GET is supported to start the handshake
+                handleNormalRequest(undertowExchange);
+                return;
+            }
+            final AsyncWebSocketHttpServerExchange facade = new AsyncWebSocketHttpServerExchange(undertowExchange,
+                                                                                                 peerConnections);
+            Handshake handshaker = null;
+            for (Handshake method : handshakes) {
+                if (method.matches(facade)) {
+                    handshaker = method;
+                    break;
+                }
+            }
+
+            if (handshaker == null) {
+                handleNormalRequest(undertowExchange);
+            } else {
+                final Handshake selected = handshaker;
+                undertowExchange.upgradeChannel(new HttpUpgradeListener() {
+                    @Override
+                    public void handleUpgrade(StreamConnection streamConnection,
+                                              HttpServerExchange exchange) {
+                        try {
+                            WebSocketChannel channel = selected.createChannel(facade, streamConnection,
+                                                                              facade.getBufferPool());
+                            peerConnections.add(channel);
+                            channel.getReceiveSetter().set(new AbstractReceiveListener() {
+                                @Override
+                                protected void onFullTextMessage(WebSocketChannel channel,
+                                                                 BufferedTextMessage message) {
+                                    handleReceivedMessage(channel, message);
+                                }
+
+                                @Override
+                                protected void onFullBinaryMessage(WebSocketChannel channel,
+                                                                   BufferedBinaryMessage message)
+                                    throws IOException {
+                                    handleReceivedMessage(channel, message);
+                                }
+                            });
+                            channel.resumeReceives();
+                        } catch (Exception e) {
+                            LOG.log(Level.WARNING, "Failed to set up the websocket channel", e);
+                        }
+                    }
+                });
+                handshaker.handshake(facade);
+            }
+        }
+
+        /** Wraps the exchange in Undertow servlet request/response objects and hands them to Atmosphere. */
+        public void handleNormalRequest(HttpServerExchange undertowExchange) throws Exception {
+            HttpServletResponseImpl response = new HttpServletResponseImpl(undertowExchange,
+                                                                           (ServletContextImpl)servletContext);
+            HttpServletRequestImpl request = new HttpServletRequestImpl(undertowExchange,
+                                                                        (ServletContextImpl)servletContext);
+            ServletRequestContext servletRequestContext = new ServletRequestContext(((ServletContextImpl)servletContext)
+                .getDeployment(), request, response, null);
+
+            undertowExchange.putAttachment(ServletRequestContext.ATTACHMENT_KEY, servletRequestContext);
+            handleNormalRequest(request, response);
+        }
+
+        /**
+         * Runs the request through the Atmosphere framework.
+         *
+         * @throws IOException wrapping any ServletException raised by the framework
+         */
+        public void handleNormalRequest(HttpServletRequest request, HttpServletResponse response)
+            throws Exception {
+            try {
+                framework.doCometSupport(AtmosphereRequestImpl.wrap(request),
+                                         AtmosphereResponseImpl.wrap(response));
+            } catch (ServletException e) {
+                throw new IOException(e);
+            }
+        }
+
+        /**
+         * Processes one websocket message on the executor: wraps it as a servlet request, copies
+         * the request id header (when present) to the response id header, and dispatches it.
+         */
+        private void handleReceivedMessage(WebSocketChannel channel, Object message) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        HttpServletRequest request = new WebSocketUndertowServletRequest(channel, message);
+                        HttpServletResponse response = new WebSocketUndertowServletResponse(channel);
+                        if (request.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY) != null) {
+                            response.setHeader(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY,
+                                               request.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY));
+                        }
+                        handleNormalRequest(request, response);
+                    } catch (Exception ex) {
+                        LOG.log(Level.WARNING, "Failed to process the websocket message", ex);
+                    }
+                }
+            });
+        }
+    }
+
+    /** Atmosphere handler that forwards every request to {@link #invokeInternal}. */
+    private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
+
+        @Override
+        public void onRequest(final AtmosphereResource resource) throws IOException {
+            LOG.fine("onRequest");
+            try {
+                invokeInternal(null, resource.getRequest().getServletContext(), resource.getRequest(),
+                               resource.getResponse());
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Failed to invoke service", e);
+            }
+        }
+    }
+
+    // used for internal tests
+    AtmosphereFramework getAtmosphereFramework() {
+        return framework;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/ByteBufferInputStream.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/ByteBufferInputStream.java
new file mode 100644
index 0000000..f32326f
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/ByteBufferInputStream.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.websocket.undertow;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/** {@link InputStream} over a {@link ByteBuffer}; every read advances the buffer's position. */
+public class ByteBufferInputStream extends InputStream {
+
+    ByteBuffer buf;
+
+    public ByteBufferInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    /** Returns the next byte as a value in 0-255, or -1 once the buffer has nothing left. */
+    @Override
+    public int read() throws IOException {
+        return buf.hasRemaining() ? buf.get() & 0xFF : -1;
+    }
+
+    /** Copies up to {@code len} bytes into {@code bytes} at {@code off}; returns the count, or -1 at end. */
+    @Override
+    public int read(byte[] bytes, int off, int len) throws IOException {
+        // InputStream contract: a zero-length request returns 0, even when the buffer is exhausted
+        if (len > 0 && !buf.hasRemaining()) {
+            return -1;
+        }
+        len = Math.min(len, buf.remaining());
+        buf.get(bytes, off, len);
+        return len;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletRequest.java
new file mode 100644
index 0000000..0b1d884
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletRequest.java
@@ -0,0 +1,589 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.undertow;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.Principal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.DispatcherType;
+import javax.servlet.ReadListener;
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpUpgradeHandler;
+import javax.servlet.http.Part;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+
+import io.undertow.websockets.core.BufferedBinaryMessage;
+import io.undertow.websockets.core.BufferedTextMessage;
+import io.undertow.websockets.core.WebSocketChannel;
+
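+/**
+ * An {@link HttpServletRequest} view of a single websocket message received on an Undertow
+ * {@link WebSocketChannel}. The request headers are parsed from the start of the message payload
+ * by {@code WebSocketUtils.readHeaders}; whatever that call leaves unread is exposed as the
+ * request body. Most other servlet request features are not supported and return null or a
+ * fixed default value.
+ */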
+public class WebSocketUndertowServletRequest implements HttpServletRequest {
+ private static final Logger LOG = LogUtils.getL7dLogger(WebSocketUndertowServletRequest.class);
+
+ private WebSocketChannel channel;
+ private Map<String, String> requestHeaders;
+ private Map<String, Object> attributes;
+ private InputStream in;
+
+ /**
+  * Wraps one buffered websocket message as a servlet request.
+  *
+  * @param channel the websocket channel the message arrived on
+  * @param message the received message; a {@link BufferedBinaryMessage} or a
+  *        {@link BufferedTextMessage}
+  * @throws IOException if the payload cannot be encoded or the headers cannot be read
+  * @throws IllegalArgumentException if {@code message} is of any other type (or null)
+  */
+ public WebSocketUndertowServletRequest(WebSocketChannel channel, Object message)
+     throws IOException {
+     this.channel = channel;
+     if (message instanceof BufferedBinaryMessage) {
+         // NOTE(review): only the first pooled buffer is read here; presumably the payload
+         // always fits into a single buffer - confirm for large binary messages.
+         in = new ByteBufferInputStream(((BufferedBinaryMessage)message).getData().getResource()[0]);
+     } else if (message instanceof BufferedTextMessage) {
+         // encode explicitly as utf-8 (getReader() decodes with utf-8) instead of relying on
+         // the platform default charset
+         in = new ByteArrayInputStream(((BufferedTextMessage)message).getData().getBytes("utf-8"));
+     } else {
+         // fail with a clear message instead of passing a null stream to readHeaders
+         throw new IllegalArgumentException("Unsupported websocket message type: "
+             + (message == null ? "null" : message.getClass().getName()));
+     }
+     this.requestHeaders = WebSocketUtils.readHeaders(in);
+     /*String path = requestHeaders.get(WebSocketUtils.URI_KEY);
+     String origin = channel.getUrl();
+     path = path.substring(0, path.length() - 10);
+     if (!path.startsWith(origin)) {
+     throw new InvalidPathException();
+     }*/
+     this.attributes = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
+     // propagate the endpoint address stored on the channel, if any, as a request attribute
+     Object v = channel.getAttribute("org.apache.cxf.transport.endpoint.address");
+     if (v != null) {
+         attributes.put("org.apache.cxf.transport.endpoint.address", v);
+     }
+ }
+
+ @Override
+ public AsyncContext getAsyncContext() {
+ return null;
+ }
+
+ @Override
+ public Object getAttribute(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getAttribute({0}) -> {1}", new Object[] {name, attributes.get(name)});
+ }
+ return attributes.get(name);
+ }
+
+ @Override
+ public Enumeration<String> getAttributeNames() {
+ LOG.log(Level.FINE, "getAttributeNames()");
+ return Collections.enumeration(attributes.keySet());
+ }
+
+ @Override
+ public String getCharacterEncoding() {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getCharacterEncoding()");
+ return null;
+ }
+
+ @Override
+ public int getContentLength() {
+ LOG.log(Level.FINE, "getContentLength()");
+ return 0;
+ }
+
+ @Override
+ public String getContentType() {
+ LOG.log(Level.FINE, "getContentType()");
+ return requestHeaders.get("Content-Type");
+ }
+
+ @Override
+ public DispatcherType getDispatcherType() {
+ LOG.log(Level.FINE, "getDispatcherType()");
+ return null;
+ }
+
+ @Override
+ public ServletInputStream getInputStream() throws IOException {
+ return new ServletInputStream() {
+ @Override
+ public int read() throws IOException {
+ return in.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+
+ @Override
+ public boolean isFinished() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isReady() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setReadListener(ReadListener arg0) {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+  * Returns the host part of the channel URL.
+  *
+  * @return the host of {@code channel.getUrl()}, or null if the URL is missing or invalid
+  */
+ @Override
+ public String getLocalAddr() {
+     LOG.log(Level.FINE, "getLocalAddr()");
+     String url = channel.getUrl();
+     if (url == null) {
+         return null;
+     }
+     try {
+         // java.net.URL has no built-in handler for websocket schemes (presumably ws/wss here)
+         // and rejects them with MalformedURLException; java.net.URI needs no handler.
+         return java.net.URI.create(url).getHost();
+     } catch (IllegalArgumentException e) {
+         LOG.log(Level.WARNING, "Invalid websocket channel URL: " + url, e);
+         return null;
+     }
+ }
+
+ /**
+  * Returns the host name taken from the channel URL.
+  *
+  * @return the host of {@code channel.getUrl()}, or null if the URL is missing or invalid
+  */
+ @Override
+ public String getLocalName() {
+     LOG.log(Level.FINE, "getLocalName()");
+     String url = channel.getUrl();
+     if (url == null) {
+         return null;
+     }
+     try {
+         // java.net.URL has no built-in handler for websocket schemes (presumably ws/wss here)
+         // and rejects them with MalformedURLException; java.net.URI needs no handler.
+         return java.net.URI.create(url).getHost();
+     } catch (IllegalArgumentException e) {
+         LOG.log(Level.WARNING, "Invalid websocket channel URL: " + url, e);
+         return null;
+     }
+ }
+
+ /**
+  * Returns the port taken from the channel URL.
+  *
+  * @return the port of {@code channel.getUrl()} (-1 if the URL names no port), or 0 if the
+  *         URL is missing or invalid
+  */
+ @Override
+ public int getLocalPort() {
+     LOG.log(Level.FINE, "getLocalPort()");
+     String url = channel.getUrl();
+     if (url == null) {
+         return 0;
+     }
+     try {
+         // java.net.URL has no built-in handler for websocket schemes (presumably ws/wss here)
+         // and rejects them with MalformedURLException; java.net.URI needs no handler.
+         return java.net.URI.create(url).getPort();
+     } catch (IllegalArgumentException e) {
+         LOG.log(Level.WARNING, "Invalid websocket channel URL: " + url, e);
+         return 0;
+     }
+ }
+
+ @Override
+ public Locale getLocale() {
+ LOG.log(Level.FINE, "getLocale()");
+ return null;
+ }
+
+ @Override
+ public Enumeration<Locale> getLocales() {
+ LOG.log(Level.FINE, "getLocales()");
+ return null;
+ }
+
+ @Override
+ public String getParameter(String name) {
+ // TODO Auto-generated method stub
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getParameter({0})", name);
+ }
+ return null;
+ }
+
+ @Override
+ public Map<String, String[]> getParameterMap() {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getParameterMap()");
+ return null;
+ }
+
+ @Override
+ public Enumeration<String> getParameterNames() {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getParameterNames()");
+ return null;
+ }
+
+ @Override
+ public String[] getParameterValues(String name) {
+ // TODO Auto-generated method stub
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getParameterValues({0})", name);
+ }
+ return null;
+ }
+
+ /**
+  * Returns the scheme of the channel URL as the request protocol.
+  * NOTE(review): the servlet API describes this value as protocol/version (e.g. HTTP/1.1);
+  * this adapter has always reported the URL scheme instead - confirm callers rely on that.
+  *
+  * @return the scheme of {@code channel.getUrl()}, or null if the URL is missing or invalid
+  */
+ @Override
+ public String getProtocol() {
+     LOG.log(Level.FINE, "getProtocol");
+     String url = channel.getUrl();
+     if (url == null) {
+         return null;
+     }
+     try {
+         // java.net.URL has no built-in handler for websocket schemes (presumably ws/wss here)
+         // and rejects them with MalformedURLException; java.net.URI needs no handler.
+         return java.net.URI.create(url).getScheme();
+     } catch (IllegalArgumentException e) {
+         LOG.log(Level.WARNING, "Invalid websocket channel URL: " + url, e);
+         return null;
+     }
+ }
+
+ @Override
+ public BufferedReader getReader() throws IOException {
+ LOG.log(Level.FINE, "getReader");
+ return new BufferedReader(new InputStreamReader(in, "utf-8"));
+ }
+
+ @Override
+ public String getRealPath(String path) {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getRealPath");
+ return null;
+ }
+
+ /**
+  * Returns the IP address of the websocket peer.
+  *
+  * @return the textual IP address of the peer, or null if the peer address is not a resolved
+  *         {@code InetSocketAddress}
+  */
+ @Override
+ public String getRemoteAddr() {
+     LOG.log(Level.FINE, "getRemoteAddr");
+     // The string form of a socket address carries no scheme, so parsing it with
+     // java.net.URL could never succeed; read the address object directly instead.
+     Object peer = channel.getPeerAddress();
+     if (peer instanceof java.net.InetSocketAddress) {
+         java.net.InetAddress address = ((java.net.InetSocketAddress)peer).getAddress();
+         return address == null ? null : address.getHostAddress();
+     }
+     return null;
+ }
+
+ /**
+  * Returns the host of the websocket peer.
+  *
+  * @return the peer's host name or address literal (no reverse lookup is triggered), or null
+  *         if the peer address is not an {@code InetSocketAddress}
+  */
+ @Override
+ public String getRemoteHost() {
+     LOG.log(Level.FINE, "getRemoteHost");
+     // The string form of a socket address carries no scheme, so parsing it with
+     // java.net.URL could never succeed; read the address object directly instead.
+     Object peer = channel.getPeerAddress();
+     if (peer instanceof java.net.InetSocketAddress) {
+         return ((java.net.InetSocketAddress)peer).getHostString();
+     }
+     return null;
+ }
+
+ /**
+  * Returns the port of the websocket peer.
+  *
+  * @return the peer's port, or 0 if the peer address is not an {@code InetSocketAddress}
+  */
+ @Override
+ public int getRemotePort() {
+     LOG.log(Level.FINE, "getRemotePort");
+     // The string form of a socket address carries no scheme, so parsing it with
+     // java.net.URL could never succeed; read the address object directly instead.
+     Object peer = channel.getPeerAddress();
+     if (peer instanceof java.net.InetSocketAddress) {
+         return ((java.net.InetSocketAddress)peer).getPort();
+     }
+     return 0;
+ }
+
+ @Override
+ public RequestDispatcher getRequestDispatcher(String path) {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getRequestDispatcher");
+ return null;
+ }
+
+ /**
+  * Returns the scheme of the channel URL.
+  *
+  * @return the scheme of {@code channel.getUrl()}, or null if the URL is missing or invalid
+  */
+ @Override
+ public String getScheme() {
+     LOG.log(Level.FINE, "getScheme");
+     String url = channel.getUrl();
+     if (url == null) {
+         return null;
+     }
+     try {
+         // java.net.URL has no built-in handler for websocket schemes (presumably ws/wss here)
+         // and rejects them with MalformedURLException; java.net.URI needs no handler.
+         return java.net.URI.create(url).getScheme();
+     } catch (IllegalArgumentException e) {
+         LOG.log(Level.WARNING, "Invalid websocket channel URL: " + url, e);
+         return null;
+     }
+ }
+
+ @Override
+ public String getServerName() {
+ return getLocalName();
+ }
+
+ @Override
+ public int getServerPort() {
+ LOG.log(Level.FINE, "getServerPort");
+ return getLocalPort();
+ }
+
+ @Override
+ public ServletContext getServletContext() {
+ LOG.log(Level.FINE, "getServletContext");
+ return null;
+ }
+
+ @Override
+ public boolean isAsyncStarted() {
+ LOG.log(Level.FINE, "isAsyncStarted");
+ return false;
+ }
+
+ @Override
+ public boolean isAsyncSupported() {
+ LOG.log(Level.FINE, "isAsyncSupported");
+ return false;
+ }
+
+ @Override
+ public boolean isSecure() {
+ LOG.log(Level.FINE, "isSecure");
+ return false;
+ }
+
+ @Override
+ public void removeAttribute(String name) {
+ LOG.log(Level.FINE, "removeAttribute");
+ attributes.remove(name);
+ }
+
+ @Override
+ public void setAttribute(String name, Object o) {
+ LOG.log(Level.FINE, "setAttribute");
+ attributes.put(name, o);
+ }
+
+ @Override
+ public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+ LOG.log(Level.FINE, "setCharacterEncoding");
+ // ignore as we stick to utf-8.
+ }
+
+ @Override
+ public AsyncContext startAsync() {
+ LOG.log(Level.FINE, "startAsync");
+ return null;
+ }
+
+ @Override
+ public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "startAsync");
+ return null;
+ }
+
+ @Override
+ public boolean authenticate(HttpServletResponse servletResponse) throws IOException, ServletException {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "authenticate");
+ return false;
+ }
+
+ /**
+  * Authentication is not supported by this adapter.
+  *
+  * @return always null (the request is not authenticated)
+  */
+ @Override
+ public String getAuthType() {
+     LOG.log(Level.FINE, "getAuthType");
+     // return a real null rather than the four-character string "null", which callers
+     // would mistake for the name of an authentication scheme
+     return null;
+ }
+
+ @Override
+ public String getContextPath() {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getContextPath -> " + null);
+ }
+ return null;
+ }
+
+ @Override
+ public Cookie[] getCookies() {
+ LOG.log(Level.FINE, "getCookies");
+ return null;
+ }
+
+ @Override
+ public long getDateHeader(String name) {
+ LOG.log(Level.FINE, "getDateHeader");
+ return 0;
+ }
+
+ @Override
+ public String getHeader(String name) {
+ LOG.log(Level.FINE, "getHeader");
+ return requestHeaders.get(name);
+ }
+
+ @Override
+ public Enumeration<String> getHeaderNames() {
+ LOG.log(Level.FINE, "getHeaderNames");
+ return Collections.enumeration(requestHeaders.keySet());
+ }
+
+ /**
+  * Returns the values of the named request header as an enumeration.
+  *
+  * @param name the header name
+  * @return an enumeration holding the single stored value, or an empty enumeration if the
+  *         header is absent
+  */
+ @Override
+ public Enumeration<String> getHeaders(String name) {
+     LOG.log(Level.FINE, "getHeaders");
+     // our protocol assumes no multiple headers
+     String value = requestHeaders.get(name);
+     if (value == null) {
+         return Collections.enumeration(Arrays.<String>asList());
+     }
+     return Collections.enumeration(Arrays.asList(value));
+ }
+
+ @Override
+ public int getIntHeader(String name) {
+ LOG.log(Level.FINE, "getIntHeader");
+ String v = requestHeaders.get(name);
+ return v == null ? -1 : Integer.parseInt(v);
+ }
+
+ @Override
+ public String getMethod() {
+ LOG.log(Level.FINE, "getMethod");
+ return requestHeaders.get(WebSocketUtils.METHOD_KEY);
+ }
+
+ @Override
+ public Part getPart(String name) throws IOException, ServletException {
+ LOG.log(Level.FINE, "getPart");
+ return null;
+ }
+
+ @Override
+ public Collection<Part> getParts() throws IOException, ServletException {
+ LOG.log(Level.FINE, "getParts");
+ return null;
+ }
+
+ @Override
+ public String getPathInfo() {
+ return null;
+ }
+
+ @Override
+ public String getPathTranslated() {
+ return null;
+ }
+
+ @Override
+ public String getQueryString() {
+ LOG.log(Level.FINE, "getQueryString");
+ return null;
+ }
+
+ @Override
+ public String getRemoteUser() {
+ LOG.log(Level.FINE, "getRemoteUser");
+ return null;
+ }
+
+ @Override
+ public String getRequestURI() {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getRequestURI " + requestHeaders.get(WebSocketUtils.URI_KEY));
+ }
+ return requestHeaders.get(WebSocketUtils.URI_KEY);
+ }
+
+ @Override
+ public StringBuffer getRequestURL() {
+ return new StringBuffer(getRequestURI());
+ }
+
+ @Override
+ public String getRequestedSessionId() {
+ LOG.log(Level.FINE, "getRequestedSessionId");
+ return null;
+ }
+
+ @Override
+ public String getServletPath() {
+ return null;
+ }
+
+ @Override
+ public HttpSession getSession() {
+ LOG.log(Level.FINE, "getSession");
+ return null;
+ }
+
+ @Override
+ public HttpSession getSession(boolean create) {
+ LOG.log(Level.FINE, "getSession");
+ return null;
+ }
+
+ @Override
+ public Principal getUserPrincipal() {
+ LOG.log(Level.FINE, "getUserPrincipal");
+ return null;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromCookie() {
+ LOG.log(Level.FINE, "isRequestedSessionIdFromCookie");
+ return false;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromURL() {
+ LOG.log(Level.FINE, "isRequestedSessionIdFromURL");
+ return false;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromUrl() {
+ LOG.log(Level.FINE, "isRequestedSessionIdFromUrl");
+ return false;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdValid() {
+ LOG.log(Level.FINE, "isRequestedSessionIdValid");
+ return false;
+ }
+
+ @Override
+ public boolean isUserInRole(String role) {
+ LOG.log(Level.FINE, "isUserInRole");
+ return false;
+ }
+
+ @Override
+ public void login(String username, String password) throws ServletException {
+ LOG.log(Level.FINE, "login");
+
+ }
+
+ @Override
+ public void logout() throws ServletException {
+ LOG.log(Level.FINE, "logout");
+ }
+
+ @Override
+ public long getContentLengthLong() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String changeSessionId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T extends HttpUpgradeHandler> T upgrade(Class<T> arg0) throws IOException, ServletException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletResponse.java
new file mode 100644
index 0000000..5404fc6
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/undertow/WebSocketUndertowServletResponse.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.undertow;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+
+import io.undertow.websockets.core.WebSocketChannel;
+import io.undertow.websockets.core.WebSockets;
+
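+/**
+ * An {@link HttpServletResponse} that delivers its result over an Undertow
+ * {@link WebSocketChannel}. Status and headers are kept in a case-insensitive map and are
+ * serialized together with the buffered body by {@code WebSocketUtils.buildResponse} when the
+ * output stream is closed; the result is sent as a websocket text message.
+ */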
+public class WebSocketUndertowServletResponse implements HttpServletResponse {
+ private static final Logger LOG = LogUtils.getL7dLogger(WebSocketUndertowServletResponse.class);
+ private WebSocketChannel channel;
+ private Map<String, String> responseHeaders;
+ private ServletOutputStream outputStream;
+
+ public WebSocketUndertowServletResponse(WebSocketChannel channel) {
+ this.channel = channel;
+ this.responseHeaders = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+ this.outputStream = createOutputStream();
+ }
+
+ @Override
+ public void flushBuffer() throws IOException {
+ LOG.log(Level.FINE, "flushBuffer()");
+ outputStream.flush();
+ }
+
+ @Override
+ public int getBufferSize() {
+ LOG.log(Level.FINE, "getBufferSize()");
+ return 0;
+ }
+
+ @Override
+ public String getCharacterEncoding() {
+ LOG.log(Level.FINE, "getCharacterEncoding()");
+ return null;
+ }
+
+ @Override
+ public String getContentType() {
+ LOG.log(Level.FINE, "getContentType()");
+ return responseHeaders.get("Content-Type");
+ }
+
+ @Override
+ public Locale getLocale() {
+ LOG.log(Level.FINE, "getLocale");
+ return null;
+ }
+
+ @Override
+ public ServletOutputStream getOutputStream() throws IOException {
+ return outputStream;
+ }
+
+ /**
+  * Returns a writer on top of the response output stream.
+  *
+  * @return a new {@link PrintWriter} that encodes characters as utf-8
+  * @throws IOException if the output stream or the encoder cannot be obtained
+  */
+ @Override
+ public PrintWriter getWriter() throws IOException {
+     LOG.log(Level.FINE, "getWriter()");
+     // PrintWriter(OutputStream) would encode with the platform default charset; the payload
+     // is sent as a websocket text message, so encode explicitly as utf-8.
+     return new PrintWriter(new java.io.OutputStreamWriter(getOutputStream(), "utf-8"));
+ }
+
+ @Override
+ public boolean isCommitted() {
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void resetBuffer() {
+ LOG.log(Level.FINE, "resetBuffer()");
+ }
+
+ @Override
+ public void setBufferSize(int size) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setBufferSize({0})", size);
+ }
+ }
+
+ @Override
+ public void setCharacterEncoding(String charset) {
+ // TODO
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setCharacterEncoding({0})", charset);
+ }
+ }
+
+ @Override
+ public void setContentLength(int len) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setContentLength({0})", len);
+ }
+ responseHeaders.put("Content-Length", Integer.toString(len));
+ }
+
+ @Override
+ public void setContentType(String type) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setContentType({0})", type);
+ }
+ responseHeaders.put("Content-Type", type);
+ }
+
+ @Override
+ public void setLocale(Locale loc) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setLocale({0})", loc);
+ }
+ }
+
+ @Override
+ public void addCookie(Cookie cookie) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addCookie({0})", cookie);
+ }
+ }
+
+ @Override
+ public void addDateHeader(String name, long date) {
+ // TODO
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addDateHeader({0}, {1})", new Object[]{name, date});
+ }
+ }
+
+ @Override
+ public void addHeader(String name, String value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addHeader({0}, {1})", new Object[]{name, value});
+ }
+ responseHeaders.put(name, value);
+ }
+
+ @Override
+ public void addIntHeader(String name, int value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addIntHeader({0}, {1})", new Object[]{name, value});
+ }
+ responseHeaders.put(name, Integer.toString(value));
+ }
+
+ @Override
+ public boolean containsHeader(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "containsHeader({0})", name);
+ }
+ return responseHeaders.containsKey(name);
+ }
+
+ @Override
+ public String encodeRedirectURL(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeRedirectURL({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String encodeRedirectUrl(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeRedirectUrl({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String encodeURL(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeURL({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String encodeUrl(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeUrl({0})", url);
+ }
+ return null;
+ }
+
+ /**
+  * Returns the value of a response header set so far.
+  *
+  * @param name the header name (matched case-insensitively)
+  * @return the stored value, or null if the header has not been set
+  */
+ @Override
+ public String getHeader(String name) {
+     if (LOG.isLoggable(Level.FINE)) {
+         LOG.log(Level.FINE, "getHeader({0})", name);
+     }
+     // headers are recorded in responseHeaders by setHeader/addHeader; report them back
+     // instead of always answering null
+     return responseHeaders.get(name);
+ }
+
+ /**
+  * Returns the names of all entries currently stored in the response header map. This includes
+  * the status entry that setStatus stores in the same map.
+  *
+  * @return a snapshot of the header names; never null
+  */
+ @Override
+ public Collection<String> getHeaderNames() {
+     LOG.log(Level.FINE, "getHeaderNames()");
+     // copy so that callers cannot modify the header map through the returned collection
+     return new java.util.ArrayList<String>(responseHeaders.keySet());
+ }
+
+ /**
+  * Returns the values of the named response header. At most one value is stored per name.
+  *
+  * @param name the header name (matched case-insensitively)
+  * @return a collection holding the single stored value, or an empty collection if the header
+  *         has not been set; never null
+  */
+ @Override
+ public Collection<String> getHeaders(String name) {
+     if (LOG.isLoggable(Level.FINE)) {
+         LOG.log(Level.FINE, "getHeaders({0})", name);
+     }
+     String value = responseHeaders.get(name);
+     if (value == null) {
+         return java.util.Collections.<String>emptyList();
+     }
+     return java.util.Collections.singletonList(value);
+ }
+
+ @Override
+ public int getStatus() {
+ LOG.log(Level.FINE, "getStatus()");
+ String v = responseHeaders.get(WebSocketUtils.SC_KEY);
+ return v == null ? 200 : Integer.parseInt(v);
+ }
+
+ /**
+  * Records the given status code and immediately sends a body-less response built from the
+  * current headers as a websocket text message.
+  *
+  * @param sc the status code to report
+  * @throws IOException declared by the servlet API
+  */
+ @Override
+ public void sendError(int sc) throws IOException {
+     if (LOG.isLoggable(Level.FINE)) {
+         // format fixed: the placeholder was previously glued to the method name
+         LOG.log(Level.FINE, "sendError({0})", sc);
+     }
+     responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+     byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
+     WebSockets.sendText(ByteBuffer.wrap(data), channel, null);
+ }
+
+ @Override
+ public void sendError(int sc, String msg) throws IOException {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "sendError({0}, {1})", new Object[]{sc, msg});
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
+ WebSockets.sendText(ByteBuffer.wrap(data), channel, null);
+ }
+
+ @Override
+ public void sendRedirect(String location) throws IOException {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "sendRedirect({0})", location);
+ }
+ }
+
+ @Override
+ public void setDateHeader(String name, long date) {
+ // ignore
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setDateHeader({0}, {1})", new Object[]{name, date});
+ }
+ }
+
+ @Override
+ public void setHeader(String name, String value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setHeader({0}, {1})", new Object[]{name, value});
+ }
+ responseHeaders.put(name, value);
+ }
+
+ /**
+  * Sets an integer-valued response header, replacing any previous value.
+  *
+  * @param name the header name
+  * @param value the header value
+  */
+ @Override
+ public void setIntHeader(String name, int value) {
+     if (LOG.isLoggable(Level.FINE)) {
+         LOG.log(Level.FINE, "setIntHeader({0}, {1})", new Object[]{name, value});
+     }
+     // store the header as addIntHeader and setHeader do; previously the value was only logged
+     responseHeaders.put(name, Integer.toString(value));
+ }
+
+ @Override
+ public void setStatus(int sc) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setStatus({0})", sc);
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ }
+
+ @Override
+ public void setStatus(int sc, String sm) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setStatus({0}, {1})", new Object[]{sc, sm});
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ }
+
+ /**
+ * Creates the response output stream. Until the FLUSHED_KEY entry is present in
+ * responseHeaders, written bytes are collected in an in-memory buffer; close() then sends
+ * the headers and the buffered body as one websocket text message and sets FLUSHED_KEY.
+ * Writes made after that are sent immediately, each as its own text message.
+ * flush() is not overridden here, so only close() sends the buffered data.
+ */
+ private ServletOutputStream createOutputStream() {
+ //REVISIT
+ // This output buffering is needed because the server-side websocket does
+ // not support the fragment transmission mode when sending back large data.
+ // The buffering is only used for the response to the initial service invocation.
+ // Data subsequently pushed to the socket is sent back
+ // unbuffered as individual websocket messages.
+ // Things to consider:
+ // - provide a size limit if we use this buffering
+ // - add a chunking mode in the cxf websocket's binding.
+ //CHECKSTYLE:OFF
+ return new ServletOutputStream() {
+ private InternalByteArrayOutputStream buffer = new InternalByteArrayOutputStream();
+
+ @Override
+ public void write(int b) throws IOException {
+ // route single-byte writes through the array variant
+ byte[] data = new byte[1];
+ data[0] = (byte)b;
+ write(data, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] data) throws IOException {
+ write(data, 0, data.length);
+ }
+
+ @Override
+ public void write(byte[] data, int offset, int length) throws IOException {
+ if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+ // buffer the data until it gets flushed
+ buffer.write(data, offset, length);
+ } else {
+ // unbuffered write to the socket
+ // only the response id header (if one is set) accompanies these later messages
+ String respid = responseHeaders.get(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY);
+ byte[] headers = respid != null
+ ? WebSocketUtils.buildHeaderLine(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, respid) : null;
+ data = WebSocketUtils.buildResponse(headers, data, offset, length);
+ WebSockets.sendText(ByteBuffer.wrap(data), channel, null);
+ }
+ }
+ public void close() throws IOException {
+ // send the buffered response once; the FLUSHED_KEY entry prevents a second send
+ if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+ byte[] data = WebSocketUtils.buildResponse(responseHeaders, buffer.getBytes(), 0, buffer.size());
+ WebSockets.sendText(ByteBuffer.wrap(data), channel, null);
+ responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
+ }
+ super.close();
+ }
+
+ @Override
+ public boolean isReady() {
+ // non-blocking output is not supported
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setWriteListener(WriteListener arg0) {
+ // non-blocking output is not supported
+ throw new UnsupportedOperationException();
+ }
+ };
+ //CHECKSTYLE:ON
+ }
+
+ /**
+ * A ByteArrayOutputStream that exposes its backing array so the buffered bytes can be
+ * handed on without the copy that toByteArray() would make.
+ */
+ private static class InternalByteArrayOutputStream extends ByteArrayOutputStream {
+ /**
+ * @return the internal backing array itself (not a copy); only the first size() bytes
+ * hold written data
+ */
+ public byte[] getBytes() {
+ return buf;
+ }
+ }
+
+ /**
+  * Records the response content length as the Content-Length header, mirroring
+  * setContentLength(int) instead of rejecting the call.
+  *
+  * @param arg0 the content length in bytes
+  */
+ @Override
+ public void setContentLengthLong(long arg0) {
+     if (LOG.isLoggable(Level.FINE)) {
+         LOG.log(Level.FINE, "setContentLengthLong({0})", arg0);
+     }
+     responseHeaders.put("Content-Length", Long.toString(arg0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/pom.xml
----------------------------------------------------------------------
diff --git a/systests/transport-undertow/pom.xml b/systests/transport-undertow/pom.xml
index 84f86e3..a1eb3ea 100644
--- a/systests/transport-undertow/pom.xml
+++ b/systests/transport-undertow/pom.xml
@@ -107,6 +107,67 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-websocket</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>async-http-client</artifactId>
+ <version>${cxf.ahc.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>${cxf.netty3.version}</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-extension-providers</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.atmosphere</groupId>
+ <artifactId>atmosphere-runtime</artifactId>
+ <version>${cxf.atmosphere.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.websocket</groupId>
+ <artifactId>javax.websocket-api</artifactId>
+ <version>1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-hc</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/Book.java
----------------------------------------------------------------------
diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/Book.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/Book.java
new file mode 100644
index 0000000..0c9e332
--- /dev/null
+++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/Book.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.http_undertow.websocket;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlSeeAlso;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+
+/**
+ * Test resource describing a book: a name, an id and a map of chapters keyed by chapter id.
+ * It is both a JAXB/Jackson-serializable bean and a JAX-RS sub-resource.
+ */
+@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "class")
+@XmlRootElement(name = "Book")
+@XmlSeeAlso(SuperBook.class)
+public class Book {
+    private String name;
+    private long id;
+    private Map<Long, Chapter> chapters = new HashMap<Long, Chapter>();
+
+    /**
+     * Creates a book without name or id, pre-populated with chapters 1 and 2.
+     */
+    public Book() {
+        init();
+    }
+
+    /**
+     * Creates a book with the given name and id. Unlike the no-arg constructor this one does
+     * not call init(), so the book starts without chapters.
+     *
+     * @param name the book name
+     * @param id the book id
+     */
+    public Book(String name, long id) {
+        this.name = name;
+        this.id = id;
+    }
+
+    public void setName(String n) {
+        name = n;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setId(long i) {
+        id = i;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    /**
+     * Copies id and name (but not the chapters) from the given book.
+     *
+     * @param book the book whose state is copied
+     */
+    @PUT
+    public void cloneState(Book book) {
+        id = book.getId();
+        name = book.getName();
+    }
+
+    /**
+     * @return this book
+     */
+    @GET
+    public Book retrieveState() {
+        return this;
+    }
+
+    /**
+     * @param chapterid the chapter id
+     * @return the chapter with that id, or null if there is none
+     */
+    @GET
+    @Path("chapters/{chapterid}/")
+    @Produces("application/xml;charset=ISO-8859-1")
+    public Chapter getChapter(@PathParam("chapterid")int chapterid) {
+        return findChapter(chapterid);
+    }
+
+    /**
+     * @param chapterid the chapter id
+     * @return the chapter with that id, or null if there is none
+     */
+    @GET
+    @Path("chapters/acceptencoding/{chapterid}/")
+    @Produces("application/xml")
+    public Chapter getChapterAcceptEncoding(@PathParam("chapterid")int chapterid) {
+        return findChapter(chapterid);
+    }
+
+    /**
+     * Same lookup as getChapter, but declared with a charset name that does not exist.
+     *
+     * @param chapterid the chapter id
+     * @return the chapter with that id, or null if there is none
+     */
+    @GET
+    @Path("chapters/badencoding/{chapterid}/")
+    @Produces("application/xml;charset=UTF-48")
+    public Chapter getChapterBadEncoding(@PathParam("chapterid")int chapterid) {
+        return findChapter(chapterid);
+    }
+
+    /**
+     * Sub-resource locator returning the chapter itself.
+     *
+     * @param chapterid the chapter id
+     * @return the chapter with that id, or null if there is none
+     */
+    @Path("chapters/sub/{chapterid}/")
+    public Chapter getSubChapter(@PathParam("chapterid")int chapterid) {
+        return findChapter(chapterid);
+    }
+
+    /**
+     * Sub-resource locator returning the chapter typed as Object.
+     *
+     * @param chapterid the chapter id
+     * @return the chapter with that id, or null if there is none
+     */
+    @Path("chaptersobject/sub/{chapterid}/")
+    public Object getSubChapterObject(@PathParam("chapterid")int chapterid) {
+        return getSubChapter(chapterid);
+    }
+
+    /** Adds the two default chapters ("chapter 1" and "chapter 2"). */
+    final void init() {
+        Chapter c1 = new Chapter();
+        c1.setId(1);
+        c1.setTitle("chapter 1");
+        chapters.put(c1.getId(), c1);
+        Chapter c2 = new Chapter();
+        c2.setId(2);
+        c2.setTitle("chapter 2");
+        chapters.put(c2.getId(), c2);
+    }
+
+    /** Looks up a chapter by id; Long.valueOf replaces the deprecated Long constructor. */
+    private Chapter findChapter(int chapterid) {
+        return chapters.get(Long.valueOf(chapterid));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundDetails.java
----------------------------------------------------------------------
diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundDetails.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundDetails.java
new file mode 100644
index 0000000..826cefd
--- /dev/null
+++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundDetails.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.http_undertow.websocket;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB root-element bean holding a single book id. In this test suite it is
+ * filled with the id that could not be resolved and attached to a
+ * {@code BookNotFoundFault}.
+ */
+@XmlRootElement
+public class BookNotFoundDetails {
+    /** The book id; {@code 0} until {@link #setId(long)} is called. */
+    private long id;
+
+    /**
+     * Stores the book id.
+     *
+     * @param id the id to record
+     */
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the id last passed to {@link #setId(long)}, or {@code 0} if none was set
+     */
+    public long getId() {
+        return this.id;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundFault.java
----------------------------------------------------------------------
diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundFault.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundFault.java
new file mode 100644
index 0000000..e901d86
--- /dev/null
+++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookNotFoundFault.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.http_undertow.websocket;
+
+import javax.xml.ws.WebFault;
+
+/**
+ * Checked exception, annotated as a {@code @WebFault}, raised when a book
+ * lookup fails. It carries either a plain message or a
+ * {@link BookNotFoundDetails} object, depending on the constructor used.
+ */
+@WebFault
+public class BookNotFoundFault extends Exception {
+    private static final long serialVersionUID = 4833573020359208072L;
+
+    /** Fault detail; remains {@code null} when the fault was built from a message. */
+    private BookNotFoundDetails details;
+
+    /**
+     * Creates a fault that carries only a message and no detail object.
+     *
+     * @param errorMessage the exception message
+     */
+    public BookNotFoundFault(String errorMessage) {
+        super(errorMessage);
+    }
+
+    /**
+     * Creates a fault that carries a detail object and no message.
+     *
+     * @param details the detail object later returned by {@link #getFaultInfo()}
+     */
+    public BookNotFoundFault(BookNotFoundDetails details) {
+        // the implicit no-argument super constructor leaves the message unset
+        this.details = details;
+    }
+
+    /**
+     * @return the detail object given at construction, or {@code null} if the
+     *         message constructor was used
+     */
+    public BookNotFoundDetails getFaultInfo() {
+        return this.details;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookServerWebSocket.java
----------------------------------------------------------------------
diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookServerWebSocket.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookServerWebSocket.java
new file mode 100644
index 0000000..4b8cb00
--- /dev/null
+++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookServerWebSocket.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.http_undertow.websocket;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+/**
+ * Test server that publishes {@code BookStoreWebSocket} and
+ * {@code BookStorePerRequest} as JAX-RS resources on a
+ * {@code ws://localhost:<port>/websocket} address.
+ */
+public class BookServerWebSocket extends AbstractBusTestServerBase {
+    // Ports obtained from allocatePort, keyed by this class and an index 1..6.
+    public static final String PORT = allocatePort(BookServerWebSocket.class, 1);
+    public static final String PORT_SPRING = allocatePort(BookServerWebSocket.class, 2);
+    public static final String PORT_WAR = allocatePort(BookServerWebSocket.class, 3);
+    public static final String PORT2 = allocatePort(BookServerWebSocket.class, 4);
+    public static final String PORT2_SPRING = allocatePort(BookServerWebSocket.class, 5);
+    public static final String PORT2_WAR = allocatePort(BookServerWebSocket.class, 6);
+
+    /** The running endpoint; {@code null} before {@link #run()} and after {@link #tearDown()}. */
+    org.apache.cxf.endpoint.Server server;
+
+    /** Port used to build the published {@code ws://} address. */
+    private final String port;
+
+    /** Creates a server bound to {@link #PORT}. */
+    public BookServerWebSocket() {
+        this(PORT);
+    }
+
+    /**
+     * Creates a server bound to the given port.
+     *
+     * @param port port number, as a string, inserted into the endpoint address
+     */
+    public BookServerWebSocket(String port) {
+        this.port = port;
+    }
+
+    /**
+     * Creates the JAX-RS endpoint on the default bus and keeps a reference to
+     * it in {@link #server}.
+     */
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        setBus(bus);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setBus(bus);
+        sf.setResourceClasses(BookStoreWebSocket.class, BookStorePerRequest.class);
+        sf.setProvider(new StreamingResponseProvider<Book>());
+        // Only BookStoreWebSocket gets an explicit singleton provider;
+        // BookStorePerRequest is registered by class alone.
+        sf.setResourceProvider(BookStoreWebSocket.class,
+                               new SingletonResourceProvider(new BookStoreWebSocket(), true));
+        sf.setAddress("ws://localhost:" + port + "/websocket");
+        server = sf.create();
+
+        // Clear the default and thread-default bus references once the server is created.
+        BusFactory.setDefaultBus(null);
+        BusFactory.setThreadDefaultBus(null);
+    }
+
+    /**
+     * Stops and destroys the endpoint created by {@link #run()}. Does nothing
+     * when no endpoint exists, e.g. because {@code run()} was never reached or
+     * failed before assigning {@link #server}.
+     *
+     * @throws Exception if stopping or destroying the endpoint fails
+     */
+    public void tearDown() throws Exception {
+        if (server != null) {
+            server.stop();
+            server.destroy();
+            server = null;
+        }
+    }
+
+    /**
+     * Starts a server on the default port from the command line.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        try {
+            BookServerWebSocket s = new BookServerWebSocket();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6755b06c/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookStorePerRequest.java
----------------------------------------------------------------------
diff --git a/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookStorePerRequest.java b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookStorePerRequest.java
new file mode 100644
index 0000000..67b4d55
--- /dev/null
+++ b/systests/transport-undertow/src/test/java/org/apache/cxf/systest/http_undertow/websocket/BookStorePerRequest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.http_undertow.websocket;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+
+/**
+ * JAX-RS test resource rooted at {@code /bookstore2} that exercises
+ * constructor, setter and context injection of the {@code BOOK} request header.
+ * The only constructor that completes normally is the one taking
+ * {@code HttpHeaders} plus the {@code BOOK} header values.
+ */
+@Path("/bookstore2")
+public class BookStorePerRequest {
+
+    /** Headers handed to the working constructor. */
+    private HttpHeaders httpHeaders;
+    /** Book fixtures keyed by id, filled by {@link #init()}. */
+    private Map<Long, Book> books = new HashMap<Long, Book>();
+    /** {@code BOOK} header values received by the working constructor. */
+    private List<String> bookIds;
+    /** {@code Book} header values received by {@link #setBook(List)}. */
+    private List<String> setterBookIds;
+
+    /**
+     * Always throws. NOTE(review): presumably present so that a runtime which
+     * wrongly selects this constructor fails loudly - confirm.
+     */
+    public BookStorePerRequest() {
+        throw new RuntimeException();
+    }
+
+    /**
+     * Always throws; see the no-argument constructor.
+     *
+     * @param headers unused
+     */
+    public BookStorePerRequest(@Context HttpHeaders headers) {
+        throw new RuntimeException();
+    }
+
+    /**
+     * Always throws; see the no-argument constructor.
+     *
+     * @param headers unused
+     * @param bar unused; carries no injection annotation
+     */
+    public BookStorePerRequest(@Context HttpHeaders headers, Long bar) {
+        throw new RuntimeException();
+    }
+
+    /**
+     * Stores the injected headers and {@code BOOK} header values, then loads
+     * the book fixtures.
+     *
+     * @param headers request headers
+     * @param bookIds values of the {@code BOOK} request header
+     * @throws ClientErrorException with status 400 if the values do not include {@code "3"}
+     */
+    public BookStorePerRequest(@Context HttpHeaders headers,
+                               @HeaderParam("BOOK") List<String> bookIds) {
+        if (!bookIds.contains("3")) {
+            throw new ClientErrorException(Response.status(400).type("text/plain")
+                                           .entity("Constructor: Header value 3 is required").build());
+        }
+        httpHeaders = headers;
+        this.bookIds = bookIds;
+        init();
+    }
+
+    /**
+     * Receives the {@code Book} header values through setter injection.
+     *
+     * @param ids header values; must equal the constructor-injected values and have exactly 3 entries
+     * @throws ClientErrorException with status 400 otherwise
+     */
+    @HeaderParam("Book")
+    public void setBook(List<String> ids) {
+        if (!ids.equals(bookIds) || ids.size() != 3) {
+            throw new ClientErrorException(Response.status(400).type("text/plain")
+                                           .entity("Param setter: 3 header values are required").build());
+        }
+        setterBookIds = ids;
+    }
+
+    /**
+     * Context-injection hook that rejects requests whose {@code BOOK} header
+     * contains the value {@code "4"}. The headers are only inspected, not stored.
+     *
+     * @param headers the injected request headers
+     * @throws ClientErrorException with status 400 if {@code "4"} is among the values
+     */
+    @Context
+    public void setHttpHeaders(HttpHeaders headers) {
+        // Read the injected argument rather than the httpHeaders field, so the
+        // check does not depend on what the constructor stored earlier.
+        List<String> ids = headers.getRequestHeader("BOOK");
+        // getRequestHeader may return null when the header is absent
+        if (ids != null && ids.contains("4")) {
+            throw new ClientErrorException(Response.status(400).type("text/plain")
+                                           .entity("Context setter: unexpected header value").build());
+        }
+    }
+
+    /**
+     * Same as {@link #getBookByHeader()}, reachable under a path containing an
+     * encoded space.
+     *
+     * @return the matching book
+     * @throws Exception as thrown by {@link #getBookByHeader()}
+     */
+    @GET
+    @Path("/book%20headers/")
+    public Book getBookByHeader2() throws Exception {
+        return getBookByHeader();
+    }
+
+    /**
+     * Builds a book id by concatenating the first three {@code BOOK} header
+     * values and returns the matching book.
+     *
+     * @return the matching book
+     * @throws RuntimeException if the current header values differ from the constructor-injected ones
+     * @throws BookNotFoundFault if no book has the resulting id
+     * @throws Exception declared by the signature; nothing else is thrown explicitly here
+     */
+    @GET
+    @Path("/bookheaders/")
+    public Book getBookByHeader() throws Exception {
+        List<String> ids = httpHeaders.getRequestHeader("BOOK");
+        if (!ids.equals(bookIds)) {
+            throw new RuntimeException();
+        }
+        return doGetBook(ids.get(0) + ids.get(1) + ids.get(2));
+    }
+
+    /**
+     * Like {@link #getBookByHeader()}, but builds the id from the values
+     * received by {@link #setBook(List)}.
+     *
+     * @return the matching book
+     * @throws BookNotFoundFault if no book has the resulting id
+     * @throws Exception declared by the signature; nothing else is thrown explicitly here
+     */
+    @GET
+    @Path("/bookheaders/injected")
+    public Book getBookByHeaderInjected() throws Exception {
+        return doGetBook(setterBookIds.get(0) + setterBookIds.get(1) + setterBookIds.get(2));
+    }
+
+    /**
+     * Looks up a book by its id given in decimal string form.
+     *
+     * @param id decimal book id
+     * @return the book stored under that id
+     * @throws BookNotFoundFault carrying the id if no such book exists
+     * @throws NumberFormatException if {@code id} is not a parsable long
+     */
+    private Book doGetBook(String id) throws BookNotFoundFault {
+        // parse once; the value serves both the lookup and the fault detail
+        long bookId = Long.parseLong(id);
+        Book book = books.get(bookId);
+        if (book == null) {
+            BookNotFoundDetails details = new BookNotFoundDetails();
+            details.setId(bookId);
+            throw new BookNotFoundFault(details);
+        }
+        return book;
+    }
+
+
+    /** Registers the single fixture book: id 123, named "CXF in Action". */
+    final void init() {
+        Book book = new Book();
+        book.setId(123);
+        book.setName("CXF in Action");
+        books.put(book.getId(), book);
+    }
+
+}
+
+