You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/09/20 23:10:01 UTC
svn commit: r1388221 - in /cxf/trunk:
api/src/main/java/org/apache/cxf/helpers/
api/src/main/java/org/apache/cxf/io/
rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/
rt/transports/http-jetty/src/main/java/org/apache/cxf/tr...
Author: dkulp
Date: Thu Sep 20 21:10:01 2012
New Revision: 1388221
URL: http://svn.apache.org/viewvc?rev=1388221&view=rev
Log:
Bunch of optimizations to allow the stream copying to avoid a bunch of buffer copying.
Added:
cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java
Modified:
cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java
cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java
cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/helpers/IOUtils.java Thu Sep 20 21:10:01 2012
@@ -30,9 +30,11 @@ import java.io.UnsupportedEncodingExcept
import java.io.Writer;
import java.nio.charset.Charset;
+import org.apache.cxf.io.CopyingOutputStream;
+
public final class IOUtils {
public static final Charset UTF8_CHARSET = Charset.forName("utf-8");
- private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
+ public static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
private IOUtils() {
@@ -100,13 +102,16 @@ public final class IOUtils {
public static int copy(final InputStream input, final OutputStream output)
throws IOException {
+ if (output instanceof CopyingOutputStream) {
+ return ((CopyingOutputStream)output).copyFrom(input);
+ }
return copy(input, output, DEFAULT_BUFFER_SIZE);
}
public static int copyAndCloseInput(final InputStream input,
final OutputStream output) throws IOException {
try {
- return copy(input, output, DEFAULT_BUFFER_SIZE);
+ return copy(input, output);
} finally {
input.close();
}
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java Thu Sep 20 21:10:01 2012
@@ -51,6 +51,10 @@ public class LoadingByteArrayOutputStrea
};
}
+ public void setSize(int i) {
+ count = i;
+ }
+
public byte[] toByteArray() {
if (count != buf.length) {
buf = super.toByteArray();
Added: cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java?rev=1388221&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/CopyingOutputStream.java Thu Sep 20 21:10:01 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Marker interface for OutputStreams that can directly support
+ * copying from an input stream. OutputStreams that maintain their
+ * own byte buffer or similar may be able to optimize the copy
+ * instead of using the read/write into a temporary buffer that
+ * the normal IOUtils.copy method requires.
+ */
+public interface CopyingOutputStream {
+
+ int copyFrom(InputStream in) throws IOException;
+
+}
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Thu Sep 20 21:10:01 2012
@@ -51,7 +51,9 @@ import org.apache.cxf.common.util.String
import org.apache.cxf.configuration.jsse.SSLUtils;
import org.apache.cxf.configuration.jsse.TLSClientParameters;
import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.io.CacheAndWriteOutputStream;
+import org.apache.cxf.io.CopyingOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.service.model.EndpointInfo;
@@ -184,7 +186,7 @@ public class AsyncHTTPConduit extends UR
}
- public class AsyncWrappedOutputStream extends WrappedOutputStream {
+ public class AsyncWrappedOutputStream extends WrappedOutputStream implements CopyingOutputStream {
final HTTPClientPolicy csPolicy;
CXFHttpRequest entity;
@@ -261,6 +263,39 @@ public class AsyncHTTPConduit extends UR
connect(false);
outbuf.writeCompleted();
}
+
+
+ public int copyFrom(InputStream in) throws IOException {
+ int count = 0;
+ if (buffer != null) {
+ while (buffer != null) {
+ int pos = buffer.size();
+ int i = in.read(buffer.getRawBytes(), pos,
+ this.threshold - pos);
+ if (i > 0) {
+ buffer.setSize(pos + i);
+ if (buffer.size() >= threshold) {
+ thresholdReached();
+ unBuffer();
+ }
+ count += i;
+ } else {
+ return count;
+ }
+ }
+ }
+ if (cachingForRetransmission) {
+ count += IOUtils.copy(in, wrappedStream);
+ } else {
+ count += outbuf.copy(in);
+ }
+ return count;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ }
protected void setupWrappedStream() throws IOException {
connect(true);
wrappedStream = new OutputStream() {
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java Thu Sep 20 21:10:01 2012
@@ -20,6 +20,7 @@
package org.apache.cxf.transport.http.asyncclient;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Condition;
@@ -171,6 +172,54 @@ public class SharedOutputBuffer extends
this.lock.unlock();
}
}
+ public int copy(InputStream in) throws IOException {
+ this.lock.lock();
+ int total = 0;
+ try {
+ if (this.shutdown || this.endOfStream) {
+ throw new IllegalStateException("Buffer already closed for writing");
+ }
+ setInputMode();
+ int i = 0;
+ boolean yielded = false;
+ while (i != -1) {
+ if (!this.buffer.hasRemaining()) {
+ flushContent();
+ setInputMode();
+ }
+ i = in.available();
+ if (i == 0 && !yielded) {
+ //nothing avail right now, we'll attempt an
+ //output, but not really force a flush.
+ if (buffer.position() != 0 && this.ioctrl != null) {
+ this.ioctrl.requestOutput();
+ }
+ try {
+ condition.awaitNanos(1);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ setInputMode();
+ yielded = true;
+ } else {
+ int p = this.buffer.position();
+ i = in.read(this.buffer.array(), this.buffer.position(), this.buffer.remaining());
+ yielded = false;
+ if (i != -1) {
+ total += i;
+ buffer.position(p + i);
+ }
+ /*
+ System.out.println("p: " + p + " " + i + " " + this.buffer.position()
+ + " " + this.buffer.hasRemaining());
+ */
+ }
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ return total;
+ }
public void write(final byte[] b, int off, int len) throws IOException {
if (b == null) {
@@ -270,4 +319,5 @@ public class SharedOutputBuffer extends
}
}
+
}
Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java (original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java Thu Sep 20 21:10:01 2012
@@ -18,7 +18,11 @@
*/
package org.apache.cxf.transport.http_jetty;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
@@ -41,6 +45,7 @@ import org.apache.cxf.continuations.Cont
import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.io.CopyingOutputStream;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
@@ -55,6 +60,7 @@ import org.apache.cxf.transports.http.Qu
import org.apache.cxf.transports.http.StemMatchingQueryHandler;
import org.eclipse.jetty.http.Generator;
import org.eclipse.jetty.io.AbstractConnection;
+import org.eclipse.jetty.server.AbstractHttpConnection.Output;
import org.eclipse.jetty.server.Request;
import org.springframework.util.ClassUtils;
@@ -368,6 +374,63 @@ public class JettyHTTPDestination extend
}
}
}
+
+ protected OutputStream flushHeaders(Message outMessage, boolean getStream) throws IOException {
+ OutputStream out = super.flushHeaders(outMessage, getStream);
+ if (out instanceof Output) {
+ out = new JettyOutputStream((Output)out);
+ }
+ return out;
+ }
+ static class JettyOutputStream extends FilterOutputStream implements CopyingOutputStream {
+ final Output out;
+ public JettyOutputStream(Output o) {
+ super(o);
+ out = o;
+ }
+
+ @Override
+ public int copyFrom(InputStream in) throws IOException {
+ CountingInputStream c = new CountingInputStream(in);
+ out.sendContent(c);
+ return c.getCount();
+ }
+ }
+ static class CountingInputStream extends FilterInputStream {
+ int count;
+ public CountingInputStream(InputStream in) {
+ super(in);
+ }
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int i = super.read();
+ if (i != -1) {
+ ++count;
+ }
+ return i;
+ }
+ @Override
+ public int read(byte[] b) throws IOException {
+ int i = super.read(b);
+ if (i != -1) {
+ count += i;
+ }
+ return i;
+ }
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int i = super.read(b, off, len);
+ if (i != -1) {
+ count += i;
+ }
+ return i;
+ }
+ }
+
public ServerEngine getEngine() {
return engine;
Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java?rev=1388221&r1=1388220&r2=1388221&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java Thu Sep 20 21:10:01 2012
@@ -50,9 +50,11 @@ import org.apache.cxf.configuration.secu
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.io.AbstractWrappedOutputStream;
+import org.apache.cxf.io.CopyingOutputStream;
import org.apache.cxf.io.DelegatingInputStream;
import org.apache.cxf.message.Attachment;
import org.apache.cxf.message.Exchange;
@@ -647,7 +649,7 @@ public abstract class AbstractHTTPDestin
* Wrapper stream responsible for flushing headers and committing outgoing
* HTTP-level response.
*/
- private class WrappedOutputStream extends AbstractWrappedOutputStream {
+ private class WrappedOutputStream extends AbstractWrappedOutputStream implements CopyingOutputStream {
protected HttpServletResponse response;
private Message outMessage;
@@ -658,6 +660,19 @@ public abstract class AbstractHTTPDestin
response = resp;
}
+
+ @Override
+ public int copyFrom(InputStream in) throws IOException {
+ if (!written) {
+ onFirstWrite();
+ written = true;
+ }
+ if (wrappedStream != null) {
+ return IOUtils.copy(in, wrappedStream);
+ }
+ return IOUtils.copy(in, this, IOUtils.DEFAULT_BUFFER_SIZE);
+ }
+
/**
* Perform any actions required on stream flush (freeze headers,
* reset output stream ... etc.)
@@ -699,6 +714,7 @@ public abstract class AbstractHTTPDestin
}
*/
}
+
}
protected boolean contextMatchOnExact() {