You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2015/02/05 18:06:24 UTC
[1/2] cxf git commit: [CXF-6232] Refacor CXF's Atmosphere based
WebSocket transport part 1
Repository: cxf
Updated Branches:
refs/heads/master 42eba2f94 -> 28e185b27
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java
new file mode 100644
index 0000000..870c3c9
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletRequest.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.jetty;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.security.Principal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.DispatcherType;
+import javax.servlet.RequestDispatcher;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.Part;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+
+/**
+ *
+ */
+public class WebSocketVirtualServletRequest implements HttpServletRequest {
+ private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletRequest.class);
+
+ private WebSocketServletHolder webSocketHolder;
+ private InputStream in;
+ private Map<String, String> requestHeaders;
+ private Map<String, Object> attributes;
+
+ public WebSocketVirtualServletRequest(WebSocketServletHolder websocket, InputStream in)
+ throws IOException {
+ this.webSocketHolder = websocket;
+ this.in = in;
+
+ this.requestHeaders = WebSocketUtils.readHeaders(in);
+ String path = requestHeaders.get(WebSocketUtils.URI_KEY);
+ String origin = websocket.getRequestURI();
+ if (!path.startsWith(origin)) {
+ LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin});
+ throw new InvalidPathException();
+ }
+ this.attributes = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
+ Object v = websocket.getAttribute("org.apache.cxf.transport.endpoint.address");
+ if (v != null) {
+ attributes.put("org.apache.cxf.transport.endpoint.address", v);
+ }
+ }
+
+ @Override
+ public AsyncContext getAsyncContext() {
+ return null;
+ }
+
+ @Override
+ public Object getAttribute(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getAttribute({0}) -> {1}", new Object[] {name , attributes.get(name)});
+ }
+ return attributes.get(name);
+ }
+
+ @Override
+ public Enumeration<String> getAttributeNames() {
+ LOG.log(Level.FINE, "getAttributeNames()");
+ return Collections.enumeration(attributes.keySet());
+ }
+
+ @Override
+ public String getCharacterEncoding() {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getCharacterEncoding()");
+ return null;
+ }
+
+ @Override
+ public int getContentLength() {
+ LOG.log(Level.FINE, "getContentLength()");
+ return 0;
+ }
+
+ @Override
+ public String getContentType() {
+ LOG.log(Level.FINE, "getContentType()");
+ return requestHeaders.get("Content-Type");
+ }
+
+ @Override
+ public DispatcherType getDispatcherType() {
+ LOG.log(Level.FINE, "getDispatcherType()");
+ return webSocketHolder.getDispatcherType();
+ }
+
+ @Override
+ public ServletInputStream getInputStream() throws IOException {
+ return new ServletInputStream() {
+ @Override
+ public int read() throws IOException {
+ return in.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+ };
+ }
+
+ @Override
+ public String getLocalAddr() {
+ LOG.log(Level.FINE, "getLocalAddr()");
+ return webSocketHolder.getLocalAddr();
+ }
+
+ @Override
+ public String getLocalName() {
+ LOG.log(Level.FINE, "getLocalName()");
+ return webSocketHolder.getLocalName();
+ }
+
+ @Override
+ public int getLocalPort() {
+ LOG.log(Level.FINE, "getLocalPort()");
+ return webSocketHolder.getLocalPort();
+ }
+
+ @Override
+ public Locale getLocale() {
+ LOG.log(Level.FINE, "getLocale()");
+ return webSocketHolder.getLocale();
+ }
+
+ @Override
+ public Enumeration<Locale> getLocales() {
+ LOG.log(Level.FINE, "getLocales()");
+ return webSocketHolder.getLocales();
+ }
+
+ @Override
+ public String getParameter(String name) {
+ // TODO Auto-generated method stub
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getParameter({0})", name);
+ }
+ return null;
+ }
+
+ @Override
+ public Map<String, String[]> getParameterMap() {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getParameterMap()");
+ return null;
+ }
+
+ @Override
+ public Enumeration<String> getParameterNames() {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getParameterNames()");
+ return null;
+ }
+
+ @Override
+ public String[] getParameterValues(String name) {
+ // TODO Auto-generated method stub
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getParameterValues({0})", name);
+ }
+ return null;
+ }
+
+ @Override
+ public String getProtocol() {
+ LOG.log(Level.FINE, "getProtocol");
+ return webSocketHolder.getProtocol();
+ }
+
+ @Override
+ public BufferedReader getReader() throws IOException {
+ LOG.log(Level.FINE, "getReader");
+ return new BufferedReader(new InputStreamReader(in, "utf-8"));
+ }
+
+ @Override
+ public String getRealPath(String path) {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getRealPath");
+ return null;
+ }
+
+ @Override
+ public String getRemoteAddr() {
+ LOG.log(Level.FINE, "getRemoteAddr");
+ return webSocketHolder.getRemoteAddr();
+ }
+
+ @Override
+ public String getRemoteHost() {
+ LOG.log(Level.FINE, "getRemoteHost");
+ return webSocketHolder.getRemoteHost();
+ }
+
+ @Override
+ public int getRemotePort() {
+ LOG.log(Level.FINE, "getRemotePort");
+ return webSocketHolder.getRemotePort();
+ }
+
+ @Override
+ public RequestDispatcher getRequestDispatcher(String path) {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "getRequestDispatcher");
+ return null;
+ }
+
+ @Override
+ public String getScheme() {
+ LOG.log(Level.FINE, "getScheme");
+ return webSocketHolder.getScheme();
+ }
+
+ @Override
+ public String getServerName() {
+ return webSocketHolder.getServerName();
+ }
+
+ @Override
+ public int getServerPort() {
+ LOG.log(Level.FINE, "getServerPort");
+ return webSocketHolder.getServerPort();
+ }
+
+ @Override
+ public ServletContext getServletContext() {
+ LOG.log(Level.FINE, "getServletContext");
+ return webSocketHolder.getServletContext();
+ }
+
+ @Override
+ public boolean isAsyncStarted() {
+ LOG.log(Level.FINE, "isAsyncStarted");
+ return false;
+ }
+
+ @Override
+ public boolean isAsyncSupported() {
+ LOG.log(Level.FINE, "isAsyncSupported");
+ return false;
+ }
+
+ @Override
+ public boolean isSecure() {
+ LOG.log(Level.FINE, "isSecure");
+ return webSocketHolder.isSecure();
+ }
+
+ @Override
+ public void removeAttribute(String name) {
+ LOG.log(Level.FINE, "removeAttribute");
+ attributes.remove(name);
+ }
+
+ @Override
+ public void setAttribute(String name, Object o) {
+ LOG.log(Level.FINE, "setAttribute");
+ attributes.put(name, o);
+ }
+
+ @Override
+ public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
+ LOG.log(Level.FINE, "setCharacterEncoding");
+ // ignore as we stick to utf-8.
+ }
+
+ @Override
+ public AsyncContext startAsync() {
+ LOG.log(Level.FINE, "startAsync");
+ return null;
+ }
+
+ @Override
+ public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "startAsync");
+ return null;
+ }
+
+ @Override
+ public boolean authenticate(HttpServletResponse servletResponse) throws IOException, ServletException {
+ // TODO Auto-generated method stub
+ LOG.log(Level.FINE, "authenticate");
+ return false;
+ }
+
+ @Override
+ public String getAuthType() {
+ LOG.log(Level.FINE, "getAuthType");
+ return webSocketHolder.getAuthType();
+ }
+
+ @Override
+ public String getContextPath() {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getContextPath -> " + webSocketHolder.getContextPath());
+ }
+ return webSocketHolder.getContextPath();
+ }
+
+ @Override
+ public Cookie[] getCookies() {
+ LOG.log(Level.FINE, "getCookies");
+ return null;
+ }
+
+ @Override
+ public long getDateHeader(String name) {
+ LOG.log(Level.FINE, "getDateHeader");
+ return 0;
+ }
+
+ @Override
+ public String getHeader(String name) {
+ LOG.log(Level.FINE, "getHeader");
+ return requestHeaders.get(name);
+ }
+
+ @Override
+ public Enumeration<String> getHeaderNames() {
+ LOG.log(Level.FINE, "getHeaderNames");
+ return Collections.enumeration(requestHeaders.keySet());
+ }
+
+ @Override
+ public Enumeration<String> getHeaders(String name) {
+ LOG.log(Level.FINE, "getHeaders");
+ // our protocol assumes no multiple headers
+ return Collections.enumeration(Arrays.asList(requestHeaders.get(name)));
+ }
+
+ @Override
+ public int getIntHeader(String name) {
+ LOG.log(Level.FINE, "getIntHeader");
+ String v = requestHeaders.get(name);
+ return v == null ? -1 : Integer.parseInt(v);
+ }
+
+ @Override
+ public String getMethod() {
+ LOG.log(Level.FINE, "getMethod");
+ return requestHeaders.get(WebSocketUtils.METHOD_KEY);
+ }
+
+ @Override
+ public Part getPart(String name) throws IOException, ServletException {
+ LOG.log(Level.FINE, "getPart");
+ return null;
+ }
+
+ @Override
+ public Collection<Part> getParts() throws IOException, ServletException {
+ LOG.log(Level.FINE, "getParts");
+ return null;
+ }
+
+ @Override
+ public String getPathInfo() {
+ String uri = requestHeaders.get(WebSocketUtils.URI_KEY);
+ String servletpath = webSocketHolder.getServletPath();
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getPathInfo " + servletpath + " " + uri);
+ }
+ //TODO remove the query string part
+ //REVISIT may cache this value in requestHeaders?
+ return uri.substring(servletpath.length());
+ }
+
+ @Override
+ public String getPathTranslated() {
+ String path = getPathInfo();
+ String opathtrans = webSocketHolder.getPathTranslated();
+ // some container may choose not to return this value
+ if (opathtrans == null) {
+ return null;
+ }
+ String opathinfo = webSocketHolder.getPathInfo();
+ LOG.log(Level.FINE, "getPathTranslated " + path + " " + opathinfo);
+ int pos = opathtrans.indexOf(opathinfo);
+ //REVISIT may cache this value in requestHeaders?
+ return new StringBuilder().append(opathtrans.substring(0, pos)).append(path).toString();
+ }
+
+ @Override
+ public String getQueryString() {
+ LOG.log(Level.FINE, "getQueryString");
+ return null;
+ }
+
+ @Override
+ public String getRemoteUser() {
+ LOG.log(Level.FINE, "getRemoteUser");
+ return null;
+ }
+
+ @Override
+ public String getRequestURI() {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getRequestURI " + requestHeaders.get(WebSocketUtils.URI_KEY));
+ }
+ return requestHeaders.get(WebSocketUtils.URI_KEY);
+ }
+
+ @Override
+ public StringBuffer getRequestURL() {
+ StringBuffer sb = webSocketHolder.getRequestURL();
+ String ouri = webSocketHolder.getRequestURI();
+ String uri = getRequestURI();
+ sb.append(uri.substring(ouri.length()));
+ LOG.log(Level.FINE, "getRequestURL " + uri);
+ return sb;
+ }
+
+ @Override
+ public String getRequestedSessionId() {
+ LOG.log(Level.FINE, "getRequestedSessionId");
+ return null;
+ }
+
+ @Override
+ public String getServletPath() {
+ LOG.log(Level.FINE, "getServletPath " + webSocketHolder.getServletPath());
+ return webSocketHolder.getServletPath();
+ }
+
+ @Override
+ public HttpSession getSession() {
+ LOG.log(Level.FINE, "getSession");
+ return null;
+ }
+
+ @Override
+ public HttpSession getSession(boolean create) {
+ LOG.log(Level.FINE, "getSession");
+ return null;
+ }
+
+ @Override
+ public Principal getUserPrincipal() {
+ LOG.log(Level.FINE, "getUserPrincipal");
+ return webSocketHolder.getUserPrincipal();
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromCookie() {
+ LOG.log(Level.FINE, "isRequestedSessionIdFromCookie");
+ return false;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromURL() {
+ LOG.log(Level.FINE, "isRequestedSessionIdFromURL");
+ return false;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdFromUrl() {
+ LOG.log(Level.FINE, "isRequestedSessionIdFromUrl");
+ return false;
+ }
+
+ @Override
+ public boolean isRequestedSessionIdValid() {
+ LOG.log(Level.FINE, "isRequestedSessionIdValid");
+ return false;
+ }
+
+ @Override
+ public boolean isUserInRole(String role) {
+ LOG.log(Level.FINE, "isUserInRole");
+ return false;
+ }
+
+ @Override
+ public void login(String username, String password) throws ServletException {
+ LOG.log(Level.FINE, "login");
+
+ }
+
+ @Override
+ public void logout() throws ServletException {
+ LOG.log(Level.FINE, "logout");
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java
new file mode 100644
index 0000000..b9908b1
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketVirtualServletResponse.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.jetty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+
+/**
+ *
+ */
+public class WebSocketVirtualServletResponse implements HttpServletResponse {
+ private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletResponse.class);
+ private WebSocketServletHolder webSocketHolder;
+ private Map<String, String> responseHeaders;
+ private ServletOutputStream outputStream;
+
+ public WebSocketVirtualServletResponse(WebSocketServletHolder websocket) {
+ this.webSocketHolder = websocket;
+ this.responseHeaders = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+ this.outputStream = createOutputStream();
+ }
+
+ @Override
+ public void flushBuffer() throws IOException {
+ LOG.log(Level.FINE, "flushBuffer()");
+ outputStream.flush();
+ }
+
+ @Override
+ public int getBufferSize() {
+ LOG.log(Level.FINE, "getBufferSize()");
+ return 0;
+ }
+
+ @Override
+ public String getCharacterEncoding() {
+ LOG.log(Level.FINE, "getCharacterEncoding()");
+ return null;
+ }
+
+ @Override
+ public String getContentType() {
+ LOG.log(Level.FINE, "getContentType()");
+ return responseHeaders.get("Content-Type");
+ }
+
+ @Override
+ public Locale getLocale() {
+ LOG.log(Level.FINE, "getLocale");
+ return null;
+ }
+
+ @Override
+ public ServletOutputStream getOutputStream() throws IOException {
+ return outputStream;
+ }
+
+ @Override
+ public PrintWriter getWriter() throws IOException {
+ LOG.log(Level.FINE, "getWriter()");
+ return new PrintWriter(getOutputStream());
+ }
+
+ @Override
+ public boolean isCommitted() {
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void resetBuffer() {
+ LOG.log(Level.FINE, "resetBuffer()");
+ }
+
+ @Override
+ public void setBufferSize(int size) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setBufferSize({0})", size);
+ }
+ }
+
+ @Override
+ public void setCharacterEncoding(String charset) {
+ // TODO
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setCharacterEncoding({0})", charset);
+ }
+ }
+
+ @Override
+ public void setContentLength(int len) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setContentLength({0})", len);
+ }
+ responseHeaders.put("Content-Length", Integer.toString(len));
+ }
+
+ @Override
+ public void setContentType(String type) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setContentType({0})", type);
+ }
+ responseHeaders.put("Content-Type", type);
+ }
+
+ @Override
+ public void setLocale(Locale loc) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setLocale({0})", loc);
+ }
+ }
+
+ @Override
+ public void addCookie(Cookie cookie) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addCookie({0})", cookie);
+ }
+ }
+
+ @Override
+ public void addDateHeader(String name, long date) {
+ // TODO
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addDateHeader({0}, {1})", new Object[]{name, date});
+ }
+ }
+
+ @Override
+ public void addHeader(String name, String value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addHeader({0}, {1})", new Object[]{name, value});
+ }
+ responseHeaders.put(name, value);
+ }
+
+ @Override
+ public void addIntHeader(String name, int value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "addIntHeader({0}, {1})", new Object[]{name, value});
+ }
+ responseHeaders.put(name, Integer.toString(value));
+ }
+
+ @Override
+ public boolean containsHeader(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "containsHeader({0})", name);
+ }
+ return responseHeaders.containsKey(name);
+ }
+
+ @Override
+ public String encodeRedirectURL(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeRedirectURL({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String encodeRedirectUrl(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeRedirectUrl({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String encodeURL(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeURL({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String encodeUrl(String url) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "encodeUrl({0})", url);
+ }
+ return null;
+ }
+
+ @Override
+ public String getHeader(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getHeader({0})", name);
+ }
+ return null;
+ }
+
+ @Override
+ public Collection<String> getHeaderNames() {
+ LOG.log(Level.FINE, "getHeaderNames()");
+ return null;
+ }
+
+ @Override
+ public Collection<String> getHeaders(String name) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "getHeaders({0})", name);
+ }
+ return null;
+ }
+
+ @Override
+ public int getStatus() {
+ LOG.log(Level.FINE, "getStatus()");
+ String v = responseHeaders.get(WebSocketUtils.SC_KEY);
+ return v == null ? 200 : Integer.parseInt(v);
+ }
+
+ @Override
+ public void sendError(int sc) throws IOException {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "sendError{0}", sc);
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
+ webSocketHolder.write(data, 0, data.length);
+ }
+
+ @Override
+ public void sendError(int sc, String msg) throws IOException {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "sendError({0}, {1})", new Object[]{sc, msg});
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
+ webSocketHolder.write(data, 0, data.length);
+ }
+
+ @Override
+ public void sendRedirect(String location) throws IOException {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "sendRedirect({0})", location);
+ }
+ }
+
+ @Override
+ public void setDateHeader(String name, long date) {
+ // ignore
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setDateHeader({0}, {1})", new Object[]{name, date});
+ }
+ }
+
+ @Override
+ public void setHeader(String name, String value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setHeader({0}, {1})", new Object[]{name, value});
+ }
+ responseHeaders.put(name, value);
+ }
+
+ @Override
+ public void setIntHeader(String name, int value) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setIntHeader({0}, {1})", new Object[]{name, value});
+ }
+ }
+
+ @Override
+ public void setStatus(int sc) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setStatus({0})", sc);
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ }
+
+ @Override
+ public void setStatus(int sc, String sm) {
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "setStatus({0}, {1})", new Object[]{sc, sm});
+ }
+ responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
+ }
+
+ private ServletOutputStream createOutputStream() {
+ //REVISIT
+ // This output buffering is needed because the server-side websocket does
+ // not support the fragment transmission mode when sending back large data.
+ // This buffering is only used for the response to the initial service invocation.
+ // Data subsequently pushed to the socket is sent back
+ // unbuffered as individual websocket messages.
+ // Things to consider:
+ // - provide a size limit if we are to use this buffering
+ // - add a chunking mode in the cxf websocket's binding.
+ return new ServletOutputStream() {
+ private InternalByteArrayOutputStream buffer = new InternalByteArrayOutputStream();
+
+ @Override
+ public void write(int b) throws IOException {
+ byte[] data = new byte[1];
+ data[0] = (byte)b;
+ write(data, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] data) throws IOException {
+ write(data, 0, data.length);
+ }
+
+ @Override
+ public void write(byte[] data, int offset, int length) throws IOException {
+ if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+ // buffer the data until it gets flushed
+ buffer.write(data, offset, length);
+ } else {
+ // unbuffered write to the socket
+ String respid = responseHeaders.get(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY);
+ byte[] headers = respid != null
+ ? WebSocketUtils.buildHeaderLine(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, respid) : null;
+ data = WebSocketUtils.buildResponse(headers, data, offset, length);
+ webSocketHolder.write(data, 0, data.length);
+ }
+ }
+ public void close() throws IOException {
+ if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
+ byte[] data = WebSocketUtils.buildResponse(responseHeaders, buffer.getBytes(), 0, buffer.size());
+ webSocketHolder.write(data, 0, data.length);
+ responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
+ }
+ super.close();
+ }
+ };
+ }
+
+ private static class InternalByteArrayOutputStream extends ByteArrayOutputStream {
+ public byte[] getBytes() {
+ return buf;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
index 32e681e..af16b9f 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty9/Jetty9WebSocketDestination.java
@@ -48,9 +48,9 @@ import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory;
import org.apache.cxf.transport.websocket.InvalidPathException;
import org.apache.cxf.transport.websocket.WebSocketConstants;
import org.apache.cxf.transport.websocket.WebSocketDestinationService;
-import org.apache.cxf.transport.websocket.WebSocketServletHolder;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse;
+import org.apache.cxf.transport.websocket.jetty.WebSocketServletHolder;
+import org.apache.cxf.transport.websocket.jetty.WebSocketVirtualServletRequest;
+import org.apache.cxf.transport.websocket.jetty.WebSocketVirtualServletResponse;
import org.apache.cxf.workqueue.WorkQueueManager;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.websocket.api.Session;
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
index 69ad06c..6f3f8ad 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/WebSocketTestClient.java
@@ -307,7 +307,15 @@ class WebSocketTestClient {
}
private int length(Object o) {
- return o instanceof char[] ? ((String)o).length() : (o instanceof byte[] ? ((byte[])o).length : 0);
+ if (o instanceof String) {
+ return ((String)o).length();
+ } else if (o instanceof char[]) {
+ return ((char[])o).length;
+ } else if (o instanceof byte[]) {
+ return ((byte[])o).length;
+ } else {
+ return 0;
+ }
}
private int getchar(Object o, int p) {
[2/2] cxf git commit: [CXF-6232] Refacor CXF's Atmosphere based
WebSocket transport part 1
Posted by ay...@apache.org.
[CXF-6232] Refacor CXF's Atmosphere based WebSocket transport part 1
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/28e185b2
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/28e185b2
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/28e185b2
Branch: refs/heads/master
Commit: 28e185b27e8c80cd773cac46d1b9b19529c4fab0
Parents: 42eba2f
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Thu Feb 5 13:28:38 2015 +0100
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Thu Feb 5 17:52:11 2015 +0100
----------------------------------------------------------------------
parent/pom.xml | 4 +-
.../websocket/WebSocketServletHolder.java | 59 ---
.../cxf/transport/websocket/WebSocketUtils.java | 91 ++--
.../WebSocketVirtualServletRequest.java | 525 ------------------
.../WebSocketVirtualServletResponse.java | 369 -------------
.../atmosphere/AtmosphereWebSocketHandler.java | 330 ------------
.../AtmosphereWebSocketServletDestination.java | 74 ++-
.../AtmosphereWebSocketStreamHandler.java | 50 --
.../atmosphere/DefaultProtocolInterceptor.java | 272 ++++++++++
.../websocket/jetty/JettyWebSocket.java | 3 -
.../websocket/jetty/WebSocketServletHolder.java | 59 +++
.../jetty/WebSocketVirtualServletRequest.java | 527 +++++++++++++++++++
.../jetty/WebSocketVirtualServletResponse.java | 369 +++++++++++++
.../jetty9/Jetty9WebSocketDestination.java | 6 +-
.../jaxrs/websocket/WebSocketTestClient.java | 10 +-
15 files changed, 1342 insertions(+), 1406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 1d41459..9597687 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -71,7 +71,7 @@
<cxf.activemq.version>5.10.0</cxf.activemq.version>
<cxf.ahc.version>1.8.5</cxf.ahc.version>
<cxf.apacheds.version>2.0.0-M19</cxf.apacheds.version>
- <cxf.atmosphere.version>2.2.4</cxf.atmosphere.version>
+ <cxf.atmosphere.version>2.2.5-SNAPSHOT</cxf.atmosphere.version>
<cxf.atmosphere.version.range>[2.0,3.0)</cxf.atmosphere.version.range>
<cxf.axiom.version>1.2.14</cxf.axiom.version>
<cxf.bcprov.version>1.51</cxf.bcprov.version>
@@ -113,7 +113,7 @@
<cxf.jettison.version>1.3.7</cxf.jettison.version>
<cxf.jetty8.version>8.1.15.v20140411</cxf.jetty8.version>
<cxf.jetty9.version>9.2.3.v20140905</cxf.jetty9.version>
- <cxf.jetty.version>${cxf.jetty9.version}</cxf.jetty.version>
+ <cxf.jetty.version>${cxf.jetty8.version}</cxf.jetty.version>
<cxf.jetty.osgi.version>[8.1,10)</cxf.jetty.osgi.version>
<cxf.jibx.version>1.2.6</cxf.jibx.version>
<cxf.junit.version>4.12</cxf.junit.version>
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
deleted file mode 100644
index 8385fa8..0000000
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketServletHolder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket;
-
-import java.io.IOException;
-import java.security.Principal;
-import java.util.Enumeration;
-import java.util.Locale;
-
-import javax.servlet.DispatcherType;
-import javax.servlet.ServletContext;
-
-/**
- *
- */
-public interface WebSocketServletHolder {
- String getAuthType();
- String getContextPath();
- String getLocalAddr();
- String getLocalName();
- int getLocalPort();
- Locale getLocale();
- Enumeration<Locale> getLocales();
- String getProtocol();
- String getRemoteAddr();
- String getRemoteHost();
- int getRemotePort();
- String getRequestURI();
- StringBuffer getRequestURL();
- DispatcherType getDispatcherType();
- boolean isSecure();
- String getPathInfo();
- String getPathTranslated();
- String getScheme();
- String getServerName();
- String getServletPath();
- ServletContext getServletContext();
- int getServerPort();
- Principal getUserPrincipal();
- Object getAttribute(String name);
- void write(byte[] data, int offset, int length) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
index a55639c..5dbb930 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
@@ -30,14 +30,13 @@ import java.util.TreeMap;
*
*/
public final class WebSocketUtils {
- static final String URI_KEY = "$uri";
- static final String METHOD_KEY = "$method";
- static final String SC_KEY = "$sc";
- static final String SM_KEY = "$sm";
- static final String FLUSHED_KEY = "$flushed";
+ public static final String URI_KEY = "$uri";
+ public static final String METHOD_KEY = "$method";
+ public static final String SC_KEY = "$sc";
+ public static final String FLUSHED_KEY = "$flushed";
+
private static final byte[] CRLF = "\r\n".getBytes();
private static final byte[] COLSP = ": ".getBytes();
- private static final String DEFAULT_SC = "200";
private WebSocketUtils() {
}
@@ -116,6 +115,15 @@ public final class WebSocketUtils {
return buffer.toString();
}
+ public static byte[] readBody(InputStream in) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] buf = new byte[8192];
+ for (int n = in.read(buf); n > -1; n = in.read(buf)) {
+ baos.write(buf, 0, n);
+ }
+ return baos.toByteArray();
+ }
+
/**
* Build response bytes with the status and type information specified in the headers.
*
@@ -128,14 +136,15 @@ public final class WebSocketUtils {
public static byte[] buildResponse(Map<String, String> headers, byte[] data, int offset, int length) {
ByteArrayBuilder sb = new ByteArrayBuilder();
String v = headers.get(SC_KEY);
- sb.append(v == null ? DEFAULT_SC : v).append(CRLF);
- appendHeaders(headers, sb);
+ if (v != null) {
+ sb.append(v).append(CRLF);
+ }
+ sb.append(headers);
- byte[] longdata = sb.toByteArray();
if (data != null && length > 0) {
- longdata = buildResponse(longdata, data, offset, length);
+ sb.append(CRLF).append(data, offset, length);
}
- return longdata;
+ return sb.toByteArray();
}
/**
@@ -154,9 +163,10 @@ public final class WebSocketUtils {
if (hlen > 0) {
System.arraycopy(headers, 0, longdata, 0, hlen);
}
- longdata[hlen] = 0x0d;
- longdata[hlen + 1] = 0x0a;
- System.arraycopy(data, offset, longdata, hlen + 2, length);
+ if (data != null && length > 0) {
+ System.arraycopy(CRLF, 0, longdata, hlen, CRLF.length);
+ System.arraycopy(data, offset, longdata, hlen + CRLF.length, length);
+ }
return longdata;
}
@@ -172,8 +182,9 @@ public final class WebSocketUtils {
public static byte[] buildResponse(byte[] data, int offset, int length) {
return buildResponse((byte[])null, data, offset, length);
}
-
- static byte[] buildHeaderLine(String name, String value) {
+
+ //FIXME (consolidate the response building code)
+ public static byte[] buildHeaderLine(String name, String value) {
byte[] hl = new byte[name.length() + COLSP.length + value.length() + CRLF.length];
System.arraycopy(name.getBytes(), 0, hl, 0, name.length());
System.arraycopy(COLSP, 0, hl, name.length(), COLSP.length);
@@ -181,7 +192,7 @@ public final class WebSocketUtils {
System.arraycopy(CRLF, 0, hl, name.length() + COLSP.length + value.length(), CRLF.length);
return hl;
}
-
+
/**
* Build request bytes with the specified method, url, headers, and content entity.
*
@@ -196,34 +207,20 @@ public final class WebSocketUtils {
public static byte[] buildRequest(String method, String url, Map<String, String> headers,
byte[] data, int offset, int length) {
ByteArrayBuilder sb = new ByteArrayBuilder();
- sb.append(method).append(' ').append(url).append(CRLF);
- appendHeaders(headers, sb);
- sb.append(CRLF);
+ sb.append(method).append(' ').append(url).append(CRLF).append(headers);
- byte[] longdata = sb.toByteArray();
if (data != null && length > 0) {
- final byte[] hb = longdata;
- longdata = new byte[hb.length + length];
- System.arraycopy(hb, 0, longdata, 0, hb.length);
- System.arraycopy(data, offset, longdata, hb.length, length);
+ sb.append(CRLF).append(data, offset, length);
}
- return longdata;
+ return sb.toByteArray();
}
- private static void appendHeaders(Map<String, String> headers, ByteArrayBuilder sb) {
- for (Entry<String, String> header : headers.entrySet()) {
- if (!header.getKey().startsWith("$")) {
- sb.append(header.getKey()).append(COLSP).append(header.getValue()).append(CRLF);
- }
- }
- }
-
private static class ByteArrayBuilder {
private ByteArrayOutputStream baos;
public ByteArrayBuilder() {
baos = new ByteArrayOutputStream();
}
-
+
public ByteArrayBuilder append(byte[] b) {
try {
baos.write(b);
@@ -232,21 +229,35 @@ public final class WebSocketUtils {
}
return this;
}
-
+
+ public ByteArrayBuilder append(byte[] b, int offset, int length) {
+ baos.write(b, offset, length);
+ return this;
+ }
+
public ByteArrayBuilder append(String s) {
try {
- baos.write(s.getBytes());
+ baos.write(s.getBytes("utf-8"));
} catch (IOException e) {
// ignore
}
return this;
}
-
- public ByteArrayBuilder append(char c) {
+
+ public ByteArrayBuilder append(int c) {
baos.write(c);
return this;
}
-
+
+ public ByteArrayBuilder append(Map<String, String> map) {
+ for (Entry<String, String> m : map.entrySet()) {
+ if (!m.getKey().startsWith("$")) {
+ append(m.getKey()).append(COLSP).append(m.getValue()).append(CRLF);
+ }
+ }
+ return this;
+ }
+
public byte[] toByteArray() {
return baos.toByteArray();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
deleted file mode 100644
index 9109aed..0000000
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletRequest.java
+++ /dev/null
@@ -1,525 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.UnsupportedEncodingException;
-import java.security.Principal;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.AsyncContext;
-import javax.servlet.DispatcherType;
-import javax.servlet.RequestDispatcher;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.ServletInputStream;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpSession;
-import javax.servlet.http.Part;
-
-import org.apache.cxf.common.logging.LogUtils;
-
-/**
- *
- */
-public class WebSocketVirtualServletRequest implements HttpServletRequest {
- private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletRequest.class);
-
- private WebSocketServletHolder webSocketHolder;
- private InputStream in;
- private Map<String, String> requestHeaders;
- private Map<String, Object> attributes;
-
- public WebSocketVirtualServletRequest(WebSocketServletHolder websocket, InputStream in)
- throws IOException {
- this.webSocketHolder = websocket;
- this.in = in;
-
- this.requestHeaders = WebSocketUtils.readHeaders(in);
- String path = requestHeaders.get(WebSocketUtils.URI_KEY);
- String origin = websocket.getRequestURI();
- if (!path.startsWith(origin)) {
- LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin});
- throw new InvalidPathException();
- }
- this.attributes = new TreeMap<String, Object>(String.CASE_INSENSITIVE_ORDER);
- Object v = websocket.getAttribute("org.apache.cxf.transport.endpoint.address");
- if (v != null) {
- attributes.put("org.apache.cxf.transport.endpoint.address", v);
- }
- }
-
- @Override
- public AsyncContext getAsyncContext() {
- return null;
- }
-
- @Override
- public Object getAttribute(String name) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "getAttribute({0}) -> {1}", new Object[] {name , attributes.get(name)});
- }
- return attributes.get(name);
- }
-
- @Override
- public Enumeration<String> getAttributeNames() {
- LOG.log(Level.FINE, "getAttributeNames()");
- return Collections.enumeration(attributes.keySet());
- }
-
- @Override
- public String getCharacterEncoding() {
- // TODO Auto-generated method stub
- LOG.log(Level.FINE, "getCharacterEncoding()");
- return null;
- }
-
- @Override
- public int getContentLength() {
- LOG.log(Level.FINE, "getContentLength()");
- return 0;
- }
-
- @Override
- public String getContentType() {
- LOG.log(Level.FINE, "getContentType()");
- return requestHeaders.get("Content-Type");
- }
-
- @Override
- public DispatcherType getDispatcherType() {
- LOG.log(Level.FINE, "getDispatcherType()");
- return webSocketHolder.getDispatcherType();
- }
-
- @Override
- public ServletInputStream getInputStream() throws IOException {
- return new ServletInputStream() {
- @Override
- public int read() throws IOException {
- return in.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return in.read(b, off, len);
- }
- };
- }
-
- @Override
- public String getLocalAddr() {
- LOG.log(Level.FINE, "getLocalAddr()");
- return webSocketHolder.getLocalAddr();
- }
-
- @Override
- public String getLocalName() {
- LOG.log(Level.FINE, "getLocalName()");
- return webSocketHolder.getLocalName();
- }
-
- @Override
- public int getLocalPort() {
- LOG.log(Level.FINE, "getLocalPort()");
- return webSocketHolder.getLocalPort();
- }
-
- @Override
- public Locale getLocale() {
- LOG.log(Level.FINE, "getLocale()");
- return webSocketHolder.getLocale();
- }
-
- @Override
- public Enumeration<Locale> getLocales() {
- LOG.log(Level.FINE, "getLocales()");
- return webSocketHolder.getLocales();
- }
-
- @Override
- public String getParameter(String name) {
- // TODO Auto-generated method stub
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "getParameter({0})", name);
- }
- return null;
- }
-
- @Override
- public Map<String, String[]> getParameterMap() {
- // TODO Auto-generated method stub
- LOG.log(Level.FINE, "getParameterMap()");
- return null;
- }
-
- @Override
- public Enumeration<String> getParameterNames() {
- // TODO Auto-generated method stub
- LOG.log(Level.FINE, "getParameterNames()");
- return null;
- }
-
- @Override
- public String[] getParameterValues(String name) {
- // TODO Auto-generated method stub
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "getParameterValues({0})", name);
- }
- return null;
- }
-
- @Override
- public String getProtocol() {
- LOG.log(Level.FINE, "getProtocol");
- return webSocketHolder.getProtocol();
- }
-
- @Override
- public BufferedReader getReader() throws IOException {
- LOG.log(Level.FINE, "getReader");
- return new BufferedReader(new InputStreamReader(in, "utf-8"));
- }
-
- @Override
- public String getRealPath(String path) {
- // TODO Auto-generated method stub
- LOG.log(Level.FINE, "getRealPath");
- return null;
- }
-
- @Override
- public String getRemoteAddr() {
- LOG.log(Level.FINE, "getRemoteAddr");
- return webSocketHolder.getRemoteAddr();
- }
-
- @Override
- public String getRemoteHost() {
- LOG.log(Level.FINE, "getRemoteHost");
- return webSocketHolder.getRemoteHost();
- }
-
- @Override
- public int getRemotePort() {
- LOG.log(Level.FINE, "getRemotePort");
- return webSocketHolder.getRemotePort();
- }
-
- @Override
- public RequestDispatcher getRequestDispatcher(String path) {
- // TODO Auto-generated method stub
- LOG.log(Level.FINE, "getRequestDispatcher");
- return null;
- }
-
- @Override
- public String getScheme() {
- LOG.log(Level.FINE, "getScheme");
- return webSocketHolder.getScheme();
- }
-
- @Override
- public String getServerName() {
- return webSocketHolder.getServerName();
- }
-
- @Override
- public int getServerPort() {
- LOG.log(Level.FINE, "getServerPort");
- return webSocketHolder.getServerPort();
- }
-
- @Override
- public ServletContext getServletContext() {
- LOG.log(Level.FINE, "getServletContext");
- return webSocketHolder.getServletContext();
- }
-
- @Override
- public boolean isAsyncStarted() {
- LOG.log(Level.FINE, "isAsyncStarted");
- return false;
- }
-
- @Override
- public boolean isAsyncSupported() {
- LOG.log(Level.FINE, "isAsyncSupported");
- return false;
- }
-
- @Override
- public boolean isSecure() {
- LOG.log(Level.FINE, "isSecure");
- return webSocketHolder.isSecure();
- }
-
- @Override
- public void removeAttribute(String name) {
- LOG.log(Level.FINE, "removeAttribute");
- attributes.remove(name);
- }
-
- @Override
- public void setAttribute(String name, Object o) {
- LOG.log(Level.FINE, "setAttribute");
- attributes.put(name, o);
- }
-
- @Override
- public void setCharacterEncoding(String env) throws UnsupportedEncodingException {
- LOG.log(Level.FINE, "setCharacterEncoding");
- // ignore as we stick to utf-8.
- }
-
- @Override
- public AsyncContext startAsync() {
- LOG.log(Level.FINE, "startAsync");
- return null;
- }
-
- @Override
- public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) {
- // TODO Auto-generated method stub
- LOG.log(Level.FINE, "startAsync");
- return null;
- }
-
- @Override
- public boolean authenticate(HttpServletResponse servletResponse) throws IOException, ServletException {
- // TODO Auto-generated method stub
- LOG.log(Level.FINE, "authenticate");
- return false;
- }
-
- @Override
- public String getAuthType() {
- LOG.log(Level.FINE, "getAuthType");
- return webSocketHolder.getAuthType();
- }
-
- @Override
- public String getContextPath() {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "getContextPath -> " + webSocketHolder.getContextPath());
- }
- return webSocketHolder.getContextPath();
- }
-
- @Override
- public Cookie[] getCookies() {
- LOG.log(Level.FINE, "getCookies");
- return null;
- }
-
- @Override
- public long getDateHeader(String name) {
- LOG.log(Level.FINE, "getDateHeader");
- return 0;
- }
-
- @Override
- public String getHeader(String name) {
- LOG.log(Level.FINE, "getHeader");
- return requestHeaders.get(name);
- }
-
- @Override
- public Enumeration<String> getHeaderNames() {
- LOG.log(Level.FINE, "getHeaderNames");
- return Collections.enumeration(requestHeaders.keySet());
- }
-
- @Override
- public Enumeration<String> getHeaders(String name) {
- LOG.log(Level.FINE, "getHeaders");
- // our protocol assumes no multiple headers
- return Collections.enumeration(Arrays.asList(requestHeaders.get(name)));
- }
-
- @Override
- public int getIntHeader(String name) {
- LOG.log(Level.FINE, "getIntHeader");
- String v = requestHeaders.get(name);
- return v == null ? -1 : Integer.parseInt(v);
- }
-
- @Override
- public String getMethod() {
- LOG.log(Level.FINE, "getMethod");
- return requestHeaders.get(WebSocketUtils.METHOD_KEY);
- }
-
- @Override
- public Part getPart(String name) throws IOException, ServletException {
- LOG.log(Level.FINE, "getPart");
- return null;
- }
-
- @Override
- public Collection<Part> getParts() throws IOException, ServletException {
- LOG.log(Level.FINE, "getParts");
- return null;
- }
-
- @Override
- public String getPathInfo() {
- String uri = requestHeaders.get(WebSocketUtils.URI_KEY);
- String servletpath = webSocketHolder.getServletPath();
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "getPathInfo " + servletpath + " " + uri);
- }
- //TODO remove the query string part
- //REVISIT may cache this value in requstHeaders?
- return uri.substring(servletpath.length());
- }
-
- @Override
- public String getPathTranslated() {
- String path = getPathInfo();
- String opathtrans = webSocketHolder.getPathTranslated();
- // some container may choose not to return this value
- if (opathtrans == null) {
- return null;
- }
- String opathinfo = webSocketHolder.getPathInfo();
- LOG.log(Level.FINE, "getPathTranslated " + path + " " + opathinfo);
- int pos = opathtrans.indexOf(opathinfo);
- //REVISIT may cache this value in requstHeaders?
- return new StringBuilder().append(opathtrans.substring(0, pos)).append(path).toString();
- }
-
- @Override
- public String getQueryString() {
- LOG.log(Level.FINE, "getQueryString");
- return null;
- }
-
- @Override
- public String getRemoteUser() {
- LOG.log(Level.FINE, "getRemoteUser");
- return null;
- }
-
- @Override
- public String getRequestURI() {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "getRequestURI " + requestHeaders.get(WebSocketUtils.URI_KEY));
- }
- return requestHeaders.get(WebSocketUtils.URI_KEY);
- }
-
- @Override
- public StringBuffer getRequestURL() {
- StringBuffer sb = webSocketHolder.getRequestURL();
- String ouri = webSocketHolder.getRequestURI();
- String uri = getRequestURI();
- sb.append(uri.substring(ouri.length()));
- LOG.log(Level.FINE, "getRequestURL " + uri);
- return sb;
- }
-
- @Override
- public String getRequestedSessionId() {
- LOG.log(Level.FINE, "getRequestedSessionId");
- return null;
- }
-
- @Override
- public String getServletPath() {
- LOG.log(Level.FINE, "getServletPath " + webSocketHolder.getServletPath());
- return webSocketHolder.getServletPath();
- }
-
- @Override
- public HttpSession getSession() {
- LOG.log(Level.FINE, "getSession");
- return null;
- }
-
- @Override
- public HttpSession getSession(boolean create) {
- LOG.log(Level.FINE, "getSession");
- return null;
- }
-
- @Override
- public Principal getUserPrincipal() {
- LOG.log(Level.FINE, "getUserPrincipal");
- return webSocketHolder.getUserPrincipal();
- }
-
- @Override
- public boolean isRequestedSessionIdFromCookie() {
- LOG.log(Level.FINE, "isRequestedSessionIdFromCookie");
- return false;
- }
-
- @Override
- public boolean isRequestedSessionIdFromURL() {
- LOG.log(Level.FINE, "isRequestedSessionIdFromURL");
- return false;
- }
-
- @Override
- public boolean isRequestedSessionIdFromUrl() {
- LOG.log(Level.FINE, "isRequestedSessionIdFromUrl");
- return false;
- }
-
- @Override
- public boolean isRequestedSessionIdValid() {
- LOG.log(Level.FINE, "isRequestedSessionIdValid");
- return false;
- }
-
- @Override
- public boolean isUserInRole(String role) {
- LOG.log(Level.FINE, "isUserInRole");
- return false;
- }
-
- @Override
- public void login(String username, String password) throws ServletException {
- LOG.log(Level.FINE, "login");
-
- }
-
- @Override
- public void logout() throws ServletException {
- LOG.log(Level.FINE, "logout");
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
deleted file mode 100644
index a783712..0000000
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketVirtualServletResponse.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Collection;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.cxf.common.logging.LogUtils;
-
-/**
- *
- */
-public class WebSocketVirtualServletResponse implements HttpServletResponse {
- private static final Logger LOG = LogUtils.getL7dLogger(WebSocketVirtualServletResponse.class);
- private WebSocketServletHolder webSocketHolder;
- private Map<String, String> responseHeaders;
- private ServletOutputStream outputStream;
-
- public WebSocketVirtualServletResponse(WebSocketServletHolder websocket) {
- this.webSocketHolder = websocket;
- this.responseHeaders = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
- this.outputStream = createOutputStream();
- }
-
- @Override
- public void flushBuffer() throws IOException {
- LOG.log(Level.FINE, "flushBuffer()");
- outputStream.flush();
- }
-
- @Override
- public int getBufferSize() {
- LOG.log(Level.FINE, "getBufferSize()");
- return 0;
- }
-
- @Override
- public String getCharacterEncoding() {
- LOG.log(Level.FINE, "getCharacterEncoding()");
- return null;
- }
-
- @Override
- public String getContentType() {
- LOG.log(Level.FINE, "getContentType()");
- return responseHeaders.get("Content-Type");
- }
-
- @Override
- public Locale getLocale() {
- LOG.log(Level.FINE, "getLocale");
- return null;
- }
-
- @Override
- public ServletOutputStream getOutputStream() throws IOException {
- return outputStream;
- }
-
- @Override
- public PrintWriter getWriter() throws IOException {
- LOG.log(Level.FINE, "getWriter()");
- return new PrintWriter(getOutputStream());
- }
-
- @Override
- public boolean isCommitted() {
- return false;
- }
-
- @Override
- public void reset() {
- }
-
- @Override
- public void resetBuffer() {
- LOG.log(Level.FINE, "resetBuffer()");
- }
-
- @Override
- public void setBufferSize(int size) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setBufferSize({0})", size);
- }
- }
-
- @Override
- public void setCharacterEncoding(String charset) {
- // TODO
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setCharacterEncoding({0})", charset);
- }
- }
-
- @Override
- public void setContentLength(int len) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setContentLength({0})", len);
- }
- responseHeaders.put("Content-Length", Integer.toString(len));
- }
-
- @Override
- public void setContentType(String type) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setContentType({0})", type);
- }
- responseHeaders.put("Content-Type", type);
- }
-
- @Override
- public void setLocale(Locale loc) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setLocale({0})", loc);
- }
- }
-
- @Override
- public void addCookie(Cookie cookie) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "addCookie({0})", cookie);
- }
- }
-
- @Override
- public void addDateHeader(String name, long date) {
- // TODO
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "addDateHeader({0}, {1})", new Object[]{name, date});
- }
- }
-
- @Override
- public void addHeader(String name, String value) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "addHeader({0}, {1})", new Object[]{name, value});
- }
- responseHeaders.put(name, value);
- }
-
- @Override
- public void addIntHeader(String name, int value) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "addIntHeader({0}, {1})", new Object[]{name, value});
- }
- responseHeaders.put(name, Integer.toString(value));
- }
-
- @Override
- public boolean containsHeader(String name) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "containsHeader({0})", name);
- }
- return responseHeaders.containsKey(name);
- }
-
- @Override
- public String encodeRedirectURL(String url) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "encodeRedirectURL({0})", url);
- }
- return null;
- }
-
- @Override
- public String encodeRedirectUrl(String url) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "encodeRedirectUrl({0})", url);
- }
- return null;
- }
-
- @Override
- public String encodeURL(String url) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "encodeURL({0})", url);
- }
- return null;
- }
-
- @Override
- public String encodeUrl(String url) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "encodeUrl({0})", url);
- }
- return null;
- }
-
- @Override
- public String getHeader(String name) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "getHeader({0})", name);
- }
- return null;
- }
-
- @Override
- public Collection<String> getHeaderNames() {
- LOG.log(Level.FINE, "getHeaderNames()");
- return null;
- }
-
- @Override
- public Collection<String> getHeaders(String name) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "getHeaders({0})", name);
- }
- return null;
- }
-
- @Override
- public int getStatus() {
- LOG.log(Level.FINE, "getStatus()");
- String v = responseHeaders.get(WebSocketUtils.SC_KEY);
- return v == null ? 200 : Integer.parseInt(v);
- }
-
- @Override
- public void sendError(int sc) throws IOException {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "sendError{0}", sc);
- }
- responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
- byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
- webSocketHolder.write(data, 0, data.length);
- }
-
- @Override
- public void sendError(int sc, String msg) throws IOException {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "sendError({0}, {1})", new Object[]{sc, msg});
- }
- responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
- responseHeaders.put(WebSocketUtils.SM_KEY, msg);
- byte[] data = WebSocketUtils.buildResponse(responseHeaders, null, 0, 0);
- webSocketHolder.write(data, 0, data.length);
- }
-
- @Override
- public void sendRedirect(String location) throws IOException {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "sendRedirect({0})", location);
- }
- }
-
- @Override
- public void setDateHeader(String name, long date) {
- // ignore
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setDateHeader({0}, {1})", new Object[]{name, date});
- }
- }
-
- @Override
- public void setHeader(String name, String value) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setHeader({0}, {1})", new Object[]{name, value});
- }
- responseHeaders.put(name, value);
- }
-
- @Override
- public void setIntHeader(String name, int value) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setIntHeader({0}, {1})", new Object[]{name, value});
- }
- }
-
- @Override
- public void setStatus(int sc) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setStatus({0})", sc);
- }
- responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
- }
-
- @Override
- public void setStatus(int sc, String sm) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "setStatus({0}, {1})", new Object[]{sc, sm});
- }
- responseHeaders.put(WebSocketUtils.SC_KEY, Integer.toString(sc));
- responseHeaders.put(WebSocketUtils.SM_KEY, sm);
- }
-
- private ServletOutputStream createOutputStream() {
- //REVISIT
- // This output buffering is needed as the server side websocket does
- // not support the fragment transmission mode when sending back a large data.
- // And this buffering is only used for the response for the initial service innovation.
- // For the subsequently pushed data to the socket are sent back
- // unbuffered as individual websocket messages.
- // the things to consider :
- // - provide a size limit if we are use this buffering
- // - add a chunking mode in the cxf websocket's binding.
- return new ServletOutputStream() {
- private InternalByteArrayOutputStream buffer = new InternalByteArrayOutputStream();
-
- @Override
- public void write(int b) throws IOException {
- byte[] data = new byte[1];
- data[0] = (byte)b;
- write(data, 0, 1);
- }
-
- @Override
- public void write(byte[] data) throws IOException {
- write(data, 0, data.length);
- }
-
- @Override
- public void write(byte[] data, int offset, int length) throws IOException {
- if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
- // buffer the data until it gets flushed
- buffer.write(data, offset, length);
- } else {
- // unbuffered write to the socket
- String respid = responseHeaders.get(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY);
- byte[] headers = respid != null
- ? WebSocketUtils.buildHeaderLine(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, respid) : null;
- data = WebSocketUtils.buildResponse(headers, data, offset, length);
- webSocketHolder.write(data, 0, data.length);
- }
- }
- public void close() throws IOException {
- if (responseHeaders.get(WebSocketUtils.FLUSHED_KEY) == null) {
- byte[] data = WebSocketUtils.buildResponse(responseHeaders, buffer.getBytes(), 0, buffer.size());
- webSocketHolder.write(data, 0, data.length);
- responseHeaders.put(WebSocketUtils.FLUSHED_KEY, "true");
- }
- super.close();
- }
- };
- }
-
- private static class InternalByteArrayOutputStream extends ByteArrayOutputStream {
- public byte[] getBytes() {
- return buf;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
deleted file mode 100644
index 38e6599..0000000
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket.atmosphere;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
-import java.security.Principal;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.Locale;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.DispatcherType;
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.transport.websocket.InvalidPathException;
-import org.apache.cxf.transport.websocket.WebSocketConstants;
-import org.apache.cxf.transport.websocket.WebSocketDestinationService;
-import org.apache.cxf.transport.websocket.WebSocketServletHolder;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse;
-import org.atmosphere.cpr.AtmosphereConfig;
-import org.atmosphere.cpr.AtmosphereRequest;
-import org.atmosphere.websocket.WebSocket;
-import org.atmosphere.websocket.WebSocketProcessor.WebSocketException;
-import org.atmosphere.websocket.WebSocketProtocol;
-
-/**
- *
- */
-public class AtmosphereWebSocketHandler implements WebSocketProtocol {
- private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketHandler.class);
-
- protected AtmosphereWebSocketServletDestination destination;
-
- //REVISIT make these keys configurable
- private String requestIdKey = WebSocketConstants.DEFAULT_REQUEST_ID_KEY;
- private String responseIdKey = WebSocketConstants.DEFAULT_RESPONSE_ID_KEY;
-
- public AtmosphereWebSocketServletDestination getDestination() {
- return destination;
- }
-
- public void setDestination(AtmosphereWebSocketServletDestination destination) {
- this.destination = destination;
- }
-
- /** {@inheritDoc}*/
- @Override
- public void configure(AtmosphereConfig config) {
- LOG.fine("configure(AtmosphereConfig)");
-
- }
-
- /** {@inheritDoc}*/
- @Override
- public List<AtmosphereRequest> onMessage(WebSocket webSocket, String data) {
- LOG.fine("onMessage(WebSocket, String)");
- //TODO may want to use string directly instead of converting it to byte[]
- byte[] bdata = null;
- try {
- bdata = data.getBytes("utf-8");
- } catch (UnsupportedEncodingException e) {
- // will not happen
- }
- return invokeService(webSocket, new ByteArrayInputStream(bdata, 0, bdata.length));
- }
-
- /** {@inheritDoc}*/
- @Override
- public List<AtmosphereRequest> onMessage(WebSocket webSocket, byte[] data, int offset, int length) {
- final byte[] safedata = new byte[length];
- System.arraycopy(data, offset, safedata, 0, length);
- return invokeService(webSocket, new ByteArrayInputStream(safedata, 0, safedata.length));
- }
-
- protected List<AtmosphereRequest> invokeService(final WebSocket webSocket, final InputStream stream) {
- LOG.fine("invokeService(WebSocket, InputStream)");
- // invoke the service asynchronously as onMessage is synchronously blocked (in jetty)
- // make sure the byte array passed to this method is immutable, as the websocket framework
- // may corrupt the byte array after this method is returned (i.e., before the data is returned in
- // the executor's thread.
- executeServiceTask(new Runnable() {
- @Override
- public void run() {
- HttpServletRequest request = null;
- HttpServletResponse response = null;
- try {
- WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket);
- response = createServletResponse(webSocketHolder);
- request = createServletRequest(webSocketHolder, stream);
- if (destination != null) {
- String reqid = request.getHeader(requestIdKey);
- if (reqid != null) {
- response.setHeader(responseIdKey, reqid);
- }
- ((WebSocketDestinationService)destination).invokeInternal(null,
- webSocket.resource().getRequest().getServletContext(),
- request, response);
- }
- } catch (InvalidPathException ex) {
- reportErrorStatus(response, 400);
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Failed to invoke service", e);
- }
- }
- });
- return null;
- }
-
- private void executeServiceTask(Runnable r) {
- try {
- destination.getExecutor().execute(r);
- } catch (RejectedExecutionException e) {
- LOG.warning(
- "Executor queue is full, run the service invocation task in caller thread."
- + " Users can specify a larger executor queue to avoid this.");
- r.run();
- }
- }
-
- // may want to move this error reporting code to WebSocketServletHolder
- protected void reportErrorStatus(HttpServletResponse response, int status) {
- if (response != null) {
- response.setStatus(status);
- try {
- response.getWriter().write("\r\n");
- response.getWriter().close();
- response.flushBuffer();
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
- /** {@inheritDoc}*/
- @Override
- public void onOpen(WebSocket webSocket) {
- LOG.fine("onOpen(WebSocket)");
- }
-
- /** {@inheritDoc}*/
- @Override
- public void onClose(WebSocket webSocket) {
- LOG.fine("onClose(WebSocket)");
-
- }
-
- /** {@inheritDoc}*/
- @Override
- public void onError(WebSocket webSocket, WebSocketException t) {
- LOG.severe("onError(WebSocket, WebSocketException)");
- }
-
-// protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder,
-// byte[] data, int offset, int length)
-// throws IOException {
-// return new WebSocketVirtualServletRequest(webSocketHolder,
-// new ByteArrayInputStream(data, offset, length));
-// }
-
- protected WebSocketVirtualServletRequest createServletRequest(WebSocketServletHolder webSocketHolder,
- InputStream stream)
- throws IOException {
- return new WebSocketVirtualServletRequest(webSocketHolder, stream);
- }
-
- protected WebSocketVirtualServletResponse createServletResponse(WebSocketServletHolder webSocketHolder)
- throws IOException {
- return new WebSocketVirtualServletResponse(webSocketHolder);
- }
-
- protected static class AtmosphereWebSocketServletHolder implements WebSocketServletHolder {
- private WebSocket webSocket;
-
- public AtmosphereWebSocketServletHolder(WebSocket webSocket) {
- this.webSocket = webSocket;
- }
-
- @Override
- public String getAuthType() {
- return webSocket.resource().getRequest().getAuthType();
- }
-
- @Override
- public String getContextPath() {
- return webSocket.resource().getRequest().getContextPath();
- }
-
- @Override
- public String getLocalAddr() {
- return webSocket.resource().getRequest().getLocalAddr();
- }
-
- @Override
- public String getLocalName() {
- return webSocket.resource().getRequest().getLocalName();
- }
-
- @Override
- public int getLocalPort() {
- return webSocket.resource().getRequest().getLocalPort();
- }
-
- @Override
- public Locale getLocale() {
- return webSocket.resource().getRequest().getLocale();
- }
-
- @Override
- public Enumeration<Locale> getLocales() {
- return webSocket.resource().getRequest().getLocales();
- }
-
- @Override
- public String getProtocol() {
- return webSocket.resource().getRequest().getProtocol();
- }
-
- @Override
- public String getRemoteAddr() {
- return webSocket.resource().getRequest().getRemoteAddr();
- }
-
- @Override
- public String getRemoteHost() {
- return webSocket.resource().getRequest().getRemoteHost();
- }
-
- @Override
- public int getRemotePort() {
- return webSocket.resource().getRequest().getRemotePort();
- }
-
- @Override
- public String getRequestURI() {
- return webSocket.resource().getRequest().getRequestURI();
- }
-
- @Override
- public StringBuffer getRequestURL() {
- return webSocket.resource().getRequest().getRequestURL();
- }
-
- @Override
- public DispatcherType getDispatcherType() {
- return webSocket.resource().getRequest().getDispatcherType();
- }
-
- @Override
- public boolean isSecure() {
- return webSocket.resource().getRequest().isSecure();
- }
-
- @Override
- public String getPathInfo() {
- return webSocket.resource().getRequest().getServletPath();
- }
-
- @Override
- public String getPathTranslated() {
- return webSocket.resource().getRequest().getPathTranslated();
- }
-
- @Override
- public String getScheme() {
- return webSocket.resource().getRequest().getScheme();
- }
-
- @Override
- public String getServerName() {
- return webSocket.resource().getRequest().getServerName();
- }
-
- @Override
- public String getServletPath() {
- return webSocket.resource().getRequest().getServletPath();
- }
-
- @Override
- public int getServerPort() {
- return webSocket.resource().getRequest().getServerPort();
- }
-
- @Override
- public ServletContext getServletContext() {
- return webSocket.resource().getRequest().getServletContext();
- }
-
- @Override
- public Principal getUserPrincipal() {
- return webSocket.resource().getRequest().getUserPrincipal();
- }
-
- @Override
- public Object getAttribute(String name) {
- return webSocket.resource().getRequest().getAttribute(name);
- }
-
- @Override
- public void write(byte[] data, int offset, int length) throws IOException {
- webSocket.write(data, offset, length);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
index c8e5fae..7aa4cd3 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
@@ -21,15 +21,18 @@ package org.apache.cxf.transport.websocket.atmosphere;
import java.io.IOException;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse;
import org.apache.cxf.Bus;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.DestinationRegistry;
import org.apache.cxf.transport.servlet.ServletDestination;
@@ -37,16 +40,20 @@ import org.apache.cxf.transport.websocket.WebSocketDestinationService;
import org.apache.cxf.workqueue.WorkQueueManager;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereInterceptor;
import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
import org.atmosphere.util.Utils;
-import org.atmosphere.websocket.WebSocketProtocol;
/**
*
*/
public class AtmosphereWebSocketServletDestination extends ServletDestination implements
WebSocketDestinationService {
+ private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketServletDestination.class);
+
private AtmosphereFramework framework;
private Executor executor;
@@ -54,19 +61,14 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
String path) throws IOException {
super(bus, registry, ei, path);
this.framework = new AtmosphereFramework(false, true);
-
framework.setUseNativeImplementation(false);
+ framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true");
+ framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true");
framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");
- //TODO provide a way to switch between the non-stream handler and the stream handler
- framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL,
- AtmosphereWebSocketHandler.class.getName());
+ framework.interceptor(getInterceptor(bus));
+ framework.addAtmosphereHandler("/", new DestinationHandler());
framework.init();
- WebSocketProtocol wsp = framework.getWebSocketProtocol();
- if (wsp instanceof AtmosphereWebSocketHandler) {
- ((AtmosphereWebSocketHandler)wsp).setDestination(this);
- }
-
// the executor for decoupling the service invocation from websocket's onMessage call which is
// synchronously blocked
executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
@@ -77,7 +79,7 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
HttpServletResponse resp) throws IOException {
if (Utils.webSocketEnabled(req)) {
try {
- framework.doCometSupport(AtmosphereRequest.wrap(new HttpServletRequestFilter(req)),
+ framework.doCometSupport(AtmosphereRequest.wrap(req),
AtmosphereResponse.wrap(resp));
} catch (ServletException e) {
throw new IOException(e);
@@ -96,20 +98,44 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im
Executor getExecutor() {
return executor;
}
+
+ private class DestinationHandler extends AbstractReflectorAtmosphereHandler {
+
+ @Override
+ public void onRequest(final AtmosphereResource resource) throws IOException {
+ LOG.fine("onRequest");
+ executeHandlerTask(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ invokeInternal(null,
+ resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse());
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to invoke service", e);
+ }
+ }
+ });
+ }
+ }
- private static class HttpServletRequestFilter extends HttpServletRequestWrapper {
- private static final String TRANSPORT_ADDRESS
- = "org.apache.cxf.transport.endpoint.address";
- private String transportAddress;
- public HttpServletRequestFilter(HttpServletRequest request) {
- super(request);
- transportAddress = (String)request.getAttribute(TRANSPORT_ADDRESS);
+ private void executeHandlerTask(Runnable r) {
+ try {
+ executor.execute(r);
+ } catch (RejectedExecutionException e) {
+ LOG.warning(
+ "Executor queue is full, run the service invocation task in caller thread."
+ + " Users can specify a larger executor queue to avoid this.");
+ r.run();
}
-
- @Override
- public Object getAttribute(String name) {
- return TRANSPORT_ADDRESS.equals(name) ? transportAddress : super.getAttribute(name);
+ }
+
+ //FIXME a temporary workaround until we decide how to customize atmosphere using cxf's destination configuration
+ private AtmosphereInterceptor getInterceptor(Bus bus) {
+ AtmosphereInterceptor ai = (AtmosphereInterceptor)bus.getProperty("atmosphere.interceptor");
+ if (ai == null) {
+ ai = new DefaultProtocolInterceptor();
}
-
+ LOG.info("AtmosphereInterceptor: " + ai);
+ return ai;
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
deleted file mode 100644
index 1f4cc00..0000000
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketStreamHandler.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.transport.websocket.atmosphere;
-
-import java.io.InputStream;
-import java.io.Reader;
-import java.util.List;
-import java.util.logging.Logger;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.atmosphere.cpr.AtmosphereRequest;
-import org.atmosphere.websocket.WebSocket;
-import org.atmosphere.websocket.WebSocketProtocolStream;
-
-/**
- *
- */
-public class AtmosphereWebSocketStreamHandler extends AtmosphereWebSocketHandler implements
- WebSocketProtocolStream {
- private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketStreamHandler.class);
-
- @Override
- public List<AtmosphereRequest> onTextStream(WebSocket webSocket, Reader r) {
- LOG.fine("onTextStream(WebSocket, Reader)");
- //TODO add support for Reader
- throw new IllegalArgumentException("not implemented");
- }
-
- @Override
- public List<AtmosphereRequest> onBinaryStream(WebSocket webSocket, InputStream stream) {
- return invokeService(webSocket, stream);
- }
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
new file mode 100644
index 0000000..4d3c2e6
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.websocket.InvalidPathException;
+import org.apache.cxf.transport.websocket.WebSocketConstants;
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+import org.atmosphere.config.service.AtmosphereInterceptorService;
+import org.atmosphere.cpr.Action;
+import org.atmosphere.cpr.AsyncIOInterceptor;
+import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
+import org.atmosphere.cpr.AsyncIOWriter;
+import org.atmosphere.cpr.AtmosphereFramework;
+import org.atmosphere.cpr.AtmosphereInterceptorAdapter;
+import org.atmosphere.cpr.AtmosphereInterceptorWriter;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResponse;
+import org.atmosphere.cpr.FrameworkConfig;
+
+/**
+ * DefaultProtocolInterceptor provides CXF's default WebSocket protocol for the Atmosphere-based transport.
+ *
+ */
+@AtmosphereInterceptorService
+public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
+ private static final Logger LOG = LogUtils.getL7dLogger(DefaultProtocolInterceptor.class);
+
+ private static final String REQUEST_DISPATCHED = "request.dispatched";
+ private static final String RESPONSE_PARENT = "response.parent";
+
+ private final AsyncIOInterceptor interceptor = new Interceptor();
+
+ @Override
+ public Action inspect(final AtmosphereResource r) {
+ LOG.log(Level.FINE, "inspect");
+ AtmosphereRequest request = r.getRequest();
+
+ if (request.getAttribute(REQUEST_DISPATCHED) == null) {
+ AtmosphereResponse response = new WrappedAtmosphereResponse(r.getResponse(), request);
+
+ AtmosphereFramework framework = r.getAtmosphereConfig().framework();
+ try {
+ byte[] data = WebSocketUtils.readBody(request.getInputStream());
+ if (data.length == 0) {
+ return Action.CANCELLED;
+ }
+
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "inspecting data {0}", new String(data));
+ }
+ try {
+ AtmosphereRequest ar = createAtmosphereRequest(request, data);
+ ar.setAttribute(REQUEST_DISPATCHED, "true");
+ String refid = ar.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
+ if (refid != null) {
+ ar.setAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY, refid);
+ }
+ // This is a new request, we must clean the Websocket AtmosphereResource.
+ request.removeAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE);
+ response.request(ar);
+ attachWriter(r);
+
+ Action action = framework.doCometSupport(ar, response);
+ if (action.type() == Action.TYPE.SUSPEND) {
+ ar.destroyable(false);
+ response.destroyable(false);
+ }
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Error during request dispatching", e);
+ if (e instanceof InvalidPathException) {
+ response.setStatus(400);
+ } else {
+ response.setStatus(500);
+ }
+ response.getOutputStream().write(createResponse(response, null, true));
+ }
+ return Action.CANCELLED;
+ } catch (IOException e) {
+ LOG.log(Level.WARNING, "Error during protocol processing", e);
+ return Action.CONTINUE;
+ }
+ } else {
+ request.setAttribute(REQUEST_DISPATCHED, null);
+ request.setAttribute(RESPONSE_PARENT, null);
+ request.destroyable(false);
+ }
+ return Action.CONTINUE;
+ }
+
+ private void attachWriter(final AtmosphereResource r) {
+ AtmosphereResponse res = r.getResponse();
+ AsyncIOWriter writer = res.getAsyncIOWriter();
+
+ if (writer instanceof AtmosphereInterceptorWriter) {
+ //REVISIT need a better way to add a custom filter as the first entry rather than the last,
+ // e.g. interceptor(AsyncIOInterceptor interceptor, int position)
+ Deque<AsyncIOInterceptor> filters = AtmosphereInterceptorWriter.class.cast(writer).filters();
+ if (!filters.contains(interceptor)) {
+ filters.addFirst(interceptor);
+ }
+ }
+ }
+
+ /**
+ * Creates a virtual request using the specified parent request and the actual data.
+ *
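+ * @param r the parent request; the path given in the data must start with its request URI
+ * @param data the raw message data, consisting of the headers followed by an optional body
+ * @return the virtual request built from the headers and the body found in the data
+ * @throws IOException if reading the headers or the body from the data fails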
+ */
+ protected AtmosphereRequest createAtmosphereRequest(AtmosphereRequest r, byte[] data) throws IOException {
+ AtmosphereRequest.Builder b = new AtmosphereRequest.Builder();
+ ByteArrayInputStream in = new ByteArrayInputStream(data);
+ Map<String, String> hdrs = WebSocketUtils.readHeaders(in);
+ String path = hdrs.get(WebSocketUtils.URI_KEY);
+ String origin = r.getRequestURI();
+ if (!path.startsWith(origin)) {
+ LOG.log(Level.WARNING, "invalid path: {0} not within {1}", new Object[]{path, origin});
+ throw new InvalidPathException();
+ }
+
+ String requestURI = path;
+ String requestURL = r.getRequestURL() + requestURI.substring(r.getRequestURI().length());
+ String contentType = hdrs.get("Content-Type");
+
+ String method = hdrs.get(WebSocketUtils.METHOD_KEY);
+ b.pathInfo(path)
+ .contentType(contentType)
+ .headers(hdrs)
+ .method(method)
+ .requestURI(requestURI)
+ .requestURL(requestURL)
+ .request(r);
+ // add the body only if it is present
+ byte[] body = WebSocketUtils.readBody(in);
+ if (body.length > 0) {
+ b.body(body);
+ }
+ return b.build();
+ }
+
+ /**
+ * Creates a response data based on the specified payload.
+ *
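+ * @param response the response whose status, content type and associated request are used
+ * @param payload the response body, or null if there is none
+ * @param parent whether to include the status code (and, for a non-empty payload, the content type) header
+ * @return the response data built from the resulting headers and the payload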
+ */
+ protected byte[] createResponse(AtmosphereResponse response, byte[] payload, boolean parent) {
+ AtmosphereRequest request = response.request();
+ String refid = (String)request.getAttribute(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
+
+ Map<String, String> headers = new HashMap<String, String>();
+ if (refid != null) {
+ response.addHeader(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
+ headers.put(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
+ }
+ if (parent) {
+ // include the status code and content-type
+ headers.put(WebSocketUtils.SC_KEY, Integer.toString(response.getStatus()));
+ if (payload != null && payload.length > 0) {
+ headers.put("Content-Type", response.getContentType());
+ }
+ }
+ return WebSocketUtils.buildResponse(headers, payload, 0, payload == null ? 0 : payload.length);
+ }
+
+ private final class Interceptor extends AsyncIOInterceptorAdapter {
+
+ @Override
+ public byte[] transformPayload(AtmosphereResponse response, byte[] responseDraft, byte[] data)
+ throws IOException {
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "transformPayload with draft={0}", new String(responseDraft));
+ }
+ AtmosphereRequest request = response.request();
+ if (request.getAttribute(RESPONSE_PARENT) == null) {
+ request.setAttribute(RESPONSE_PARENT, "true");
+ return createResponse(response, responseDraft, true);
+ } else {
+ return createResponse(response, responseDraft, false);
+ }
+ }
+
+ @Override
+ public byte[] error(AtmosphereResponse response, int statusCode, String reasonPhrase) {
+ if (LOG.isLoggable(Level.INFO)) {
+ LOG.log(Level.INFO, "status={0}", statusCode);
+ }
+ response.setStatus(statusCode, reasonPhrase);
+ return createResponse(response, null, true);
+ }
+ }
+
+ // a workaround to flush the header data upon close when no write operation occurs
+ private class WrappedAtmosphereResponse extends AtmosphereResponse {
+ public WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req) {
+ super((HttpServletResponse)resp.getResponse(), resp.getAsyncIOWriter(), req, resp.isDestroyable());
+ }
+
+ @Override
+ public ServletOutputStream getOutputStream() throws IOException {
+ final ServletOutputStream delegate = super.getOutputStream();
+ return new ServletOutputStream() {
+ private boolean written;
+
+ @Override
+ public void write(int i) throws IOException {
+ written = true;
+ delegate.write(i);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!written) {
+ delegate.write(createResponse(WrappedAtmosphereResponse.this, null, true));
+ }
+ delegate.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ delegate.flush();
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ written = true;
+ delegate.write(b, off, len);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ written = true;
+ delegate.write(b);
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
index 6ae3c9f..f48efb5 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/JettyWebSocket.java
@@ -41,9 +41,6 @@ import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.transport.websocket.InvalidPathException;
import org.apache.cxf.transport.websocket.WebSocketConstants;
-import org.apache.cxf.transport.websocket.WebSocketServletHolder;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletRequest;
-import org.apache.cxf.transport.websocket.WebSocketVirtualServletResponse;
import org.eclipse.jetty.websocket.WebSocket;
class JettyWebSocket implements WebSocket.OnBinaryMessage, WebSocket.OnTextMessage {
http://git-wip-us.apache.org/repos/asf/cxf/blob/28e185b2/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java
new file mode 100644
index 0000000..cd38aa4
--- /dev/null
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/jetty/WebSocketServletHolder.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.jetty;
+
+import java.io.IOException;
+import java.security.Principal;
+import java.util.Enumeration;
+import java.util.Locale;
+
+import javax.servlet.DispatcherType;
+import javax.servlet.ServletContext;
+
+/**
+ *
+ */
+public interface WebSocketServletHolder {
+ String getAuthType();
+ String getContextPath();
+ String getLocalAddr();
+ String getLocalName();
+ int getLocalPort();
+ Locale getLocale();
+ Enumeration<Locale> getLocales();
+ String getProtocol();
+ String getRemoteAddr();
+ String getRemoteHost();
+ int getRemotePort();
+ String getRequestURI();
+ StringBuffer getRequestURL();
+ DispatcherType getDispatcherType();
+ boolean isSecure();
+ String getPathInfo();
+ String getPathTranslated();
+ String getScheme();
+ String getServerName();
+ String getServletPath();
+ ServletContext getServletContext();
+ int getServerPort();
+ Principal getUserPrincipal();
+ Object getAttribute(String name);
+ void write(byte[] data, int offset, int length) throws IOException;
+}