You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/01/12 11:01:32 UTC
[httpcomponents-core] 01/02: Made AbstractBinAsyncEntityProducer
and AbstractCharAsyncEntityProducer conditionally threading-safe
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch async-producer-improvements
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
commit c64090e1365d549f1eaadfd7481620cb03923ae0
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sat Jan 12 11:56:54 2019 +0100
Made AbstractBinAsyncEntityProducer and AbstractCharAsyncEntityProducer conditionally threading-safe
---
.../core5/testing/nio/MultiLineEntityProducer.java | 10 --
.../nio/entity/AbstractBinAsyncEntityProducer.java | 72 +++++++++-----
.../entity/AbstractCharAsyncEntityProducer.java | 107 +++++++++++++--------
.../http/nio/entity/StringAsyncEntityProducer.java | 10 --
.../entity/TestAbstractBinAsyncEntityProducer.java | 12 +--
.../TestAbstractCharAsyncEntityProducer.java | 10 --
6 files changed, 116 insertions(+), 105 deletions(-)
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java
index 85589e8..7c888b9 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java
@@ -55,16 +55,6 @@ public class MultiLineEntityProducer extends AbstractCharAsyncEntityProducer {
}
@Override
- public long getContentLength() {
- return -1;
- }
-
- @Override
- public int available() {
- return Integer.MAX_VALUE;
- }
-
- @Override
protected void produceData(final StreamChannel<CharBuffer> channel) throws IOException {
while (charbuf.remaining() > text.length() + 2 && count < total) {
charbuf.put(text + "\r\n");
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java
index 234c054..3e9b569 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
@@ -41,6 +43,7 @@ import org.apache.hc.core5.util.Args;
*
* @since 5.0
*/
+@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProducer {
private final ByteBuffer bytebuf;
@@ -63,6 +66,8 @@ public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProdu
* Triggered to signal the ability of the underlying byte channel
* to accept more data. The data producer can choose to write data
* immediately inside the call or asynchronously at some later point.
+ * <p>
+ * The {@link StreamChannel} passed to this method is thread-safe.
*
* @param channel the data channel capable to accepting more data.
*/
@@ -89,6 +94,18 @@ public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProdu
}
@Override
+ public long getContentLength() {
+ return -1;
+ }
+
+ @Override
+ public final int available() {
+ synchronized (bytebuf) {
+ return bytebuf.remaining();
+ }
+ }
+
+ @Override
public final void produce(final DataStreamChannel channel) throws IOException {
produceData(new StreamChannel<ByteBuffer>() {
@@ -99,39 +116,48 @@ public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProdu
if (chunk == 0) {
return 0;
}
- if (bytebuf.remaining() >= chunk) {
- bytebuf.put(src);
- return chunk;
- }
- int totalBytesWritten = 0;
- if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
- bytebuf.flip();
- final int bytesWritten = channel.write(bytebuf);
- bytebuf.compact();
- totalBytesWritten += bytesWritten;
- }
- if (bytebuf.position() == 0) {
- final int bytesWritten = channel.write(src);
- totalBytesWritten += bytesWritten;
+
+ synchronized (bytebuf) {
+ if (bytebuf.remaining() >= chunk) {
+ bytebuf.put(src);
+ return chunk;
+ }
+ int totalBytesWritten = 0;
+ if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
+ bytebuf.flip();
+ final int bytesWritten = channel.write(bytebuf);
+ bytebuf.compact();
+ totalBytesWritten += bytesWritten;
+ }
+ if (bytebuf.position() == 0) {
+ final int bytesWritten = channel.write(src);
+ totalBytesWritten += bytesWritten;
+ }
+ if (bytebuf.hasRemaining()) {
+ channel.requestOutput();
+ }
+ return totalBytesWritten;
}
- return totalBytesWritten;
+
}
@Override
public void endStream() throws IOException {
endStream = true;
+ channel.requestOutput();
}
});
- if (endStream || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
- bytebuf.flip();
- channel.write(bytebuf);
- bytebuf.compact();
- }
- if (bytebuf.position() == 0 && endStream) {
- channel.endStream();
+ synchronized (bytebuf) {
+ if (endStream || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
+ bytebuf.flip();
+ channel.write(bytebuf);
+ bytebuf.compact();
+ }
+ if (bytebuf.position() == 0 && endStream) {
+ channel.endStream();
+ }
}
}
-
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java
index b451e06..275cafe 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java
@@ -35,6 +35,8 @@ import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;
import java.util.Set;
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
@@ -46,6 +48,7 @@ import org.apache.hc.core5.util.Args;
*
* @since 5.0
*/
+@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProducer {
private enum State { ACTIVE, FLUSHING, END_STREAM }
@@ -56,7 +59,6 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
private final int fragmentSizeHint;
private final ContentType contentType;
private final CharsetEncoder charsetEncoder;
- private final StreamChannel<CharBuffer> charDataStream;
private volatile State state;
@@ -73,25 +75,6 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
charset = StandardCharsets.US_ASCII;
}
this.charsetEncoder = charset.newEncoder();
- this.charDataStream = new StreamChannel<CharBuffer>() {
-
- @Override
- public int write(final CharBuffer src) throws IOException {
- Args.notNull(src, "Buffer");
- final int p = src.position();
- final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
- if (result.isError()) {
- result.throwException();
- }
- return src.position() - p;
- }
-
- @Override
- public void endStream() throws IOException {
- state = State.FLUSHING;
- }
-
- };
this.state = State.ACTIVE;
}
@@ -99,6 +82,8 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
* Triggered to signal the ability of the underlying char channel
* to accept more data. The data producer can choose to write data
* immediately inside the call or asynchronously at some later point.
+ * <p>
+ * The {@link StreamChannel} passed to this method is thread-safe.
*
* @param channel the data channel capable to accepting more data.
*/
@@ -115,6 +100,11 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
}
@Override
+ public long getContentLength() {
+ return -1;
+ }
+
+ @Override
public boolean isChunked() {
return false;
}
@@ -125,30 +115,63 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
}
@Override
- public final void produce(final DataStreamChannel channel) throws IOException {
- if (state.compareTo(State.ACTIVE) == 0) {
- produceData(charDataStream);
+ public final int available() {
+ synchronized (bytebuf) {
+ return bytebuf.remaining();
}
- if (state.compareTo(State.ACTIVE) > 0 || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
- bytebuf.flip();
- channel.write(bytebuf);
- bytebuf.compact();
+ }
+
+ @Override
+ public final void produce(final DataStreamChannel channel) throws IOException {
+ if (state == State.ACTIVE) {
+ produceData(new StreamChannel<CharBuffer>() {
+
+ @Override
+ public int write(final CharBuffer src) throws IOException {
+ Args.notNull(src, "Buffer");
+ synchronized (bytebuf) {
+ final int p = src.position();
+ final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
+ if (result.isError()) {
+ result.throwException();
+ }
+ if (bytebuf.hasRemaining()) {
+ channel.requestOutput();
+ }
+ return src.position() - p;
+ }
+ }
+
+ @Override
+ public void endStream() throws IOException {
+ state = State.FLUSHING;
+ channel.requestOutput();
+ }
+
+ });
}
- if (state.compareTo(State.FLUSHING) == 0) {
- final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
- if (result.isError()) {
- result.throwException();
- } else if (result.isUnderflow()) {
- final CoderResult result2 = charsetEncoder.flush(bytebuf);
- if (result2.isError()) {
+ synchronized (bytebuf) {
+ if (state.compareTo(State.ACTIVE) > 0 || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
+ bytebuf.flip();
+ channel.write(bytebuf);
+ bytebuf.compact();
+ }
+ if (state.compareTo(State.FLUSHING) == 0) {
+ final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
+ if (result.isError()) {
result.throwException();
- } else if (result2.isUnderflow()) {
- state = State.END_STREAM;
+ } else if (result.isUnderflow()) {
+ final CoderResult result2 = charsetEncoder.flush(bytebuf);
+ if (result2.isError()) {
+ result2.throwException(); // raise the flush() error; 'result' is UNDERFLOW here and would throw BufferUnderflowException
+ } else if (result2.isUnderflow()) {
+ state = State.END_STREAM;
+ }
}
}
- }
- if (bytebuf.position() == 0 && state.compareTo(State.END_STREAM) == 0) {
- channel.endStream();
+ if (bytebuf.position() == 0 && state.compareTo(State.END_STREAM) == 0) {
+ channel.endStream();
+ }
}
}
@@ -157,8 +180,10 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
@Override
public final void releaseResources() {
- charsetEncoder.reset();
- state = State.ACTIVE;
+ synchronized (bytebuf) {
+ charsetEncoder.reset();
+ state = State.ACTIVE;
+ }
releaseResourcesInternal();
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java
index 6940a8c..fb62c33 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java
@@ -75,16 +75,6 @@ public class StringAsyncEntityProducer extends AbstractCharAsyncEntityProducer {
}
@Override
- public long getContentLength() {
- return -1;
- }
-
- @Override
- public int available() {
- return Integer.MAX_VALUE;
- }
-
- @Override
protected void produceData(final StreamChannel<CharBuffer> channel) throws IOException {
Asserts.notNull(channel, "Channel");
channel.write(content);
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java
index ba026fe..74ff7f1 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java
@@ -31,8 +31,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import org.apache.hc.core5.http.WritableByteChannelMock;
import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.WritableByteChannelMock;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.BasicDataStreamChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
@@ -73,16 +73,6 @@ public class TestAbstractBinAsyncEntityProducer {
}
@Override
- public long getContentLength() {
- return -1;
- }
-
- @Override
- public int available() {
- return Integer.MAX_VALUE;
- }
-
- @Override
public void failed(final Exception cause) {
}
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java
index 329302c..1224679 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java
@@ -73,16 +73,6 @@ public class TestAbstractCharAsyncEntityProducer {
}
@Override
- public long getContentLength() {
- return -1;
- }
-
- @Override
- public int available() {
- return Integer.MAX_VALUE;
- }
-
- @Override
public void failed(final Exception cause) {
}