You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/09/11 17:44:11 UTC

svn commit: r1383455 - /cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java

Author: dkulp
Date: Tue Sep 11 15:44:11 2012
New Revision: 1383455

URL: http://svn.apache.org/viewvc?rev=1383455&view=rev
Log:
Some optimizations in the SharedInputBuffer to reduce the number of times the main CXF worker threads need to wait for content, and to remove buffer copies when they do need to wait.

Modified:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java?rev=1383455&r1=1383454&r2=1383455&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java Tue Sep 11 15:44:11 2012
@@ -21,6 +21,7 @@ package org.apache.cxf.transport.http.as
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -48,16 +49,20 @@ public class SharedInputBuffer extends E
 
     private final ReentrantLock lock;
     private final Condition condition;
+    private final int requestInputSize;
 
     private volatile IOControl ioctrl;
     private volatile boolean shutdown;
     private volatile boolean endOfStream;
-
+    
+    private volatile ByteBuffer waitingBuffer;
+    
     public SharedInputBuffer(int buffersize, 
                              final ByteBufferAllocator allocator) {
         super(buffersize, allocator);
         this.lock = new ReentrantLock();
         this.condition = this.lock.newCondition();
+        this.requestInputSize = buffersize * 4 / 3;  
     }
 
     public void reset() {
@@ -87,6 +92,11 @@ public class SharedInputBuffer extends E
             setInputMode();
             int totalRead = 0;
             int bytesRead;
+            if (waitingBuffer != null && this.buffer.position() == 0) {
+                while ((bytesRead = decoder.read(this.waitingBuffer)) > 0) {
+                    totalRead += bytesRead;
+                }
+            }
             while ((bytesRead = decoder.read(this.buffer)) > 0) {
                 totalRead += bytesRead;
             }
@@ -156,7 +166,8 @@ public class SharedInputBuffer extends E
         this.lock.lock();
         try {
             try {
-                while (!super.hasData() && !this.endOfStream) {
+                while ((this.waitingBuffer != null && this.waitingBuffer.position() == 0) 
+                    && !super.hasData() && !this.endOfStream) {
                     if (this.shutdown) {
                         throw new InterruptedIOException("Input operation aborted");
                     }
@@ -235,7 +246,13 @@ public class SharedInputBuffer extends E
         this.lock.lock();
         try {
             if (!hasData()) {
+                this.waitingBuffer = ByteBuffer.wrap(b, off, len);
                 waitForData();
+                int i = waitingBuffer.position();
+                waitingBuffer = null;
+                if (i > 0) {
+                    return i;
+                }
             }
             if (isEndOfStream()) {
                 return -1;
@@ -246,6 +263,12 @@ public class SharedInputBuffer extends E
                 chunk = this.buffer.remaining();
             }
             this.buffer.get(b, off, chunk);
+            if (this.buffer.remaining() < this.requestInputSize && !this.endOfStream && this.ioctrl != null) {
+                //we have a significant amount of space empty in the buffer, we'll turn on 
+                //the input so maybe we'll get another chunk by the time the next read happens
+                //and we can then avoid waiting for input
+                this.ioctrl.requestInput();
+            }
             return chunk;
         } finally {
             this.lock.unlock();