You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2019/01/18 11:39:01 UTC

[httpcomponents-core] branch async-producer-improvements updated (ee9d4ff -> e659837)

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a change to branch async-producer-improvements
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git.


 discard ee9d4ff  Added convenience method to test if ContentType instances are of the same MIME type
 discard c64090e  Made AbstractBinAsyncEntityProducer and AbstractCharAsyncEntityProducer conditionally threading-safe
     new 89f6b52  Added convenience method to test if ContentType instances are of the same MIME type
     new e659837  Made AbstractBinAsyncEntityProducer and AbstractCharAsyncEntityProducer conditionally threading-safe

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (ee9d4ff)
            \
             N -- N -- N   refs/heads/async-producer-improvements (e659837)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../core5/testing/nio/MultiLineEntityProducer.java |   8 +-
 .../nio/entity/AbstractBinAsyncEntityProducer.java | 161 +++++++++++++--------
 .../entity/AbstractCharAsyncEntityProducer.java    | 150 ++++++++++++-------
 .../http/nio/entity/StringAsyncEntityProducer.java |   8 +-
 .../entity/TestAbstractBinAsyncEntityProducer.java |  71 +++++++--
 .../TestAbstractCharAsyncEntityProducer.java       |   5 +
 6 files changed, 281 insertions(+), 122 deletions(-)


[httpcomponents-core] 01/02: Added convenience method to test if ContentType instances are of the same MIME type

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch async-producer-improvements
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit 89f6b52e8e0725adfcb6cedad945d60b76e1ad4a
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sat Jan 12 11:58:13 2019 +0100

    Added convenience method to test if ContentType instances are of the same MIME type
---
 httpcore5/src/main/java/org/apache/hc/core5/http/ContentType.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/ContentType.java b/httpcore5/src/main/java/org/apache/hc/core5/http/ContentType.java
index c67e15a..9eacc75 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/ContentType.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/ContentType.java
@@ -405,4 +405,8 @@ public final class ContentType implements Serializable {
         return create(this.getMimeType(), newParams.toArray(new NameValuePair[newParams.size()]), true);
     }
 
+    public boolean isSameMimeType(final ContentType contentType) {
+        return contentType != null && mimeType.equalsIgnoreCase(contentType.getMimeType());
+    }
+
 }


[httpcomponents-core] 02/02: Made AbstractBinAsyncEntityProducer and AbstractCharAsyncEntityProducer conditionally threading-safe

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch async-producer-improvements
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git

commit e6598377c23f0ba17a9d9dcf92a183cbb11f630a
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Sat Jan 12 11:56:54 2019 +0100

    Made AbstractBinAsyncEntityProducer and AbstractCharAsyncEntityProducer conditionally threading-safe
---
 .../core5/testing/nio/MultiLineEntityProducer.java |  10 +-
 .../nio/entity/AbstractBinAsyncEntityProducer.java | 167 +++++++++++++++------
 .../entity/AbstractCharAsyncEntityProducer.java    | 163 ++++++++++++++------
 .../http/nio/entity/StringAsyncEntityProducer.java |  10 +-
 .../entity/TestAbstractBinAsyncEntityProducer.java |  83 +++++++---
 .../TestAbstractCharAsyncEntityProducer.java       |  15 +-
 6 files changed, 309 insertions(+), 139 deletions(-)

diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java
index 85589e8..0c0a2fd 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/MultiLineEntityProducer.java
@@ -55,12 +55,7 @@ public class MultiLineEntityProducer extends AbstractCharAsyncEntityProducer {
     }
 
     @Override
-    public long getContentLength() {
-        return -1;
-    }
-
-    @Override
-    public int available() {
+    protected int availableData() {
         return Integer.MAX_VALUE;
     }
 
@@ -85,9 +80,10 @@ public class MultiLineEntityProducer extends AbstractCharAsyncEntityProducer {
     }
 
     @Override
-    public void releaseResourcesInternal() {
+    public void releaseResources() {
         count = 0;
         charbuf.clear();
+        super.releaseResources();
     }
 
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java
index 234c054..1631adb 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
@@ -41,28 +43,91 @@ import org.apache.hc.core5.util.Args;
  *
  * @since 5.0
  */
+@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
 public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProducer {
 
-    private final ByteBuffer bytebuf;
+    enum State { ACTIVE, FLUSHING, END_STREAM }
+
     private final int fragmentSizeHint;
+    private final ByteBuffer bytebuf;
     private final ContentType contentType;
 
-    private volatile boolean endStream;
+    private volatile State state;
 
-    public AbstractBinAsyncEntityProducer(
-            final int bufferSize,
-            final int fragmentSizeHint,
-            final ContentType contentType) {
-        Args.positive(bufferSize, "Buffer size");
-        this.bytebuf = ByteBuffer.allocate(bufferSize);
-        this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : bufferSize / 2;
+    public AbstractBinAsyncEntityProducer(final int fragmentSizeHint, final ContentType contentType) {
+        this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
+        this.bytebuf = ByteBuffer.allocate(this.fragmentSizeHint);
         this.contentType = contentType;
+        this.state = State.ACTIVE;
+    }
+
+    private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
+        if (bytebuf.position() > 0) {
+            bytebuf.flip();
+            channel.write(bytebuf);
+            bytebuf.compact();
+        }
+    }
+
+    final int writeData(final StreamChannel<ByteBuffer> channel, final ByteBuffer src) throws IOException {
+        final int chunk = src.remaining();
+        if (chunk == 0) {
+            return 0;
+        }
+        if (chunk > fragmentSizeHint) {
+            // the data chunk is greater than the fragment hint
+            // attempt to write it out to the channel directly
+
+            // flush the buffer if not empty
+            flush(channel);
+            if (bytebuf.position() == 0) {
+                return channel.write(src);
+            }
+        } else {
+            // the data chunk is smaller than the fragment hint
+            // attempt to buffer it
+
+            // flush the buffer if there is not enough space to store the chunk
+            if (bytebuf.remaining() < chunk) {
+                flush(channel);
+            }
+            if (bytebuf.remaining() >= chunk) {
+                bytebuf.put(src);
+                if (!bytebuf.hasRemaining()) {
+                    flush(channel);
+                }
+                return chunk;
+            }
+        }
+        return 0;
+    }
+
+    final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
+        if (state == State.ACTIVE) {
+            state = State.FLUSHING;
+            flush(channel);
+            if (bytebuf.position() == 0) {
+                state = State.END_STREAM;
+                channel.endStream();
+            }
+        }
     }
 
     /**
+     * Returns the number of bytes immediately available for output.
+     * This method can be used as a hint to control output events
+     * of the underlying I/O session.
+     *
+     * @return the number of bytes immediately available for output
+     */
+    protected abstract int availableData();
+
+    /**
      * Triggered to signal the ability of the underlying byte channel
      * to accept more data. The data producer can choose to write data
      * immediately inside the call or asynchronously at some later point.
+     * <p>
+     * {@link StreamChannel} passed to this method is threading-safe.
      *
      * @param channel the data channel capable to accepting more data.
      */
@@ -89,49 +154,57 @@ public abstract class AbstractBinAsyncEntityProducer implements AsyncEntityProdu
     }
 
     @Override
+    public long getContentLength() {
+        return -1;
+    }
+
+    @Override
+    public final int available() {
+        if (state == State.ACTIVE) {
+            return availableData();
+        } else {
+            synchronized (bytebuf) {
+                return bytebuf.position();
+            }
+        }
+    }
+
+    @Override
     public final void produce(final DataStreamChannel channel) throws IOException {
-        produceData(new StreamChannel<ByteBuffer>() {
-
-            @Override
-            public int write(final ByteBuffer src) throws IOException {
-                Args.notNull(src, "Buffer");
-                final int chunk = src.remaining();
-                if (chunk == 0) {
-                    return 0;
-                }
-                if (bytebuf.remaining() >= chunk) {
-                    bytebuf.put(src);
-                    return chunk;
-                }
-                int totalBytesWritten = 0;
-                if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
-                    bytebuf.flip();
-                    final int bytesWritten = channel.write(bytebuf);
-                    bytebuf.compact();
-                    totalBytesWritten += bytesWritten;
-                }
+        synchronized (bytebuf) {
+            if (state == State.ACTIVE) {
+                produceData(new StreamChannel<ByteBuffer>() {
+
+                    @Override
+                    public int write(final ByteBuffer src) throws IOException {
+                        Args.notNull(src, "Buffer");
+                        synchronized (bytebuf) {
+                            return writeData(channel, src);
+                        }
+                    }
+
+                    @Override
+                    public void endStream() throws IOException {
+                        synchronized (bytebuf) {
+                            streamEnd(channel);
+                        }
+                    }
+
+                });
+            }
+            if (state == State.FLUSHING) {
+                flush(channel);
                 if (bytebuf.position() == 0) {
-                    final int bytesWritten = channel.write(src);
-                    totalBytesWritten += bytesWritten;
+                    state = State.END_STREAM;
+                    channel.endStream();
                 }
-                return totalBytesWritten;
-            }
-
-            @Override
-            public void endStream() throws IOException {
-                endStream = true;
             }
-
-        });
-
-        if (endStream || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
-            bytebuf.flip();
-            channel.write(bytebuf);
-            bytebuf.compact();
-        }
-        if (bytebuf.position() == 0 && endStream) {
-            channel.endStream();
         }
     }
 
+    @Override
+    public void releaseResources() {
+        state = State.ACTIVE;
+    }
+
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java
index b451e06..abbb26d 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java
@@ -35,6 +35,8 @@ import java.nio.charset.CoderResult;
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
 
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
@@ -46,17 +48,17 @@ import org.apache.hc.core5.util.Args;
  *
  * @since 5.0
  */
+@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
 public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProducer {
 
-    private enum State { ACTIVE, FLUSHING, END_STREAM }
-
     private static final CharBuffer EMPTY = CharBuffer.wrap(new char[0]);
 
+    enum State { ACTIVE, FLUSHING, END_STREAM }
+
     private final ByteBuffer bytebuf;
     private final int fragmentSizeHint;
     private final ContentType contentType;
     private final CharsetEncoder charsetEncoder;
-    private final StreamChannel<CharBuffer> charDataStream;
 
     private volatile State state;
 
@@ -65,7 +67,7 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
             final int fragmentSizeHint,
             final ContentType contentType) {
         Args.positive(bufferSize, "Buffer size");
-        this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : bufferSize / 2;
+        this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : 0;
         this.bytebuf = ByteBuffer.allocate(bufferSize);
         this.contentType = contentType;
         Charset charset = contentType != null ? contentType.getCharset() : null;
@@ -73,32 +75,77 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
             charset = StandardCharsets.US_ASCII;
         }
         this.charsetEncoder = charset.newEncoder();
-        this.charDataStream = new StreamChannel<CharBuffer>() {
+        this.state = State.ACTIVE;
+    }
 
-            @Override
-            public int write(final CharBuffer src) throws IOException {
-                Args.notNull(src, "Buffer");
-                final int p = src.position();
-                final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
-                if (result.isError()) {
-                    result.throwException();
-                }
-                return src.position() - p;
+    private void flush(final StreamChannel<ByteBuffer> channel) throws IOException {
+        if (bytebuf.position() > 0) {
+            bytebuf.flip();
+            channel.write(bytebuf);
+            bytebuf.compact();
+        }
+    }
+
+    final int writeData(final StreamChannel<ByteBuffer> channel, final CharBuffer src) throws IOException {
+
+        final int chunk = src.remaining();
+        if (chunk == 0) {
+            return 0;
+        }
+
+        final int p = src.position();
+        final CoderResult result = charsetEncoder.encode(src, bytebuf, false);
+        if (result.isError()) {
+            result.throwException();
+        }
+
+        if (!bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
+            flush(channel);
+        }
+
+        return src.position() - p;
+    }
+
+    final void streamEnd(final StreamChannel<ByteBuffer> channel) throws IOException {
+        if (state == State.ACTIVE) {
+            state = State.FLUSHING;
+            if (!bytebuf.hasRemaining()) {
+                flush(channel);
             }
 
-            @Override
-            public void endStream() throws IOException {
-                state = State.FLUSHING;
+            final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
+            if (result.isError()) {
+                result.throwException();
+            }
+            final CoderResult result2 = charsetEncoder.flush(bytebuf);
+            if (result2.isError()) {
+                result.throwException();
+            } else if (result.isUnderflow()) {
+                flush(channel);
+                if (bytebuf.position() == 0) {
+                    state = State.END_STREAM;
+                    channel.endStream();
+                }
             }
+        }
 
-        };
-        this.state = State.ACTIVE;
     }
 
     /**
+     * Returns the number of bytes immediately available for output.
+     * This method can be used as a hint to control output events
+     * of the underlying I/O session.
+     *
+     * @return the number of bytes immediately available for output
+     */
+    protected abstract int availableData();
+
+    /**
      * Triggered to signal the ability of the underlying char channel
      * to accept more data. The data producer can choose to write data
      * immediately inside the call or asynchronously at some later point.
+     * <p>
+     * {@link StreamChannel} passed to this method is threading-safe.
      *
      * @param channel the data channel capable to accepting more data.
      */
@@ -115,6 +162,11 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
     }
 
     @Override
+    public long getContentLength() {
+        return -1;
+    }
+
+    @Override
     public boolean isChunked() {
         return false;
     }
@@ -125,41 +177,62 @@ public abstract class AbstractCharAsyncEntityProducer implements AsyncEntityProd
     }
 
     @Override
-    public final void produce(final DataStreamChannel channel) throws IOException {
-        if (state.compareTo(State.ACTIVE) == 0) {
-            produceData(charDataStream);
-        }
-        if (state.compareTo(State.ACTIVE) > 0 || !bytebuf.hasRemaining() || bytebuf.position() >= fragmentSizeHint) {
-            bytebuf.flip();
-            channel.write(bytebuf);
-            bytebuf.compact();
+    public final int available() {
+        if (state == State.ACTIVE) {
+            return availableData();
+        } else {
+            synchronized (bytebuf) {
+                return bytebuf.position();
+            }
         }
-        if (state.compareTo(State.FLUSHING) == 0) {
-            final CoderResult result = charsetEncoder.encode(EMPTY, bytebuf, true);
-            if (result.isError()) {
-                result.throwException();
-            } else if (result.isUnderflow()) {
-                final CoderResult result2 = charsetEncoder.flush(bytebuf);
-                if (result2.isError()) {
+    }
+
+    @Override
+    public final void produce(final DataStreamChannel channel) throws IOException {
+        synchronized (bytebuf) {
+            if (state == State.ACTIVE) {
+                produceData(new StreamChannel<CharBuffer>() {
+
+                    @Override
+                    public int write(final CharBuffer src) throws IOException {
+                        Args.notNull(src, "Buffer");
+                        synchronized (bytebuf) {
+                            return writeData(channel, src);
+                        }
+                    }
+
+                    @Override
+                    public void endStream() throws IOException {
+                        synchronized (bytebuf) {
+                            streamEnd(channel);
+                        }
+                    }
+
+                });
+            }
+            if (state == State.FLUSHING) {
+                final CoderResult result = charsetEncoder.flush(bytebuf);
+                if (result.isError()) {
                     result.throwException();
-                } else if (result2.isUnderflow()) {
-                    state = State.END_STREAM;
+                } else if (result.isOverflow()) {
+                    flush(channel);
+                } else if (result.isUnderflow()) {
+                    flush(channel);
+                    if (bytebuf.position() == 0) {
+                        state = State.END_STREAM;
+                        channel.endStream();
+                    }
                 }
+
             }
-        }
-        if (bytebuf.position() == 0 && state.compareTo(State.END_STREAM) == 0) {
-            channel.endStream();
-        }
-    }
 
-    protected void releaseResourcesInternal() {
+        }
     }
 
     @Override
-    public final void releaseResources() {
-        charsetEncoder.reset();
+    public void releaseResources() {
         state = State.ACTIVE;
-        releaseResourcesInternal();
+        charsetEncoder.reset();
     }
 
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java
index 6940a8c..cd946cc 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/StringAsyncEntityProducer.java
@@ -75,12 +75,7 @@ public class StringAsyncEntityProducer extends AbstractCharAsyncEntityProducer {
     }
 
     @Override
-    public long getContentLength() {
-        return -1;
-    }
-
-    @Override
-    public int available() {
+    protected int availableData() {
         return Integer.MAX_VALUE;
     }
 
@@ -105,8 +100,9 @@ public class StringAsyncEntityProducer extends AbstractCharAsyncEntityProducer {
     }
 
     @Override
-    public void releaseResourcesInternal() {
+    public void releaseResources() {
         this.content.clear();
+        super.releaseResources();
     }
 
 }
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java
index ba026fe..584e016 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractBinAsyncEntityProducer.java
@@ -31,8 +31,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
-import org.apache.hc.core5.http.WritableByteChannelMock;
 import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.WritableByteChannelMock;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
 import org.apache.hc.core5.http.nio.BasicDataStreamChannel;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
@@ -48,11 +48,10 @@ public class TestAbstractBinAsyncEntityProducer {
         private int count = 0;
 
         public ChunkByteAsyncEntityProducer(
-                final int bufferSize,
                 final int fragmentSizeHint,
                 final ContentType contentType,
                 final byte[]... content) {
-            super(bufferSize, fragmentSizeHint, contentType);
+            super(fragmentSizeHint, contentType);
             this.content = content;
         }
 
@@ -62,6 +61,11 @@ public class TestAbstractBinAsyncEntityProducer {
         }
 
         @Override
+        protected int availableData() {
+            return Integer.MAX_VALUE;
+        }
+
+        @Override
         protected void produceData(final StreamChannel<ByteBuffer> channel) throws IOException {
             if (count < content.length) {
                 channel.write(ByteBuffer.wrap(content[count]));
@@ -73,31 +77,18 @@ public class TestAbstractBinAsyncEntityProducer {
         }
 
         @Override
-        public long getContentLength() {
-            return -1;
-        }
-
-        @Override
-        public int available() {
-            return Integer.MAX_VALUE;
-        }
-
-        @Override
         public void failed(final Exception cause) {
         }
 
-        @Override
-        public void releaseResources() {
-        }
-
     }
 
     @Test
     public void testProduceDataNoBuffering() throws Exception {
 
         final AsyncEntityProducer producer = new ChunkByteAsyncEntityProducer(
-                256, 0, ContentType.TEXT_PLAIN,
-                new byte[] { '1', '2', '3' }, new byte[] { '4', '5', '6' });
+                0, ContentType.TEXT_PLAIN,
+                new byte[] { '1', '2', '3' },
+                new byte[] { '4', '5', '6' });
 
         Assert.assertEquals(-1, producer.getContentLength());
         Assert.assertEquals(ContentType.TEXT_PLAIN.toString(), producer.getContentType());
@@ -118,10 +109,10 @@ public class TestAbstractBinAsyncEntityProducer {
     }
 
     @Test
-    public void testProduceDataWithBuffering() throws Exception {
+    public void testProduceDataWithBuffering1() throws Exception {
 
         final AsyncEntityProducer producer = new ChunkByteAsyncEntityProducer(
-                256, 5, ContentType.TEXT_PLAIN,
+                5, ContentType.TEXT_PLAIN,
                 new byte[] { '1', '2', '3' },
                 new byte[] { '4', '5', '6' },
                 new byte[] { '7', '8' },
@@ -136,7 +127,52 @@ public class TestAbstractBinAsyncEntityProducer {
 
         producer.produce(streamChannel);
         Assert.assertTrue(byteChannel.isOpen());
-        Assert.assertEquals("123456", byteChannel.dump(StandardCharsets.US_ASCII));
+        Assert.assertEquals("123", byteChannel.dump(StandardCharsets.US_ASCII));
+
+        producer.produce(streamChannel);
+        Assert.assertTrue(byteChannel.isOpen());
+        Assert.assertEquals("45678", byteChannel.dump(StandardCharsets.US_ASCII));
+
+        producer.produce(streamChannel);
+        Assert.assertFalse(byteChannel.isOpen());
+        Assert.assertEquals("90", byteChannel.dump(StandardCharsets.US_ASCII));
+    }
+
+    @Test
+    public void testProduceDataWithBuffering2() throws Exception {
+
+        final AsyncEntityProducer producer = new ChunkByteAsyncEntityProducer(
+                5, ContentType.TEXT_PLAIN,
+                new byte[] { '1' },
+                new byte[] { '2' },
+                new byte[] { '3' },
+                new byte[] { '4', '5' },
+                new byte[] { '6' },
+                new byte[] { '7', '8' },
+                new byte[] { '9', '0' });
+
+        final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
+        final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
+
+        producer.produce(streamChannel);
+        Assert.assertTrue(byteChannel.isOpen());
+        Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
+
+        producer.produce(streamChannel);
+        Assert.assertTrue(byteChannel.isOpen());
+        Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
+
+        producer.produce(streamChannel);
+        Assert.assertTrue(byteChannel.isOpen());
+        Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
+
+        producer.produce(streamChannel);
+        Assert.assertTrue(byteChannel.isOpen());
+        Assert.assertEquals("12345", byteChannel.dump(StandardCharsets.US_ASCII));
+
+        producer.produce(streamChannel);
+        Assert.assertTrue(byteChannel.isOpen());
+        Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
 
         producer.produce(streamChannel);
         Assert.assertTrue(byteChannel.isOpen());
@@ -144,7 +180,8 @@ public class TestAbstractBinAsyncEntityProducer {
 
         producer.produce(streamChannel);
         Assert.assertFalse(byteChannel.isOpen());
-        Assert.assertEquals("7890", byteChannel.dump(StandardCharsets.US_ASCII));
+        Assert.assertEquals("67890", byteChannel.dump(StandardCharsets.US_ASCII));
+
     }
 
 }
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java
index 329302c..cf344b5 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/entity/TestAbstractCharAsyncEntityProducer.java
@@ -62,6 +62,11 @@ public class TestAbstractCharAsyncEntityProducer {
         }
 
         @Override
+        protected int availableData() {
+            return Integer.MAX_VALUE;
+        }
+
+        @Override
         protected void produceData(final StreamChannel<CharBuffer> channel) throws IOException {
             if (count < content.length) {
                 channel.write(CharBuffer.wrap(content[count]));
@@ -73,16 +78,6 @@ public class TestAbstractCharAsyncEntityProducer {
         }
 
         @Override
-        public long getContentLength() {
-            return -1;
-        }
-
-        @Override
-        public int available() {
-            return Integer.MAX_VALUE;
-        }
-
-        @Override
         public void failed(final Exception cause) {
         }