You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/09/24 15:24:53 UTC
svn commit: r1389357 - in
/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient:
AsyncHTTPConduit.java SharedOutputBuffer.java
Author: dkulp
Date: Mon Sep 24 13:24:53 2012
New Revision: 1389357
URL: http://svn.apache.org/viewvc?rev=1389357&view=rev
Log:
Implement WritableByteChannel which will allow some extra optimizations
Modified:
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1389357&r1=1389356&r2=1389357&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Mon Sep 24 13:24:53 2012
@@ -30,6 +30,8 @@ import java.net.Proxy;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
import java.security.GeneralSecurityException;
import java.security.Principal;
import java.security.cert.Certificate;
@@ -186,7 +188,8 @@ public class AsyncHTTPConduit extends UR
}
- public class AsyncWrappedOutputStream extends WrappedOutputStream implements CopyingOutputStream {
+ public class AsyncWrappedOutputStream extends WrappedOutputStream
+ implements CopyingOutputStream, WritableByteChannel {
final HTTPClientPolicy csPolicy;
CXFHttpRequest entity;
@@ -264,7 +267,33 @@ public class AsyncHTTPConduit extends UR
outbuf.writeCompleted();
}
-
+ public boolean isOpen() { // Channel-style open check; the enclosing class is declared to implement WritableByteChannel.
+ return true; // Unconditionally reports open. NOTE(review): no close state is consulted here - confirm this is intended after close().
+ }
+
+        /**
+         * Writes the remaining bytes of {@code src} to this stream.
+         * <p>
+         * While the in-memory {@code buffer} is still active, bytes are first copied
+         * into it up to {@code threshold}; once the threshold is reached
+         * {@code thresholdReached()} and {@code unBuffer()} are invoked. Whatever is
+         * still left in {@code src} is then written either to {@code wrappedStream}
+         * (when {@code cachingForRetransmission} is set) or to {@code outbuf}.
+         * <p>
+         * In the caching branch the position of {@code src} is advanced by the number
+         * of bytes written, as {@link WritableByteChannel#write(ByteBuffer)} requires,
+         * and buffers without an accessible backing array (direct or read-only
+         * buffers) are supported by copying their content first.
+         *
+         * @param src the buffer whose remaining bytes are written
+         * @return the number of bytes taken from {@code src}
+         * @throws IOException if one of the underlying writes fails
+         */
+        public int write(ByteBuffer src) throws IOException {
+            int total = 0;
+            if (buffer != null) {
+                int pos = buffer.size();
+                // Copy no more than what still fits below the threshold.
+                int len = Math.min(this.threshold - pos, src.remaining());
+                src.get(buffer.getRawBytes(), pos, len);
+                buffer.setSize(pos + len);
+                total += len;
+                if (buffer.size() >= threshold) {
+                    thresholdReached();
+                    unBuffer();
+                }
+            }
+            if (cachingForRetransmission) {
+                int remaining = src.remaining();
+                if (src.hasArray()) {
+                    // position() is relative to arrayOffset() within the backing array.
+                    wrappedStream.write(src.array(), src.arrayOffset() + src.position(), remaining);
+                    // Mark the bytes as consumed so callers looping on hasRemaining() terminate.
+                    src.position(src.position() + remaining);
+                } else {
+                    // No accessible backing array: drain into a temporary copy.
+                    byte[] copy = new byte[remaining];
+                    src.get(copy);
+                    wrappedStream.write(copy, 0, remaining);
+                }
+                return remaining + total;
+            }
+            return outbuf.write(src) + total;
+        }
+
public int copyFrom(InputStream in) throws IOException {
int count = 0;
if (buffer != null) {
@@ -638,6 +667,7 @@ public class AsyncHTTPConduit extends UR
sessionLock.notifyAll();
}
}
+
}
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java?rev=1389357&r1=1389356&r2=1389357&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedOutputBuffer.java Mon Sep 24 13:24:53 2012
@@ -255,6 +255,33 @@ public class SharedOutputBuffer extends
this.lock.unlock();
}
}
+
+    /**
+     * Writes the remaining content of the given buffer.
+     * <p>
+     * The buffer is published through {@code largeWrapper} and
+     * {@code flushContent()} is called repeatedly until the buffer has no bytes
+     * remaining. {@code largeWrapper} is always reset before returning, including
+     * when a flush fails, so no reference to the caller's buffer is left behind.
+     *
+     * @param b the buffer to write; {@code null} is treated as an empty write
+     * @return the number of bytes that were remaining in {@code b} on entry
+     * @throws IOException propagated from {@code flushContent()}
+     * @throws IllegalStateException if the buffer is shut down or the end of the
+     *         stream has already been signalled
+     */
+    public int write(ByteBuffer b) throws IOException {
+        if (b == null) {
+            return 0;
+        }
+        this.lock.lock();
+        try {
+            if (this.shutdown || this.endOfStream) {
+                throw new IllegalStateException("Buffer already closed for writing");
+            }
+            setInputMode();
+
+            // Flush the internal buffer first when it has no room left.
+            if (!this.buffer.hasRemaining()) {
+                flushContent();
+                setInputMode();
+            }
+            int c = b.remaining();
+            largeWrapper = b;
+            try {
+                while (largeWrapper.hasRemaining()) {
+                    flushContent();
+                }
+            } finally {
+                // Reset even if flushContent() throws, so a stale caller buffer is never kept.
+                largeWrapper = null;
+            }
+            return c;
+        } finally {
+            this.lock.unlock();
+        }
+    }
public void write(final byte[] b) throws IOException {
if (b == null) {
@@ -319,5 +346,6 @@ public class SharedOutputBuffer extends
}
}
+
}