You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2010/06/16 18:20:44 UTC

svn commit: r955288 - in /httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util: SharedInputBuffer.java SharedOutputBuffer.java

Author: olegk
Date: Wed Jun 16 16:20:44 2010
New Revision: 955288

URL: http://svn.apache.org/viewvc?rev=955288&view=rev
Log:
Shared I/O buffers to use a more flexible ReentrantLock for synchronization; clarified API contract in javadocs

Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java?rev=955288&r1=955287&r2=955288&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java Wed Jun 16 16:20:44 2010
@@ -28,6 +28,8 @@ package org.apache.http.nio.util;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.http.nio.ContentDecoder;
 import org.apache.http.nio.IOControl;
@@ -37,15 +39,22 @@ import org.apache.http.nio.IOControl;
  * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
  * a worker thread.
  * <p>
- * Please note this class is thread safe only when used though
- * the {@link ContentInputBuffer} interface.
+ * The I/O dispatch thread is expected to transfer data from {@link ContentDecoder} to the buffer 
+ *   by calling {@link #consumeContent(ContentDecoder)}.
+ * <p>
+ * The worker thread is expected to read the data from the buffer by calling 
+ *   {@link #read()} or {@link #read(byte[], int, int)} methods. 
+ * <p>
+ * In case of an abnormal situation or when no longer needed the buffer must be shut down
+ * using {@link #shutdown()} method. 
  *
  * @since 4.0
  */
 public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
 
     private final IOControl ioctrl;
-    private final Object mutex;
+    private final ReentrantLock lock;
+    private final Condition condition;
 
     private volatile boolean shutdown = false;
     private volatile boolean endOfStream = false;
@@ -56,16 +65,20 @@ public class SharedInputBuffer extends E
             throw new IllegalArgumentException("I/O content control may not be null");
         }
         this.ioctrl = ioctrl;
-        this.mutex = new Object();
+        this.lock = new ReentrantLock();
+        this.condition = this.lock.newCondition();
     }
 
     public void reset() {
         if (this.shutdown) {
             return;
         }
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             clear();
             this.endOfStream = false;
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -73,7 +86,8 @@ public class SharedInputBuffer extends E
         if (this.shutdown) {
             return -1;
         }
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             setInputMode();
             int totalRead = 0;
             int bytesRead;
@@ -86,7 +100,7 @@ public class SharedInputBuffer extends E
             if (!this.buffer.hasRemaining()) {
                 this.ioctrl.suspendInput();
             }
-            this.mutex.notifyAll();
+            this.condition.signalAll();
 
             if (totalRead > 0) {
                 return totalRead;
@@ -97,43 +111,67 @@ public class SharedInputBuffer extends E
                     return 0;
                 }
             }
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    @Override
+    public boolean hasData() {
+        this.lock.lock();
+        try {
+            return super.hasData();
+        } finally {
+            this.lock.unlock();
         }
     }
 
     @Override
     public int available() {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             return super.available();
+        } finally {
+            this.lock.unlock();
         }
     }
 
     @Override
     public int capacity() {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             return super.capacity();
+        } finally {
+            this.lock.unlock();
         }
     }
 
     @Override
     public int length() {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             return super.length();
+        } finally {
+            this.lock.unlock();
         }
     }
 
     protected void waitForData() throws IOException {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             try {
-                while (!hasData() && !this.endOfStream) {
+                while (!super.hasData() && !this.endOfStream) {
                     if (this.shutdown) {
                         throw new InterruptedIOException("Input operation aborted");
                     }
                     this.ioctrl.requestInput();
-                    this.mutex.wait();
+                    this.condition.await();
                 }
             } catch (InterruptedException ex) {
                 throw new IOException("Interrupted while waiting for more data");
             }
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -142,8 +180,11 @@ public class SharedInputBuffer extends E
             return;
         }
         this.endOfStream = true;
-        synchronized (this.mutex) {
-            this.mutex.notifyAll();
+        this.lock.lock();
+        try {
+            this.condition.signalAll();
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -152,8 +193,11 @@ public class SharedInputBuffer extends E
             return;
         }
         this.shutdown = true;
-        synchronized (this.mutex) {
-            this.mutex.notifyAll();
+        this.lock.lock();
+        try {
+            this.condition.signalAll();
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -169,7 +213,8 @@ public class SharedInputBuffer extends E
         if (this.shutdown) {
             return -1;
         }
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             if (!hasData()) {
                 waitForData();
             }
@@ -177,6 +222,8 @@ public class SharedInputBuffer extends E
                 return -1;
             }
             return this.buffer.get() & 0xff;
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -187,7 +234,8 @@ public class SharedInputBuffer extends E
         if (b == null) {
             return 0;
         }
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             if (!hasData()) {
                 waitForData();
             }
@@ -201,6 +249,8 @@ public class SharedInputBuffer extends E
             }
             this.buffer.get(b, off, chunk);
             return chunk;
+        } finally {
+            this.lock.unlock();
         }
     }
 

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java?rev=955288&r1=955287&r2=955288&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java Wed Jun 16 16:20:44 2010
@@ -28,6 +28,8 @@ package org.apache.http.nio.util;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.http.nio.ContentEncoder;
 import org.apache.http.nio.IOControl;
@@ -37,15 +39,22 @@ import org.apache.http.nio.IOControl;
  * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
  * a worker thread.
  * <p>
- * Please note this class is thread safe only when used though
- * the {@link ContentOutputBuffer} interface.
+ * The I/O dispatch thread is expected to transfer data from the buffer to 
+ *   {@link ContentEncoder} by calling {@link #produceContent(ContentEncoder)}.
+ * <p>
+ * The worker thread is expected to write data to the buffer by calling 
+ * {@link #write(int)}, {@link #write(byte[], int, int)} or {@link #writeCompleted()} methods.
+ * <p>
+ * In case of an abnormal situation or when no longer needed the buffer must be 
+ * shut down using {@link #shutdown()} method. 
  *
  * @since 4.0
  */
 public class SharedOutputBuffer extends ExpandableBuffer implements ContentOutputBuffer {
 
     private final IOControl ioctrl;
-    private final Object mutex;
+    private final ReentrantLock lock;
+    private final Condition condition;
 
     private volatile boolean shutdown = false;
     private volatile boolean endOfStream = false;
@@ -56,37 +65,60 @@ public class SharedOutputBuffer extends 
             throw new IllegalArgumentException("I/O content control may not be null");
         }
         this.ioctrl = ioctrl;
-        this.mutex = new Object();
+        this.lock = new ReentrantLock();
+        this.condition = this.lock.newCondition();
     }
 
     public void reset() {
         if (this.shutdown) {
             return;
         }
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             clear();
             this.endOfStream = false;
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    @Override
+    public boolean hasData() {
+        this.lock.lock();
+        try {
+            return super.hasData();
+        } finally {
+            this.lock.unlock();
         }
     }
 
     @Override
     public int available() {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             return super.available();
+        } finally {
+            this.lock.unlock();
         }
     }
 
     @Override
     public int capacity() {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             return super.capacity();
+        } finally {
+            this.lock.unlock();
         }
     }
 
     @Override
     public int length() {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             return super.length();
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -94,16 +126,17 @@ public class SharedOutputBuffer extends 
         if (this.shutdown) {
             return -1;
         }
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             setOutputMode();
             int bytesWritten = 0;
-            if (hasData()) {
+            if (super.hasData()) {
                 bytesWritten = encoder.write(this.buffer);
                 if (encoder.isCompleted()) {
                     this.endOfStream = true;
                 }
             }
-            if (!hasData()) {
+            if (!super.hasData()) {
                 // No more buffered content
                 // If at the end of the stream, terminate
                 if (this.endOfStream && !encoder.isCompleted()) {
@@ -114,8 +147,10 @@ public class SharedOutputBuffer extends 
                     this.ioctrl.suspendOutput();
                 }
             }
-            this.mutex.notifyAll();
+            this.condition.signalAll();
             return bytesWritten;
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -128,8 +163,11 @@ public class SharedOutputBuffer extends 
             return;
         }
         this.shutdown = true;
-        synchronized (this.mutex) {
-            this.mutex.notifyAll();
+        this.lock.lock();
+        try {
+            this.condition.signalAll();
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -137,7 +175,8 @@ public class SharedOutputBuffer extends 
         if (b == null) {
             return;
         }
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             if (this.shutdown || this.endOfStream) {
                 throw new IllegalStateException("Buffer already closed for writing");
             }
@@ -153,6 +192,8 @@ public class SharedOutputBuffer extends 
                 remaining -= chunk;
                 off += chunk;
             }
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -164,7 +205,8 @@ public class SharedOutputBuffer extends 
     }
 
     public void write(int b) throws IOException {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             if (this.shutdown || this.endOfStream) {
                 throw new IllegalStateException("Buffer already closed for writing");
             }
@@ -174,6 +216,8 @@ public class SharedOutputBuffer extends 
                 setInputMode();
             }
             this.buffer.put((byte)b);
+        } finally {
+            this.lock.unlock();
         }
     }
 
@@ -181,28 +225,34 @@ public class SharedOutputBuffer extends 
     }
 
     private void flushContent() throws IOException {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             try {
-                while (hasData()) {
+                while (super.hasData()) {
                     if (this.shutdown) {
                         throw new InterruptedIOException("Output operation aborted");
                     }
                     this.ioctrl.requestOutput();
-                    this.mutex.wait();
+                    this.condition.await();
                 }
             } catch (InterruptedException ex) {
                 throw new IOException("Interrupted while flushing the content buffer");
             }
+        } finally {
+            this.lock.unlock();
         }
     }
 
     public void writeCompleted() throws IOException {
-        synchronized (this.mutex) {
+        this.lock.lock();
+        try {
             if (this.endOfStream) {
                 return;
             }
             this.endOfStream = true;
             this.ioctrl.requestOutput();
+        } finally {
+            this.lock.unlock();
         }
     }