You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2010/06/16 18:20:44 UTC
svn commit: r955288 - in
/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util:
SharedInputBuffer.java SharedOutputBuffer.java
Author: olegk
Date: Wed Jun 16 16:20:44 2010
New Revision: 955288
URL: http://svn.apache.org/viewvc?rev=955288&view=rev
Log:
Shared I/O buffers to use a more flexible ReentrantLock for synchronization; clarified API contract in javadocs
Modified:
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java?rev=955288&r1=955287&r2=955288&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedInputBuffer.java Wed Jun 16 16:20:44 2010
@@ -28,6 +28,8 @@ package org.apache.http.nio.util;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
@@ -37,15 +39,22 @@ import org.apache.http.nio.IOControl;
* shared by multiple threads, usually the I/O dispatch of an I/O reactor and
* a worker thread.
* <p>
- * Please note this class is thread safe only when used though
- * the {@link ContentInputBuffer} interface.
+ * The I/O dispatch thread is expected to transfer data from {@link ContentDecoder} to the buffer
+ * by calling {@link #consumeContent(ContentDecoder)}.
+ * <p>
+ * The worker thread is expected to read the data from the buffer by calling
+ * {@link #read()} or {@link #read(byte[], int, int)} methods.
+ * <p>
+ * In case of an abnormal situation or when no longer needed the buffer must be shut down
+ * using {@link #shutdown()} method.
*
* @since 4.0
*/
public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
private final IOControl ioctrl;
- private final Object mutex;
+ private final ReentrantLock lock;
+ private final Condition condition;
private volatile boolean shutdown = false;
private volatile boolean endOfStream = false;
@@ -56,16 +65,20 @@ public class SharedInputBuffer extends E
throw new IllegalArgumentException("I/O content control may not be null");
}
this.ioctrl = ioctrl;
- this.mutex = new Object();
+ this.lock = new ReentrantLock();
+ this.condition = this.lock.newCondition();
}
public void reset() {
if (this.shutdown) {
return;
}
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
clear();
this.endOfStream = false;
+ } finally {
+ this.lock.unlock();
}
}
@@ -73,7 +86,8 @@ public class SharedInputBuffer extends E
if (this.shutdown) {
return -1;
}
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
setInputMode();
int totalRead = 0;
int bytesRead;
@@ -86,7 +100,7 @@ public class SharedInputBuffer extends E
if (!this.buffer.hasRemaining()) {
this.ioctrl.suspendInput();
}
- this.mutex.notifyAll();
+ this.condition.signalAll();
if (totalRead > 0) {
return totalRead;
@@ -97,43 +111,67 @@ public class SharedInputBuffer extends E
return 0;
}
}
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean hasData() {
+ this.lock.lock();
+ try {
+ return super.hasData();
+ } finally {
+ this.lock.unlock();
}
}
@Override
public int available() {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
return super.available();
+ } finally {
+ this.lock.unlock();
}
}
@Override
public int capacity() {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
return super.capacity();
+ } finally {
+ this.lock.unlock();
}
}
@Override
public int length() {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
return super.length();
+ } finally {
+ this.lock.unlock();
}
}
protected void waitForData() throws IOException {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
try {
- while (!hasData() && !this.endOfStream) {
+ while (!super.hasData() && !this.endOfStream) {
if (this.shutdown) {
throw new InterruptedIOException("Input operation aborted");
}
this.ioctrl.requestInput();
- this.mutex.wait();
+ this.condition.await();
}
} catch (InterruptedException ex) {
throw new IOException("Interrupted while waiting for more data");
}
+ } finally {
+ this.lock.unlock();
}
}
@@ -142,8 +180,11 @@ public class SharedInputBuffer extends E
return;
}
this.endOfStream = true;
- synchronized (this.mutex) {
- this.mutex.notifyAll();
+ this.lock.lock();
+ try {
+ this.condition.signalAll();
+ } finally {
+ this.lock.unlock();
}
}
@@ -152,8 +193,11 @@ public class SharedInputBuffer extends E
return;
}
this.shutdown = true;
- synchronized (this.mutex) {
- this.mutex.notifyAll();
+ this.lock.lock();
+ try {
+ this.condition.signalAll();
+ } finally {
+ this.lock.unlock();
}
}
@@ -169,7 +213,8 @@ public class SharedInputBuffer extends E
if (this.shutdown) {
return -1;
}
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
if (!hasData()) {
waitForData();
}
@@ -177,6 +222,8 @@ public class SharedInputBuffer extends E
return -1;
}
return this.buffer.get() & 0xff;
+ } finally {
+ this.lock.unlock();
}
}
@@ -187,7 +234,8 @@ public class SharedInputBuffer extends E
if (b == null) {
return 0;
}
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
if (!hasData()) {
waitForData();
}
@@ -201,6 +249,8 @@ public class SharedInputBuffer extends E
}
this.buffer.get(b, off, chunk);
return chunk;
+ } finally {
+ this.lock.unlock();
}
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java?rev=955288&r1=955287&r2=955288&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java Wed Jun 16 16:20:44 2010
@@ -28,6 +28,8 @@ package org.apache.http.nio.util;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
@@ -37,15 +39,22 @@ import org.apache.http.nio.IOControl;
* shared by multiple threads, usually the I/O dispatch of an I/O reactor and
* a worker thread.
* <p>
- * Please note this class is thread safe only when used though
- * the {@link ContentOutputBuffer} interface.
+ * The I/O dispatch thread is expected to transfer data from the buffer to
+ * {@link ContentEncoder} by calling {@link #produceContent(ContentEncoder)}.
+ * <p>
+ * The worker thread is expected to write data to the buffer by calling
+ * {@link #write(int)}, {@link #write(byte[], int, int)} or {@link #writeCompleted()} methods.
+ * <p>
+ * In case of an abnormal situation or when no longer needed the buffer must be
+ * shut down using {@link #shutdown()} method.
*
* @since 4.0
*/
public class SharedOutputBuffer extends ExpandableBuffer implements ContentOutputBuffer {
private final IOControl ioctrl;
- private final Object mutex;
+ private final ReentrantLock lock;
+ private final Condition condition;
private volatile boolean shutdown = false;
private volatile boolean endOfStream = false;
@@ -56,37 +65,60 @@ public class SharedOutputBuffer extends
throw new IllegalArgumentException("I/O content control may not be null");
}
this.ioctrl = ioctrl;
- this.mutex = new Object();
+ this.lock = new ReentrantLock();
+ this.condition = this.lock.newCondition();
}
public void reset() {
if (this.shutdown) {
return;
}
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
clear();
this.endOfStream = false;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean hasData() {
+ this.lock.lock();
+ try {
+ return super.hasData();
+ } finally {
+ this.lock.unlock();
}
}
@Override
public int available() {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
return super.available();
+ } finally {
+ this.lock.unlock();
}
}
@Override
public int capacity() {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
return super.capacity();
+ } finally {
+ this.lock.unlock();
}
}
@Override
public int length() {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
return super.length();
+ } finally {
+ this.lock.unlock();
}
}
@@ -94,16 +126,17 @@ public class SharedOutputBuffer extends
if (this.shutdown) {
return -1;
}
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
setOutputMode();
int bytesWritten = 0;
- if (hasData()) {
+ if (super.hasData()) {
bytesWritten = encoder.write(this.buffer);
if (encoder.isCompleted()) {
this.endOfStream = true;
}
}
- if (!hasData()) {
+ if (!super.hasData()) {
// No more buffered content
// If at the end of the stream, terminate
if (this.endOfStream && !encoder.isCompleted()) {
@@ -114,8 +147,10 @@ public class SharedOutputBuffer extends
this.ioctrl.suspendOutput();
}
}
- this.mutex.notifyAll();
+ this.condition.signalAll();
return bytesWritten;
+ } finally {
+ this.lock.unlock();
}
}
@@ -128,8 +163,11 @@ public class SharedOutputBuffer extends
return;
}
this.shutdown = true;
- synchronized (this.mutex) {
- this.mutex.notifyAll();
+ this.lock.lock();
+ try {
+ this.condition.signalAll();
+ } finally {
+ this.lock.unlock();
}
}
@@ -137,7 +175,8 @@ public class SharedOutputBuffer extends
if (b == null) {
return;
}
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
if (this.shutdown || this.endOfStream) {
throw new IllegalStateException("Buffer already closed for writing");
}
@@ -153,6 +192,8 @@ public class SharedOutputBuffer extends
remaining -= chunk;
off += chunk;
}
+ } finally {
+ this.lock.unlock();
}
}
@@ -164,7 +205,8 @@ public class SharedOutputBuffer extends
}
public void write(int b) throws IOException {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
if (this.shutdown || this.endOfStream) {
throw new IllegalStateException("Buffer already closed for writing");
}
@@ -174,6 +216,8 @@ public class SharedOutputBuffer extends
setInputMode();
}
this.buffer.put((byte)b);
+ } finally {
+ this.lock.unlock();
}
}
@@ -181,28 +225,34 @@ public class SharedOutputBuffer extends
}
private void flushContent() throws IOException {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
try {
- while (hasData()) {
+ while (super.hasData()) {
if (this.shutdown) {
throw new InterruptedIOException("Output operation aborted");
}
this.ioctrl.requestOutput();
- this.mutex.wait();
+ this.condition.await();
}
} catch (InterruptedException ex) {
throw new IOException("Interrupted while flushing the content buffer");
}
+ } finally {
+ this.lock.unlock();
}
}
public void writeCompleted() throws IOException {
- synchronized (this.mutex) {
+ this.lock.lock();
+ try {
if (this.endOfStream) {
return;
}
this.endOfStream = true;
this.ioctrl.requestOutput();
+ } finally {
+ this.lock.unlock();
}
}