You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2015/11/12 16:23:34 UTC

[8/9] cxf git commit: More updates to start getting the Atmosphere-based WebSocket support to work with Jetty 9.3

More updates to start getting the Atmosphere-based WebSocket support to work with Jetty 9.3


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6c67f878
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6c67f878
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6c67f878

Branch: refs/heads/3.1.x-fixes
Commit: 6c67f878e2c210e7f03c4aea52b2ba2eec7d611a
Parents: 2ec8759
Author: Daniel Kulp <dk...@apache.org>
Authored: Wed Nov 11 13:57:08 2015 -0500
Committer: Daniel Kulp <dk...@apache.org>
Committed: Thu Nov 12 10:23:13 2015 -0500

----------------------------------------------------------------------
 .../atmosphere/DefaultProtocolInterceptor.java  | 109 ++++++++++++-------
 .../jaxrs/websocket/BookStoreWebSocket.java     |   2 +
 2 files changed, 70 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/6c67f878/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
index b70a467..54431ce 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
@@ -21,6 +21,7 @@ package org.apache.cxf.transport.websocket.atmosphere;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Map;
@@ -32,6 +33,7 @@ import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.transport.websocket.InvalidPathException;
 import org.apache.cxf.transport.websocket.WebSocketConstants;
 import org.apache.cxf.transport.websocket.WebSocketUtils;
@@ -107,7 +109,7 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
         AtmosphereRequest request = r.getRequest();
 
         if (request.getAttribute(REQUEST_DISPATCHED) == null) {
-            AtmosphereResponse response = new WrappedAtmosphereResponse(r.getResponse(), request);
+            AtmosphereResponse response = null;
 
             AtmosphereFramework framework = r.getAtmosphereConfig().framework();
             try {
@@ -121,6 +123,7 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
                 }
                 try {
                     AtmosphereRequest ar = createAtmosphereRequest(request, data);
+                    response = new WrappedAtmosphereResponse(r.getResponse(), ar);
                     ar.attributes().put(REQUEST_DISPATCHED, "true");
                     String refid = ar.getHeader(WebSocketConstants.DEFAULT_REQUEST_ID_KEY);
                     if (refid != null) {
@@ -138,12 +141,17 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
                     }
                 } catch (Exception e) {
                     LOG.log(Level.WARNING, "Error during request dispatching", e);
+                    if (response == null) {
+                        response = new WrappedAtmosphereResponse(r.getResponse(), request);
+                    }
                     if (e instanceof InvalidPathException) {
-                        response.setStatus(400);
+                        response.setIntHeader(WebSocketUtils.SC_KEY, 400);
                     } else {
-                        response.setStatus(500);
+                        response.setIntHeader(WebSocketUtils.SC_KEY, 500);
                     }
-                    response.getOutputStream().write(createResponse(response, null, true));
+                    OutputStream out = response.getOutputStream();
+                    out.write(createResponse(response, null, true));
+                    out.close();
                 }
                 return Action.CANCELLED;
             } catch (IOException e) {
@@ -228,7 +236,11 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
         }
         if (parent) {
             // include the status code and content-type and those matched headers
-            headers.put(WebSocketUtils.SC_KEY, Integer.toString(response.getStatus()));
+            String sc = response.getHeader(WebSocketUtils.SC_KEY); 
+            if (sc == null) {
+                sc = Integer.toString(response.getStatus());
+            }
+            headers.put(WebSocketUtils.SC_KEY, sc);            
             if (payload != null && payload.length > 0) {
                 headers.put("Content-Type",  response.getContentType());
             }
@@ -273,47 +285,62 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
 
     // a workaround to flush the header data upon close when no write operation occurs  
     private class WrappedAtmosphereResponse extends AtmosphereResponse {
-        WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req) {
-            super((HttpServletResponse)resp.getResponse(), resp.getAsyncIOWriter(), req, resp.isDestroyable());
+        final AtmosphereResponse response;
+        final ServletOutputStream delegate;
+        ServletOutputStream sout;
+        WrappedAtmosphereResponse(AtmosphereResponse resp, AtmosphereRequest req) throws IOException {
+            super((HttpServletResponse)resp.getResponse(), null, req, resp.isDestroyable());
+            response = resp;
+            response.request(req);
+            delegate = super.getOutputStream();
         }
 
         @Override
         public ServletOutputStream getOutputStream() throws IOException {
-            final ServletOutputStream delegate = super.getOutputStream();
-            return new ServletOutputStream() {
-                private boolean written;
-
-                @Override
-                public void write(int i) throws IOException {
-                    written = true;
-                    delegate.write(i);
-                }
-
-                @Override
-                public void close() throws IOException {
-                    if (!written) {
-                        delegate.write(createResponse(WrappedAtmosphereResponse.this, null, true));
+            if (sout == null) {
+                sout = new ServletOutputStream() {
+                    CachedOutputStream out = new CachedOutputStream();
+                    OutputStream getOut() {
+                        if (out == null) {
+                            out = new CachedOutputStream();
+                        }
+                        return out;
+                    }                
+                    void send(boolean complete) throws IOException {
+                        if (out == null) {
+                            return;
+                        }
+                        if (response.getStatus() >= 400) {
+                            int i = response.getStatus();
+                            response.setStatus(200);
+                            response.addIntHeader(WebSocketUtils.SC_KEY, i);
+                        }
+                        out.flush();
+                        out.lockOutputStream();
+                        out.writeCacheTo(delegate);
+                        delegate.flush();
+                        out.close();
+                        out = null;
                     }
-                    delegate.close();
-                }
-
-                @Override
-                public void flush() throws IOException {
-                    delegate.flush();
-                }
-
-                @Override
-                public void write(byte[] b, int off, int len) throws IOException {
-                    written = true;
-                    delegate.write(b, off, len);
-                }
-
-                @Override
-                public void write(byte[] b) throws IOException {
-                    written = true;
-                    delegate.write(b);
-                }
-            };
+                    public void write(int i) throws IOException {
+                        getOut().write(i);
+                    }
+                    public void close() throws IOException {
+                        send(true);
+                        delegate.close();
+                    }
+                    public void flush() throws IOException {
+                        send(false);
+                    }
+                    public void write(byte[] b, int off, int len) throws IOException {
+                        getOut().write(b, off, len);
+                    }
+                    public void write(byte[] b) throws IOException {
+                        getOut().write(b);
+                    }
+                };
+            }
+            return sout;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/6c67f878/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
index 39e64fc..888ada0 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java
@@ -121,6 +121,7 @@ public class BookStoreWebSocket {
                             for (int i = 2; i <= 5; i++) {
                                 Thread.sleep(500);
                                 out.write(new Book("WebSocket" + i, i));
+                                out.getEntityStream().flush();
                             }
                         } catch (Exception e) {
                             e.printStackTrace();
@@ -166,6 +167,7 @@ public class BookStoreWebSocket {
             OutputStream out = it.next();
             try {
                 out.write(("News: event " + name + " created").getBytes());
+                out.flush();
             } catch (IOException e) {
                 it.remove();
                 e.printStackTrace();