You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/09/20 23:10:01 UTC

svn commit: r1388221 - in /cxf/trunk: api/src/main/java/org/apache/cxf/helpers/ api/src/main/java/org/apache/cxf/io/ rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/ rt/transports/http-jetty/src/main/java/org/apache/cxf/tr...

Author: dkulp
Date: Thu Sep 20 21:10:01 2012
New Revision: 1388221

URL: http://svn.apache.org/viewvc?rev=1388221&view=rev
Log:
Bunch of optimizations to allow the stream copying to avoid a bunch of buffer copying.

Added:
    cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java
Modified:
    cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java
    cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
    cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java Thu Sep 20 21:10:01 2012
@@ -30,9 +30,11 @@ import java.io.UnsupportedEncodingExcept
 import java.io.Writer;
 import java.nio.charset.Charset;
 
+import org.apache.cxf.io.CopyingOutputStream;
+
 public final class IOUtils {
     public static final Charset UTF8_CHARSET = Charset.forName("utf-8");
-    private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
+    public static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
 
     private IOUtils() {
 
@@ -100,13 +102,16 @@ public final class IOUtils {
 
     public static int copy(final InputStream input, final OutputStream output)
         throws IOException {
+        if (output instanceof CopyingOutputStream) {
+            return ((CopyingOutputStream)output).copyFrom(input);
+        }
         return copy(input, output, DEFAULT_BUFFER_SIZE);
     }
 
     public static int copyAndCloseInput(final InputStream input,
             final OutputStream output) throws IOException {
         try {
-            return copy(input, output, DEFAULT_BUFFER_SIZE);
+            return copy(input, output);
         } finally {
             input.close();
         }

Modified: cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java Thu Sep 20 21:10:01 2012
@@ -51,6 +51,10 @@ public class LoadingByteArrayOutputStrea
         };
     }
     
+    public void setSize(int i) {
+        count = i;
+    }
+    
     public byte[] toByteArray() {
         if (count != buf.length) {
             buf = super.toByteArray();

Added: cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java?rev=1388221&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java Thu Sep 20 21:10:01 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Marker interface for OutputStreams that can directly support 
+ * copying from an input stream.  OutputStreams that maintain their
+ * own byte buffer or similar may be able to optimize the copy
+ * instead of using the read/write into a temporary buffer that
+ * the normal IOUtils.copy method requires.
+ */
+public interface CopyingOutputStream {
+
+    int copyFrom(InputStream in) throws IOException;
+    
+}

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Thu Sep 20 21:10:01 2012
@@ -51,7 +51,9 @@ import org.apache.cxf.common.util.String
 import org.apache.cxf.configuration.jsse.SSLUtils;
 import org.apache.cxf.configuration.jsse.TLSClientParameters;
 import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.io.CacheAndWriteOutputStream;
+import org.apache.cxf.io.CopyingOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.service.model.EndpointInfo;
@@ -184,7 +186,7 @@ public class AsyncHTTPConduit extends UR
     }
     
     
-    public class AsyncWrappedOutputStream extends WrappedOutputStream {
+    public class AsyncWrappedOutputStream extends WrappedOutputStream implements CopyingOutputStream {
         final HTTPClientPolicy csPolicy;
 
         CXFHttpRequest entity;
@@ -261,6 +263,39 @@ public class AsyncHTTPConduit extends UR
             connect(false);
             outbuf.writeCompleted();
         }
+        
+        
+        public int copyFrom(InputStream in) throws IOException {
+            int count = 0;
+            if (buffer != null) {
+                while (buffer != null) {
+                    int pos = buffer.size();
+                    int i = in.read(buffer.getRawBytes(), pos,
+                                    this.threshold - pos);
+                    if (i > 0) {
+                        buffer.setSize(pos + i);
+                        if (buffer.size() >= threshold) {
+                            thresholdReached();
+                            unBuffer();
+                        }
+                        count += i;
+                    } else {
+                        return count;
+                    }
+                }
+            }
+            if (cachingForRetransmission) {
+                count += IOUtils.copy(in, wrappedStream);
+            } else {
+                count += outbuf.copy(in);
+            }
+            return count;
+        }
+        
+        @Override
+        public void close() throws IOException {
+            super.close();
+        }
         protected void setupWrappedStream() throws IOException {
             connect(true);
             wrappedStream = new OutputStream() {

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java Thu Sep 20 21:10:01 2012
@@ -20,6 +20,7 @@
 package org.apache.cxf.transport.http.asyncclient;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.locks.Condition;
@@ -171,6 +172,54 @@ public class SharedOutputBuffer extends 
             this.lock.unlock();
         }
     }
+    public int copy(InputStream in) throws IOException {
+        this.lock.lock();
+        int total = 0;
+        try {
+            if (this.shutdown || this.endOfStream) {
+                throw new IllegalStateException("Buffer already closed for writing");
+            }
+            setInputMode();
+            int i = 0;
+            boolean yielded = false;
+            while (i != -1) {
+                if (!this.buffer.hasRemaining()) {
+                    flushContent();
+                    setInputMode();
+                }
+                i = in.available();
+                if (i == 0 && !yielded) {
+                    //nothing avail right now, we'll attempt an
+                    //output, but not really force a flush.
+                    if (buffer.position() != 0 && this.ioctrl != null) {
+                        this.ioctrl.requestOutput();
+                    }
+                    try {
+                        condition.awaitNanos(1);
+                    } catch (InterruptedException e) {
+                        //ignore
+                    }
+                    setInputMode();
+                    yielded = true;
+                } else {
+                    int p = this.buffer.position();
+                    i = in.read(this.buffer.array(), this.buffer.position(), this.buffer.remaining());
+                    yielded = false;
+                    if (i != -1) {
+                        total += i;
+                        buffer.position(p + i);
+                    }
+                    /*
+                    System.out.println("p: " + p + "  " + i + " " + this.buffer.position() 
+                                       + " " + this.buffer.hasRemaining());
+                                       */
+                }
+            }
+        } finally {
+            this.lock.unlock();
+        }
+        return total;
+    }
 
     public void write(final byte[] b, int off, int len) throws IOException {
         if (b == null) {
@@ -270,4 +319,5 @@ public class SharedOutputBuffer extends 
         }
     }
 
+
 }

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java Thu Sep 20 21:10:01 2012
@@ -18,7 +18,11 @@
  */
 package org.apache.cxf.transport.http_jetty;
 
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.security.GeneralSecurityException;
@@ -41,6 +45,7 @@ import org.apache.cxf.continuations.Cont
 import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.io.CopyingOutputStream;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
@@ -55,6 +60,7 @@ import org.apache.cxf.transports.http.Qu
 import org.apache.cxf.transports.http.StemMatchingQueryHandler;
 import org.eclipse.jetty.http.Generator;
 import org.eclipse.jetty.io.AbstractConnection;
+import org.eclipse.jetty.server.AbstractHttpConnection.Output;
 import org.eclipse.jetty.server.Request;
 import org.springframework.util.ClassUtils;
 
@@ -368,6 +374,63 @@ public class JettyHTTPDestination extend
             }
         }
     }
+    
+    protected OutputStream flushHeaders(Message outMessage, boolean getStream) throws IOException {
+        OutputStream out = super.flushHeaders(outMessage, getStream);
+        if (out instanceof Output) {
+            out = new JettyOutputStream((Output)out);
+        }
+        return out;
+    }
+    static class JettyOutputStream extends FilterOutputStream implements CopyingOutputStream {
+        final Output out;
+        public JettyOutputStream(Output o) {
+            super(o);
+            out = o;
+        }
+
+        @Override
+        public int copyFrom(InputStream in) throws IOException {
+            CountingInputStream c = new CountingInputStream(in);
+            out.sendContent(c);
+            return c.getCount();
+        }
+    }
+    static class CountingInputStream extends FilterInputStream {
+        int count;
+        public CountingInputStream(InputStream in) {
+            super(in);
+        }
+        public int getCount() {
+            return count;
+        }
+        
+        @Override
+        public int read() throws IOException {
+            int i = super.read();
+            if (i != -1) {
+                ++count;
+            }
+            return i;
+        }
+        @Override
+        public int read(byte[] b) throws IOException {
+            int i = super.read(b);
+            if (i != -1) {
+                count += i;
+            }
+            return i;
+        }
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            int i = super.read(b, off, len);
+            if (i != -1) {
+                count += i;
+            }
+            return i;
+        }
+    }
+    
  
     public ServerEngine getEngine() {
         return engine;

Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java Thu Sep 20 21:10:01 2012
@@ -50,9 +50,11 @@ import org.apache.cxf.configuration.secu
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.io.AbstractWrappedOutputStream;
+import org.apache.cxf.io.CopyingOutputStream;
 import org.apache.cxf.io.DelegatingInputStream;
 import org.apache.cxf.message.Attachment;
 import org.apache.cxf.message.Exchange;
@@ -647,7 +649,7 @@ public abstract class AbstractHTTPDestin
      * Wrapper stream responsible for flushing headers and committing outgoing
      * HTTP-level response.
      */
-    private class WrappedOutputStream extends AbstractWrappedOutputStream {
+    private class WrappedOutputStream extends AbstractWrappedOutputStream implements CopyingOutputStream {
 
         protected HttpServletResponse response;
         private Message outMessage;
@@ -658,6 +660,19 @@ public abstract class AbstractHTTPDestin
             response = resp;
         }
 
+        
+        @Override
+        public int copyFrom(InputStream in) throws IOException {
+            if (!written) {
+                onFirstWrite();
+                written = true;
+            }
+            if (wrappedStream != null) {
+                return IOUtils.copy(in, wrappedStream);
+            }
+            return IOUtils.copy(in, this, IOUtils.DEFAULT_BUFFER_SIZE);
+        }
+        
         /**
          * Perform any actions required on stream flush (freeze headers,
          * reset output stream ... etc.)
@@ -699,6 +714,7 @@ public abstract class AbstractHTTPDestin
             }
             */
         }
+
     }
 
     protected boolean contextMatchOnExact() {