You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2008/07/19 05:39:38 UTC

svn commit: r678094 - in /cxf/trunk: api/src/main/java/org/apache/cxf/io/ common/common/src/main/java/org/apache/cxf/helpers/ rt/transports/http/src/main/java/org/apache/cxf/transport/http/ rt/transports/http/src/main/java/org/apache/cxf/transport/http...

Author: dkulp
Date: Fri Jul 18 20:39:37 2008
New Revision: 678094

URL: http://svn.apache.org/viewvc?rev=678094&view=rev
Log:
Implement threshold for chunking.  Some http cleanups.  Throw exceptions on some errors.

Added:
    cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java   (with props)
Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java
    cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
    cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
    cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java
    cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd
    cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java

Added: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java?rev=678094&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java Fri Jul 18 20:39:37 2008
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.io;
+
+import java.io.IOException;
+
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+
+/**
+ * Outputstream that will buffer a certain amount before writing anything to the underlying
+ * stream.   When the threshold is reached, provides a callback point to allow the
+ * subclass to update headers, replace/set the output stream, etc...
+ * 
+ * Also provides a callback for when the stream is closed without it reaching the threshold.
+ */
+public abstract class AbstractThresholdOutputStream extends AbstractWrappedOutputStream {
+    
+    protected int threshold;
+    protected LoadingByteArrayOutputStream buffer;
+    
+    public AbstractThresholdOutputStream(int threshold) {
+        this.threshold = threshold;
+        if (threshold > 0) {
+            buffer = new LoadingByteArrayOutputStream(threshold + 1);
+        }
+    }
+    
+    
+    public abstract void thresholdReached();
+    public abstract void thresholdNotReached();
+    
+    
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (buffer != null) {
+            int space = threshold - buffer.size();
+            if (space > len) {
+                space = len;
+            }
+            buffer.write(b, off, space);
+            len -= space;
+            off += space;
+            
+            if (buffer.size() >= threshold) {
+                thresholdReached();
+                unBuffer();
+            }
+            if (len == 0) {
+                return;
+            }
+        }
+        super.write(b, off, len);
+    }
+
+
+    @Override
+    public void write(int b) throws IOException {
+        if (buffer != null) {
+            buffer.write(b);
+            if (buffer.size() >= threshold) {
+                thresholdReached();
+                unBuffer();
+            }
+            return;
+        }
+        super.write(b);
+    }
+
+    public void unBuffer() throws IOException {
+        if (buffer != null) {
+            if (buffer.size() > 0) {
+                super.write(buffer.getRawBytes(), 0, buffer.size());
+            }
+            buffer = null;
+        }  
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        if (buffer != null) {
+            thresholdNotReached();
+            unBuffer();
+        }
+        super.close();
+    }
+
+}

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java Fri Jul 18 20:39:37 2008
@@ -50,11 +50,7 @@
 
     @Override
     public void write(byte[] b) throws IOException {
-        if (!written) {
-            onFirstWrite();
-            written = true;
-        }
-        wrappedStream.write(b);
+        write(b, 0, b.length);
     }
 
     @Override

Modified: cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java (original)
+++ cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java Fri Jul 18 20:39:37 2008
@@ -53,4 +53,8 @@
         }
         return buf;
     }
+    
+    public byte[] getRawBytes() {
+        return buf;
+    }
 }
\ No newline at end of file

Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Fri Jul 18 20:39:37 2008
@@ -51,7 +51,8 @@
 import org.apache.cxf.configuration.security.ProxyAuthorizationPolicy;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.helpers.HttpHeaderHelper;
-import org.apache.cxf.io.AbstractWrappedOutputStream;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+import org.apache.cxf.io.AbstractThresholdOutputStream;
 import org.apache.cxf.io.CacheAndWriteOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
@@ -510,6 +511,7 @@
         }
         
         boolean isChunking = false;
+        int chunkThreshold = 0;
         // We must cache the request if we have basic auth supplier
         // without preemptive basic auth.
         if (basicAuthSupplier != null) {
@@ -521,24 +523,23 @@
                 + " We must cache request.");
         }
         if (getClient().isAutoRedirect()) {
-            // If the AutoRedirect property is set then we cannot
-            // use chunked streaming mode. We ignore the "AllowChunking" 
-            // property if AutoRedirect is turned on.
-            
             needToCacheRequest = true;
             LOG.log(Level.INFO, "AutoRedirect is turned on.");
-        } else {
-            if (!connection.getRequestMethod().equals("GET")
-                && getClient().isAllowChunking()
-                && !needToCacheRequest) {
-                //TODO: The chunking mode be configured or at least some
-                // documented client constant.
-                //use -1 and allow the URL connection to pick a default value
-                connection.setChunkedStreamingMode(-1);
-                isChunking = true;
+        }
+        if (!connection.getRequestMethod().equals("GET")
+            && getClient().isAllowChunking()) {
+            //TODO: The chunking mode be configured or at least some
+            // documented client constant.
+            //use -1 and allow the URL connection to pick a default value
+            isChunking = true;
+            chunkThreshold = getClient().getChunkingThreshold();
+            if (chunkThreshold <= 0) {
+                chunkThreshold = 0;
+                connection.setChunkedStreamingMode(-1);                    
             }
         }
         
+        
         //Do we need to maintain a session?
         maintainSession = Boolean.TRUE.equals((Boolean)message.get(Message.MAINTAIN_SESSION));
         
@@ -567,8 +568,10 @@
         message.setContent(OutputStream.class,
                 new WrappedOutputStream(
                         message, connection,
-                        needToCacheRequest, isChunking));
-        
+                        needToCacheRequest, 
+                        isChunking,
+                        chunkThreshold));
+       
         // We are now "ready" to "send" the message. 
     }
     
@@ -1411,7 +1414,11 @@
                         + newURL
                         + "'");
                 }
-                return connection;
+                throw new IOException("Redirect loop detected on Conduit \"" 
+                                      + getConduitName() 
+                                      + "\" on '" 
+                                      + newURL
+                                      + "'");
             }
             // We are going to redirect.
             // Remove any Server Authentication Information for the previous
@@ -1515,7 +1522,12 @@
                     + "\"");
             }
                     
-            return connection;
+            throw new IOException("Authorization loop detected on Conduit \"" 
+                                  + getConduitName() 
+                                  + "\" on URL \""
+                                  + "\" with realm \""
+                                  + realm
+                                  + "\"");
         }
         
         HttpBasicAuthSupplier.UserPass up = 
@@ -1583,6 +1595,8 @@
         }
         message.put(KEY_HTTP_CONNECTION, connection);
 
+        connection.setFixedLengthStreamingMode(stream.size());
+        
         // Need to set the headers before the trust decision
         // because they are set before the connect().
         setURLRequestHeaders(message);
@@ -1602,7 +1616,7 @@
             return connection;
         }
         
-        // Trust is okay, write the cached request.
+        // Trust is okay, write the cached request
         OutputStream out = connection.getOutputStream();
         stream.writeCacheTo(out);
         
@@ -1710,7 +1724,7 @@
      * Wrapper output stream responsible for flushing headers and handling
      * the incoming HTTP-level response (not necessarily the MEP response).
      */
-    protected class WrappedOutputStream extends AbstractWrappedOutputStream {
+    protected class WrappedOutputStream extends AbstractThresholdOutputStream {
         /**
          * This field contains the currently active connection.
          */
@@ -1739,14 +1753,30 @@
                 Message m, 
                 HttpURLConnection c, 
                 boolean possibleRetransmit,
-                boolean isChunking
+                boolean isChunking,
+                int chunkThreshold
         ) {
-            super();
+            super(chunkThreshold);
             this.outMessage = m;
             connection = c;
             cachingForRetransmission = possibleRetransmit;
             chunking = isChunking;
         }
+        
+        
+        @Override
+        public void thresholdNotReached() {
+            if (chunking) {
+                connection.setFixedLengthStreamingMode(buffer.size());
+            }
+        }
+
+        @Override
+        public void thresholdReached() {
+            if (chunking) {
+                connection.setChunkedStreamingMode(-1);
+            }
+        }
 
         /**
          * Perform any actions required on stream flush (freeze headers,
@@ -1811,6 +1841,12 @@
          * Perform any actions required on stream closure (handle response etc.)
          */
         public void close() throws IOException {
+            if (buffer != null && buffer.size() > 0) {
+                thresholdNotReached();
+                LoadingByteArrayOutputStream tmp = buffer;
+                buffer = null;
+                super.write(tmp.getRawBytes(), 0, tmp.size());
+            }
             if (!written) {
                 handleHeadersTrustCaching();
             }
@@ -1988,6 +2024,8 @@
             
             incomingObserver.onMessage(inMessage);
         }
+
+
     }
     
     /**

Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java Fri Jul 18 20:39:37 2008
@@ -302,7 +302,8 @@
         }
         
         if (compatible) {
-            compatible &= p1.getContentType().equals(p2.getContentType());
+            compatible = !p1.isSetContentType() || !p2.isSetContentType()
+                || p1.getContentType().equals(p2.getContentType());
         }
         
         if (compatible) {
@@ -495,7 +496,7 @@
         }
         
         if (compatible) {
-            compatible &= p1.getContentType().equals(p2.getContentType());
+            compatible &= compatible(p1.getContentType(), p2.getContentType());
         }
         
         if (compatible) {

Modified: cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd (original)
+++ cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd Fri Jul 18 20:39:37 2008
@@ -117,10 +117,10 @@
                     </xs:annotation>      
                 </xs:attribute>
 
-                <xs:attribute name="ContentType" type="xs:string" use="optional" default="text/xml">
+                <xs:attribute name="ContentType" type="xs:string" use="optional">
                     <xs:annotation>
                         <xs:documentation>
-                        what MIME type this reply is
+                        what MIME type this reply is.  Default is set by the binding.
                         </xs:documentation>
                     </xs:annotation>      
                 </xs:attribute>
@@ -204,6 +204,14 @@
                         </xs:documentation>
                     </xs:annotation>      
                 </xs:attribute>
+                <xs:attribute name="ChunkingThreshold" type="xs:int" use="optional" default="4096">
+                    <xs:annotation>
+                        <xs:documentation>
+                        If AllowChunking is true, this sets the threshold at which mesages start
+                        getting chunked.   Messages under this limit do not get chunked.
+                        </xs:documentation>
+                    </xs:annotation>      
+                </xs:attribute>
 
                 <xs:attribute name="Accept" type="xs:string" use="optional">
                     <xs:annotation>
@@ -229,7 +237,7 @@
                     </xs:annotation>      
                 </xs:attribute>
 
-                <xs:attribute name="ContentType" type="xs:string" use="optional" default="text/xml">
+                <xs:attribute name="ContentType" type="xs:string" use="optional">
                     <xs:annotation>
                         <xs:documentation>
                         The content type of the stream being sent in a post request 

Modified: cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java (original)
+++ cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java Fri Jul 18 20:39:37 2008
@@ -36,6 +36,7 @@
 import org.apache.cxf.bus.CXFBusImpl;
 import org.apache.cxf.configuration.security.AuthorizationPolicy;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.io.AbstractThresholdOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
@@ -408,6 +409,7 @@
             conduit.getClient().setAutoRedirect(autoRedirect);
             if (!autoRedirect) {
                 conduit.getClient().setAllowChunking(true);
+                conduit.getClient().setChunkingThreshold(0);
             }
         }
 
@@ -561,10 +563,12 @@
         
         control.replay();
         
-        OutputStream wrappedOS = message.getContent(OutputStream.class);
+        AbstractThresholdOutputStream wrappedOS 
+            = (AbstractThresholdOutputStream) message.getContent(OutputStream.class);
         assertNotNull("expected output stream", wrappedOS);
         
         wrappedOS.write(PAYLOAD.getBytes());
+        wrappedOS.unBuffer();
         
         control.verify();
         control.reset();