You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2015/03/19 16:03:11 UTC

cxf git commit: [CXF-6232] Refactor CXF's Atmosphere based WebSocket transport for more flexible and extensible handling

Repository: cxf
Updated Branches:
  refs/heads/master e2bc8d8f3 -> a79158c70


[CXF-6232] Refactor CXF's Atmosphere based WebSocket transport for more flexible and extensible handling


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/a79158c7
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/a79158c7
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/a79158c7

Branch: refs/heads/master
Commit: a79158c70fb2c8cedf109e78eeec32691a32e306
Parents: e2bc8d8
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Thu Mar 19 16:02:42 2015 +0100
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Thu Mar 19 16:03:02 2015 +0100

----------------------------------------------------------------------
 .../cxf/transport/websocket/WebSocketUtils.java | 25 ++++--
 .../atmosphere/DefaultProtocolInterceptor.java  | 49 ++++++++++-
 .../DefaultProtocolInterceptorTest.java         | 91 ++++++++++++++++++++
 3 files changed, 156 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/a79158c7/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
index 5dbb930..adb621d 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/WebSocketUtils.java
@@ -50,19 +50,24 @@ public final class WebSocketUtils {
      * - charset other than utf-8. (although i would have preferred iso-8859-1 ;-)
      * 
      * @param in the input stream
+     * @param req true if the input stream includes the request line
      * @return a map of name value pairs.
      * @throws IOException
      */
-    public static Map<String, String> readHeaders(InputStream in) throws IOException {
+    public static Map<String, String> readHeaders(InputStream in, boolean req) throws IOException {
         Map<String, String> headers = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
-        // read the request line
-        String line = readLine(in);
-        int del = line.indexOf(' ');
-        if (del < 0) {
-            throw new IOException("invalid request: " + line);
+        String line;
+        int del;
+        if (req) {
+            // read the request line
+            line = readLine(in);
+            del = line.indexOf(' ');
+            if (del < 0) {
+                throw new IOException("invalid request: " + line);
+            }
+            headers.put(METHOD_KEY, line.substring(0, del).trim());
+            headers.put(URI_KEY, line.substring(del + 1).trim());
         }
-        headers.put(METHOD_KEY, line.substring(0, del).trim());
-        headers.put(URI_KEY, line.substring(del + 1).trim());
         
         // read headers
         while ((line = readLine(in)) != null) {
@@ -79,6 +84,10 @@ public final class WebSocketUtils {
         return headers;
     }
 
+    public static Map<String, String> readHeaders(InputStream in) throws IOException {
+        return readHeaders(in, true);
+    }
+
 
     /**
      * Read a line terminated by '\n' optionally preceded by '\r' from the 

http://git-wip-us.apache.org/repos/asf/cxf/blob/a79158c7/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
index 4d3c2e6..2bc83b6 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptor.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.regex.Pattern;
 
 import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServletResponse;
@@ -39,6 +40,7 @@ import org.atmosphere.cpr.Action;
 import org.atmosphere.cpr.AsyncIOInterceptor;
 import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
 import org.atmosphere.cpr.AsyncIOWriter;
+import org.atmosphere.cpr.AtmosphereConfig;
 import org.atmosphere.cpr.AtmosphereFramework;
 import org.atmosphere.cpr.AtmosphereInterceptorAdapter;
 import org.atmosphere.cpr.AtmosphereInterceptorWriter;
@@ -60,6 +62,44 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
 
     private final AsyncIOInterceptor interceptor = new Interceptor();
 
+    private Pattern includedheaders;
+    private Pattern excludedheaders;
+
+    @Override
+    public void configure(AtmosphereConfig config) {
+        super.configure(config);
+        String p = config.getInitParameter("org.apache.cxf.transport.websocket.atmosphere.transport.includedheaders");
+        if (p != null) {
+            includedheaders = Pattern.compile(p, Pattern.CASE_INSENSITIVE);
+        }
+        p = config.getInitParameter("org.apache.cxf.transport.websocket.atmosphere.transport.excludedheaders");
+        if (p != null) {
+            excludedheaders = Pattern.compile(p, Pattern.CASE_INSENSITIVE);
+        }
+    }
+
+    public DefaultProtocolInterceptor includedheaders(String p) {
+        if (p != null) {
+            this.includedheaders = Pattern.compile(p, Pattern.CASE_INSENSITIVE);
+        }
+        return this;
+    }
+
+    public void setIncludedheaders(Pattern includedheaders) {
+        this.includedheaders = includedheaders;
+    }
+
+    public DefaultProtocolInterceptor excludedheaders(String p) {
+        if (p != null) {
+            this.excludedheaders = Pattern.compile(p, Pattern.CASE_INSENSITIVE);
+        }
+        return this;
+    }
+
+    public void setExcludedheaders(Pattern excludedheaders) {
+        this.excludedheaders = excludedheaders;
+    }
+
     @Override
     public Action inspect(final AtmosphereResource r) {
         LOG.log(Level.FINE, "inspect");
@@ -188,11 +228,18 @@ public class DefaultProtocolInterceptor extends AtmosphereInterceptorAdapter {
             headers.put(WebSocketConstants.DEFAULT_RESPONSE_ID_KEY, refid);
         }
         if (parent) {
-            // include the status code and content-type
+            // include the status code and content-type and those matched headers
             headers.put(WebSocketUtils.SC_KEY, Integer.toString(response.getStatus()));
             if (payload != null && payload.length > 0) {
                 headers.put("Content-Type",  response.getContentType());
             }
+            for (Map.Entry<String, String> hv : response.headers().entrySet()) {
+                if (!"Content-Type".equalsIgnoreCase(hv.getKey()) 
+                    && includedheaders != null && includedheaders.matcher(hv.getKey()).matches()
+                    && !(excludedheaders != null && excludedheaders.matcher(hv.getKey()).matches())) {
+                    headers.put(hv.getKey(), hv.getValue());
+                }
+            }
         }
         return WebSocketUtils.buildResponse(headers, payload, 0, payload == null ? 0 : payload.length);
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/a79158c7/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptorTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptorTest.java b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptorTest.java
new file mode 100644
index 0000000..e6b70af
--- /dev/null
+++ b/rt/transports/websocket/src/test/java/org/apache/cxf/transport/websocket/atmosphere/DefaultProtocolInterceptorTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.websocket.atmosphere;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.cxf.transport.websocket.WebSocketUtils;
+import org.atmosphere.cpr.AtmosphereRequest;
+import org.atmosphere.cpr.AtmosphereResponse;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class DefaultProtocolInterceptorTest extends Assert {
+
+    @Test
+    public void testCreateResponseWithHeadersFiltering() throws Exception {
+        DefaultProtocolInterceptor dpi = new DefaultProtocolInterceptor();
+        AtmosphereRequest request = AtmosphereRequest.newInstance();
+        AtmosphereResponse response = AtmosphereResponse.newInstance();
+        response.request(request);
+        String payload = "hello cxf";
+        String contentType = "text/plain";
+        response.headers().put("Content-Type", contentType);
+
+        byte[] transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType}, 
+                          payload, transformed);
+
+        response.headers().put("X-fruit", "peach");
+        response.headers().put("X-vegetable", "tomato");
+        transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType}, 
+                          payload, transformed);
+
+        dpi.includedheaders("X-f.*");
+        transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType, "X-Fruit", "peach"}, 
+                          payload, transformed);
+
+        dpi.includedheaders("X-.*");
+        transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType, "X-Fruit", "peach", "X-vegetable", "tomato"}, 
+                          payload, transformed);
+
+        dpi.excludedheaders(".*able");
+        transformed = dpi.createResponse(response, payload.getBytes(), true);
+        verifyTransformed("200", 
+                          new String[]{"Content-Type", contentType, "X-Fruit", "peach"}, 
+                          payload, transformed);
+    }
+
+    private void verifyTransformed(String code, String[] headers, String body, byte[] transformed) throws Exception {
+        InputStream in = new ByteArrayInputStream(transformed);
+        String c = WebSocketUtils.readLine(in);
+        Map<String, String> hs = WebSocketUtils.readHeaders(in, false);
+        byte[] b = WebSocketUtils.readBody(in);
+        assertEquals(code, c);
+        assertEquals(headers.length >> 1, hs.size());
+        for (int i = 0; i < headers.length; i += 2) {
+            assertEquals(headers[i + 1], hs.get(headers[i]));
+        }
+        assertEquals(body, new String(b));
+    }
+}