You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/01/12 11:01:32 UTC

[httpcomponents-core] 01/02: Made AbstractBinAsyncEntityProducer and AbstractCharAsyncEntityProducer conditionally threading-safe

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch async-producer-improvements
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit c64090e1365d549f1eaadfd7481620cb03923ae0
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sat Jan 12 11:56:54 2019 +0100

    Made AbstractBinAsyncEntityProducer and AbstractCharAsyncEntityProducer conditionally threading-safe
---
 .../core5/testing/nio/MultiLineEntityProducer.java |  10 --
 .../nio/entity/AbstractBinAsyncEntityProducer.java |  72 +++++++++-----
 .../entity/AbstractCharAsyncEntityProducer.java    | 107 +++++++++++++--------
 .../http/nio/entity/StringAsyncEntityProducer.java |  10 --
 .../entity/TestAbstractBinAsyncEntityProducer.java |  12 +--
 .../TestAbstractCharAsyncEntityProducer.java       |  10 --
 6 files changed, 116 insertions(+), 105 deletions(-)

diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java
index 85589e8..7c888b9 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java
@@ -55,16 +55,6 @@ public class MultiLineEntityProducer extends AbstractCharAsyncEntityProducer {
     }
 
     @Override
-    public long getContentLength() {
-        return -1;
-    }
-
-    @Override
-    public int available() {
-        return Integer.MAX_VALUE;
-    }
-
-    @Override
     protected void produceData(final StreamChannel<CharBuffer> channel) throws IOException {
         while (charbuf.remaining() > text.length() + 2 && count < total) {
             charbuf.put(text + "\r\n");
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java
index 234c054..3e9b569 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
@@ -41,6 +43,7 @@ import org.apache.hc.core5.util.Args;
  *
  * @since 5.0
  */
+@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
 public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProducer {
 
     private final ByteBuffer bytebuf;
@@ -63,6 +66,8 @@ public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProdu
      * Triggered to signal the ability of the underlying byte channel
      * to accept more data. The data producer can choose to write data
      * immediately inside the call or asynchronously at some later point.
+     * <p>
+     * {@link StreamChannel} passed to this method is threading-safe.
      *
      * @param channel the data channel capable to accepting more data.
      */
@@ -89,6 +94,18 @@ public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProdu
     }
 
     @Override
+    public long getContentLength() {
+        return -1;
+    }
+
+    @Override
+    public final int available() {
+        synchronized (bytebuf) {
+            return bytebuf.remaining();
+        }
+    }
+
+    @Override
     public final void produce(final DataStreamChannel channel) throws IOException {
         produceData(new StreamChannel<ByteBuffer>() {
 
@@ -99,39 +116,48 @@ public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProdu
                 if (chunk == 0) {
                     return 0;
                 }
-                if (bytebuf.remaining() >= chunk) {
-                    bytebuf.put(src);
-                    return chunk;
-                }
-                int totalBytesWritten = 0;
-                if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
-                    bytebuf.flip();
-                    final int bytesWritten = channel.write(bytebuf);
-                    bytebuf.compact();
-                    totalBytesWritten += bytesWritten;
-                }
-                if (bytebuf.position() == 0) {
-                    final int bytesWritten = channel.write(src);
-                    totalBytesWritten += bytesWritten;
+
+                synchronized (bytebuf) {
+                    if (bytebuf.remaining() >= chunk) {
+                        bytebuf.put(src);
+                        return chunk;
+                    }
+                    int totalBytesWritten = 0;
+                    if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
+                        bytebuf.flip();
+                        final int bytesWritten = channel.write(bytebuf);
+                        bytebuf.compact();
+                        totalBytesWritten += bytesWritten;
+                    }
+                    if (bytebuf.position() == 0) {
+                        final int bytesWritten = channel.write(src);
+                        totalBytesWritten += bytesWritten;
+                    }
+                    if (bytebuf.hasRemaining()) {
+                        channel.requestOutput();
+                    }
+                    return totalBytesWritten;
                 }
-                return totalBytesWritten;
+
             }
 
             @Override
             public void endStream() throws IOException {
                 endStream = true;
+                channel.requestOutput();
             }
 
         });
 
-        if (endStream || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
-            bytebuf.flip();
-            channel.write(bytebuf);
-            bytebuf.compact();
-        }
-        if (bytebuf.position() == 0 && endStream) {
-            channel.endStream();
+        synchronized (bytebuf) {
+            if (endStream || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
+                bytebuf.flip();
+                channel.write(bytebuf);
+                bytebuf.compact();
+            }
+            if (bytebuf.position() == 0 && endStream) {
+                channel.endStream();
+            }
         }
     }
-
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java
index b451e06..275cafe 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java
@@ -35,6 +35,8 @@ import java.nio.charset.CoderResult;
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
 
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
@@ -46,6 +48,7 @@ import org.apache.hc.core5.util.Args;
  *
  * @since 5.0
  */
+@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
 public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProducer {
 
     private enum State { ACTIVE, FLUSHING, END_STREAM }
@@ -56,7 +59,6 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
     private final int fragmentSizeHint;
     private final ContentType contentType;
     private final CharsetEncoder charsetEncoder;
-    private final StreamChannel<CharBuffer> charDataStream;
 
     private volatile State state;
 
@@ -73,25 +75,6 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
             charset = StandardCharsets.US_ASCII;
         }
         this.charsetEncoder = charset.newEncoder();
-        this.charDataStream = new StreamChannel<CharBuffer>() {
-
-            @Override
-            public int write(final CharBuffer src) throws IOException {
-                Args.notNull(src, "Buffer");
-                final int p = src.position();
-                final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
-                if (result.isError()) {
-                    result.throwException();
-                }
-                return src.position() - p;
-            }
-
-            @Override
-            public void endStream() throws IOException {
-                state = State.FLUSHING;
-            }
-
-        };
         this.state = State.ACTIVE;
     }
 
@@ -99,6 +82,8 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
      * Triggered to signal the ability of the underlying char channel
      * to accept more data. The data producer can choose to write data
      * immediately inside the call or asynchronously at some later point.
+     * <p>
+     * {@link StreamChannel} passed to this method is threading-safe.
      *
      * @param channel the data channel capable to accepting more data.
      */
@@ -115,6 +100,11 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
     }
 
     @Override
+    public long getContentLength() {
+        return -1;
+    }
+
+    @Override
     public boolean isChunked() {
         return false;
     }
@@ -125,30 +115,63 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
     }
 
     @Override
-    public final void produce(final DataStreamChannel channel) throws IOException {
-        if (state.compareTo(State.ACTIVE) == 0) {
-            produceData(charDataStream);
+    public final int available() {
+        synchronized (bytebuf) {
+            return bytebuf.remaining();
         }
-        if (state.compareTo(State.ACTIVE) > 0 || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
-            bytebuf.flip();
-            channel.write(bytebuf);
-            bytebuf.compact();
+    }
+
+    @Override
+    public final void produce(final DataStreamChannel channel) throws IOException {
+        if (state == State.ACTIVE) {
+            produceData(new StreamChannel<CharBuffer>() {
+
+                @Override
+                public int write(final CharBuffer src) throws IOException {
+                    Args.notNull(src, "Buffer");
+                    synchronized (bytebuf) {
+                        final int p = src.position();
+                        final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
+                        if (result.isError()) {
+                            result.throwException();
+                        }
+                        if (bytebuf.hasRemaining()) {
+                            channel.requestOutput();
+                        }
+                        return src.position() - p;
+                    }
+                }
+
+                @Override
+                public void endStream() throws IOException {
+                    state = State.FLUSHING;
+                    channel.requestOutput();
+                }
+
+            });
         }
-        if (state.compareTo(State.FLUSHING) == 0) {
-            final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
-            if (result.isError()) {
-                result.throwException();
-            } else if (result.isUnderflow()) {
-                final CoderResult result2 = charsetEncoder.flush(bytebuf);
-                if (result2.isError()) {
+        synchronized (bytebuf) {
+            if (state.compareTo(State.ACTIVE) > 0 || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
+                bytebuf.flip();
+                channel.write(bytebuf);
+                bytebuf.compact();
+            }
+            if (state.compareTo(State.FLUSHING) == 0) {
+                final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
+                if (result.isError()) {
                     result.throwException();
-                } else if (result2.isUnderflow()) {
-                    state = State.END_STREAM;
+                } else if (result.isUnderflow()) {
+                    final CoderResult result2 = charsetEncoder.flush(bytebuf);
+                    if (result2.isError()) {
+                        result.throwException();
+                    } else if (result2.isUnderflow()) {
+                        state = State.END_STREAM;
+                    }
                 }
             }
-        }
-        if (bytebuf.position() == 0 && state.compareTo(State.END_STREAM) == 0) {
-            channel.endStream();
+            if (bytebuf.position() == 0 && state.compareTo(State.END_STREAM) == 0) {
+                channel.endStream();
+            }
         }
     }
 
@@ -157,8 +180,10 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
 
     @Override
     public final void releaseResources() {
-        charsetEncoder.reset();
-        state = State.ACTIVE;
+        synchronized (bytebuf) {
+            charsetEncoder.reset();
+            state = State.ACTIVE;
+        }
         releaseResourcesInternal();
     }
 
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java
index 6940a8c..fb62c33 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java
@@ -75,16 +75,6 @@ public class StringAsyncEntityProducer extends AbstractCharAsyncEntityProducer {
     }
 
     @Override
-    public long getContentLength() {
-        return -1;
-    }
-
-    @Override
-    public int available() {
-        return Integer.MAX_VALUE;
-    }
-
-    @Override
     protected void produceData(final StreamChannel<CharBuffer> channel) throws IOException {
         Asserts.notNull(channel, "Channel");
         channel.write(content);
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java
index ba026fe..74ff7f1 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java
@@ -31,8 +31,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
-import org.apache.hc.core5.http.WritableByteChannelMock;
 import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.WritableByteChannelMock;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.http.nio.BasicDataStreamChannel;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
@@ -73,16 +73,6 @@ public class TestAbstractBinAsyncEntityProducer {
         }
 
         @Override
-        public long getContentLength() {
-            return -1;
-        }
-
-        @Override
-        public int available() {
-            return Integer.MAX_VALUE;
-        }
-
-        @Override
         public void failed(final Exception cause) {
         }
 
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java
index 329302c..1224679 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java
@@ -73,16 +73,6 @@ public class TestAbstractCharAsyncEntityProducer {
         }
 
         @Override
-        public long getContentLength() {
-            return -1;
-        }
-
-        @Override
-        public int available() {
-            return Integer.MAX_VALUE;
-        }
-
-        @Override
         public void failed(final Exception cause) {
         }