You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/08/21 15:01:37 UTC

[26/32] httpcomponents-core git commit: Moved classes (no functional changes)

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java
new file mode 100644
index 0000000..78ae034
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractClassicServerExchangeHandler.java
@@ -0,0 +1,285 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.message.HttpResponseWrapper;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.Asserts;
+
+/**
+ * Abstract {@link AsyncServerExchangeHandler} that lets a request be processed
+ * with blocking {@link InputStream} / {@link OutputStream} code.
+ * <p>
+ * {@link #handleRequest} creates a {@link SharedOutputBuffer} and, only when the
+ * request carries entity details, a {@link SharedInputBuffer}; wraps them in a
+ * {@link ContentOutputStream} and a {@link ContentInputStream}; and, if this
+ * handler is still in its initial state, submits a task to the {@link Executor}
+ * that calls {@code handle(...)}. The request stream passed to {@code handle}
+ * is {@code null} when the request has no entity.
+ * <p>
+ * The response head is sent through the {@link ResponseChannel} the first time
+ * the response stream is written to or closed ({@code flush()} is not overridden
+ * here and does not send it). Once that has happened, the overridden mutators of
+ * the response object given to {@code handle} are rejected by an
+ * {@code Asserts.check} with the message "Response already committed".
+ * <p>
+ * When {@code handle} returns normally the request stream (if any) and the
+ * response stream are closed; when an exception is thrown instead, the first
+ * exception is recorded (see {@link #getException()}) and the buffers are
+ * aborted. {@link #failed(Exception)} likewise records the first exception and
+ * calls {@link #releaseResources()}, which is an empty hook in this class.
+ *
+ * @since 5.0
+ */
+public abstract class AbstractClassicServerExchangeHandler implements AsyncServerExchangeHandler {
+
+    private enum State { IDLE, ACTIVE, COMPLETED }
+
+    private final int initialBufferSize;
+    private final Executor executor;
+    private final AtomicReference<State> state;
+    private final AtomicReference<Exception> exception;
+
+    private volatile SharedInputBuffer inputBuffer;
+    private volatile SharedOutputBuffer outputBuffer;
+
+    public AbstractClassicServerExchangeHandler(final int initialBufferSize, final Executor executor) {
+        this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size");
+        this.executor = Args.notNull(executor, "Executor");
+        this.exception = new AtomicReference<>(null);
+        this.state = new AtomicReference<>(State.IDLE);
+    }
+
+    public Exception getException() {
+        return exception.get();
+    }
+
+    protected abstract void handle(
+            HttpRequest request, InputStream requestStream,
+            HttpResponse response, OutputStream responseStream,
+            HttpContext context) throws IOException, HttpException;
+
+    @Override
+    public final void handleRequest(
+            final HttpRequest request,
+            final EntityDetails entityDetails,
+            final ResponseChannel responseChannel,
+            final HttpContext context) throws HttpException, IOException {
+        final AtomicBoolean responseCommitted = new AtomicBoolean(false);
+
+        final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+        final HttpResponse responseWrapper = new HttpResponseWrapper(response){
+
+            private void ensureNotCommitted() {
+                Asserts.check(!responseCommitted.get(), "Response already committed");
+            }
+
+            @Override
+            public void addHeader(final String name, final Object value) {
+                ensureNotCommitted();
+                super.addHeader(name, value);
+            }
+
+            @Override
+            public void setHeader(final String name, final Object value) {
+                ensureNotCommitted();
+                super.setHeader(name, value);
+            }
+
+            @Override
+            public void setVersion(final ProtocolVersion version) {
+                ensureNotCommitted();
+                super.setVersion(version);
+            }
+
+            @Override
+            public void setCode(final int code) {
+                ensureNotCommitted();
+                super.setCode(code);
+            }
+
+            @Override
+            public void setReasonPhrase(final String reason) {
+                ensureNotCommitted();
+                super.setReasonPhrase(reason);
+            }
+
+            @Override
+            public void setLocale(final Locale locale) {
+                ensureNotCommitted();
+                super.setLocale(locale);
+            }
+
+        };
+
+        final InputStream inputStream;
+        if (entityDetails != null) {
+            inputBuffer = new SharedInputBuffer(initialBufferSize);
+            inputStream = new ContentInputStream(inputBuffer);
+        } else {
+            inputStream = null;
+        }
+        outputBuffer = new SharedOutputBuffer(initialBufferSize);
+
+        final OutputStream outputStream = new ContentOutputStream(outputBuffer) {
+
+            private void triggerResponse() throws IOException {
+                try {
+                    if (responseCommitted.compareAndSet(false, true)) {
+                        responseChannel.sendResponse(response, new EntityDetails() {
+
+                            @Override
+                            public long getContentLength() {
+                                return -1;
+                            }
+
+                            @Override
+                            public String getContentType() {
+                                final Header h = response.getFirstHeader(HttpHeaders.CONTENT_TYPE);
+                                return h != null ? h.getValue() : null;
+                            }
+
+                            @Override
+                            public String getContentEncoding() {
+                                final Header h = response.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
+                                return h != null ? h.getValue() : null;
+                            }
+
+                            @Override
+                            public boolean isChunked() {
+                                return false;
+                            }
+
+                            @Override
+                            public Set<String> getTrailerNames() {
+                                return null;
+                            }
+
+                        }, context);
+                    }
+                } catch (final HttpException ex) {
+                    throw new IOException(ex.getMessage(), ex);
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                triggerResponse();
+                super.close();
+            }
+
+            @Override
+            public void write(final byte[] b, final int off, final int len) throws IOException {
+                triggerResponse();
+                super.write(b, off, len);
+            }
+
+            @Override
+            public void write(final byte[] b) throws IOException {
+                triggerResponse();
+                super.write(b);
+            }
+
+            @Override
+            public void write(final int b) throws IOException {
+                triggerResponse();
+                super.write(b);
+            }
+
+        };
+
+        if (state.compareAndSet(State.IDLE, State.ACTIVE)) {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        handle(request, inputStream, responseWrapper, outputStream, context);
+                        if (inputStream != null) {
+                            inputStream.close();
+                        }
+                        outputStream.close();
+                    } catch (final Exception ex) {
+                        exception.compareAndSet(null, ex);
+                        if (inputBuffer != null) {
+                            inputBuffer.abort();
+                        }
+                        outputBuffer.abort();
+                    } finally {
+                        state.set(State.COMPLETED);
+                    }
+                }
+
+            });
+        }
+    }
+
+    @Override
+    public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        if (inputBuffer != null) {
+            inputBuffer.updateCapacity(capacityChannel);
+        }
+    }
+
+    @Override
+    public final int consume(final ByteBuffer src) throws IOException {
+        Asserts.notNull(inputBuffer, "Input buffer");
+        return inputBuffer.fill(src);
+    }
+
+    @Override
+    public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        Asserts.notNull(inputBuffer, "Input buffer");
+        inputBuffer.markEndStream();
+    }
+
+    @Override
+    public final int available() {
+        Asserts.notNull(outputBuffer, "Output buffer");
+        return outputBuffer.length();
+    }
+
+    @Override
+    public final void produce(final DataStreamChannel channel) throws IOException {
+        Asserts.notNull(outputBuffer, "Output buffer");
+        outputBuffer.flush(channel);
+    }
+
+    @Override
+    public final void failed(final Exception cause) {
+        exception.compareAndSet(null, cause);
+        releaseResources();
+    }
+
+    @Override
+    public void releaseResources() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractSharedBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractSharedBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractSharedBuffer.java
new file mode 100644
index 0000000..c6473e6
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/AbstractSharedBuffer.java
@@ -0,0 +1,119 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Base class for {@link ExpandableBuffer}s whose state is guarded by a
+ * {@link ReentrantLock} supplied by the caller, together with a single
+ * {@link Condition} created from that lock.
+ * <p>
+ * {@link #hasData()}, {@link #capacity()} and {@link #length()} delegate to the
+ * superclass while holding the lock. {@link #abort()} sets both the
+ * end-of-stream and the aborted flag and signals all threads waiting on the
+ * condition. {@link #reset()} does nothing once the buffer has been aborted;
+ * otherwise it calls {@code setInputMode()}, clears the underlying buffer and
+ * clears the end-of-stream flag. {@link #isEndStream()} reports {@code true}
+ * only when the end-of-stream flag is set and no data remains in the buffer.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+abstract class AbstractSharedBuffer extends ExpandableBuffer {
+
+    final ReentrantLock lock;
+    final Condition condition;
+
+    volatile boolean endStream;
+    volatile boolean aborted;
+
+    public AbstractSharedBuffer(final ReentrantLock lock, final int initialBufferSize) {
+        super(initialBufferSize);
+        this.lock = Args.notNull(lock, "Lock");
+        this.condition = lock.newCondition();
+    }
+
+    @Override
+    public boolean hasData() {
+        lock.lock();
+        try {
+            return super.hasData();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public int capacity() {
+        lock.lock();
+        try {
+            return super.capacity();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public int length() {
+        lock.lock();
+        try {
+            return super.length();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void abort() {
+        lock.lock();
+        try {
+            endStream = true;
+            aborted = true;
+            condition.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void reset() {
+        if (aborted) {
+            return;
+        }
+        lock.lock();
+        try {
+            setInputMode();
+            buffer().clear();
+            endStream = false;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean isEndStream() {
+        lock.lock();
+        try {
+            return endStream && !super.hasData();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputBuffer.java
new file mode 100644
index 0000000..2871d5e
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputBuffer.java
@@ -0,0 +1,84 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+
+/**
+ * Generic content input buffer.
+ *
+ * @since 4.0
+ */
+public interface ContentInputBuffer {
+
+    /**
+     * Returns the length of the data stored in the buffer.
+     *
+     * @return data length
+     */
+    int length();
+
+    /**
+     * Resets the buffer by clearing its state and stored content.
+     */
+    void reset();
+
+    /**
+     * Reads up to {@code len} bytes of data from this buffer into
+     * an array of bytes. The exact number of bytes read depends on how many
+     * bytes are stored in the buffer.
+     *
+     * <p> If {@code off} is negative, or {@code len} is negative, or
+     * {@code off+len} is greater than the length of the array
+     * {@code b}, this method can throw a runtime exception. The exact type
+     * of runtime exception thrown by this method depends on implementation.
+     * This method returns {@code -1} if the end of content stream has been
+     * reached.
+     *
+     * @param      b     the array into which the data is read.
+     * @param      off   the start offset in array {@code b}
+     *                   at which the data is written.
+     * @param      len   the maximum number of bytes to read.
+     * @return     the total number of bytes read into {@code b}, or
+     *             {@code -1} if there is no more data because the end of
+     *             the stream has been reached.
+     * @throws  IOException  if an I/O error occurs.
+     */
+    int read(byte[] b, int off, int len) throws IOException;
+
+    /**
+     * Reads one byte from this buffer. If the buffer is empty this method can
+     * throw a runtime exception. The exact type of runtime exception thrown
+     * by this method depends on implementation. This method returns
+     * {@code -1} if the end of content stream has been reached.
+     *
+     * @return the byte read, or {@code -1} if the end of the content stream
+     *         has been reached.
+     * @throws  IOException  if an I/O error occurs.
+     */
+    int read() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputStream.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputStream.java
new file mode 100644
index 0000000..f3e567b
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentInputStream.java
@@ -0,0 +1,82 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@link InputStream} adaptor for {@link ContentInputBuffer}.
+ *
+ * @since 4.0
+ */
+public class ContentInputStream extends InputStream {
+
+    private final ContentInputBuffer buffer;
+
+    public ContentInputStream(final ContentInputBuffer buffer) {
+        super();
+        Args.notNull(buffer, "Input buffer");
+        this.buffer = buffer;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return this.buffer.length();
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        return this.buffer.read(b, off, len);
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        if (b == null) {
+            return 0;
+        }
+        return this.buffer.read(b, 0, b.length);
+    }
+
+    @Override
+    public int read() throws IOException {
+        return this.buffer.read();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // read and discard the remainder of the message
+        final byte[] tmp = new byte[1024];
+        while (this.buffer.read(tmp, 0, tmp.length) >= 0) {
+        }
+        super.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputBuffer.java
new file mode 100644
index 0000000..a9f645e
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputBuffer.java
@@ -0,0 +1,81 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+
+/**
+ * Generic content output buffer.
+ *
+ * @since 4.0
+ */
+public interface ContentOutputBuffer {
+
+    /**
+     * Returns the length of the data stored in the buffer.
+     *
+     * @return data length
+     */
+    int length();
+
+    /**
+     * Resets the buffer by clearing its state and stored content.
+     */
+    void reset();
+
+    /**
+     * Writes {@code len} bytes from the specified byte array
+     * starting at offset {@code off} to this buffer.
+     * <p>
+     * If {@code off} is negative, or {@code len} is negative, or
+     * {@code off+len} is greater than the length of the array
+     * {@code b}, this method can throw a runtime exception. The exact type
+     * of runtime exception thrown by this method depends on implementation.
+     *
+     * @param      b     the data.
+     * @param      off   the start offset in the data.
+     * @param      len   the number of bytes to write.
+     * @throws  IOException  if an I/O error occurs.
+     */
+    void write(byte[] b, int off, int len) throws IOException;
+
+    /**
+     * Writes the specified byte to this buffer.
+     *
+     * @param      b   the {@code byte}.
+     * @throws  IOException  if an I/O error occurs.
+     */
+    void write(int b) throws IOException;
+
+    /**
+     * Indicates that the content has been fully written.
+     *
+     * @throws  IOException  if an I/O error occurs.
+     */
+    void writeCompleted() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputStream.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputStream.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputStream.java
new file mode 100644
index 0000000..b01e99d
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/ContentOutputStream.java
@@ -0,0 +1,77 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@link OutputStream} view over a {@link ContentOutputBuffer}: every write is
+ * delegated to the underlying buffer.
+ *
+ * @since 4.0
+ */
+public class ContentOutputStream extends OutputStream {
+
+    private final ContentOutputBuffer buffer;
+
+    /**
+     * @param buffer the buffer to write to; checked with {@code Args.notNull}.
+     */
+    public ContentOutputStream(final ContentOutputBuffer buffer) {
+        this.buffer = Args.notNull(buffer, "Output buffer");
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        buffer.write(b);
+    }
+
+    /** Ignores a {@code null} array; otherwise writes the whole array. */
+    @Override
+    public void write(final byte[] b) throws IOException {
+        if (b != null) {
+            buffer.write(b, 0, b.length);
+        }
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        buffer.write(b, off, len);
+    }
+
+    /** No-op: this stream holds no data of its own to flush. */
+    @Override
+    public void flush() throws IOException {
+    }
+
+    /** Tells the buffer that the content has been fully written. */
+    @Override
+    public void close() throws IOException {
+        buffer.writeCompleted();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java
new file mode 100644
index 0000000..d4497fd
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedInputBuffer.java
@@ -0,0 +1,163 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+
+/**
+ * Buffer shared between a producer that pushes content in through
+ * {@link #fill(ByteBuffer)} and a consumer that pulls it out through the
+ * blocking {@code read} methods. Buffer access is guarded by {@code lock}.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer {
+
+    // Channel used to advertise free space; cleared once the stream has ended.
+    private volatile CapacityChannel capacityChannel;
+
+    public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) {
+        super(lock, initialBufferSize);
+    }
+
+    public SharedInputBuffer(final int bufferSize) {
+        super(new ReentrantLock(), bufferSize);
+    }
+
+    /**
+     * Copies the content of {@code src} into the buffer, growing it through
+     * {@code ensureCapacity} when needed, and wakes up any blocked reader.
+     *
+     * @param src the data to append.
+     * @return the value of {@code buffer().remaining()} after the copy.
+     */
+    public int fill(final ByteBuffer src) throws IOException {
+        lock.lock();
+        try {
+            setInputMode();
+            ensureCapacity(buffer().position() + src.remaining());
+            buffer().put(src);
+            final int remaining = buffer().remaining();
+            condition.signalAll();
+            return remaining;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Remembers the given channel for later capacity updates and, if the
+     * buffer currently has room, reports that room right away.
+     *
+     * @param capacityChannel the channel to notify about available capacity.
+     */
+    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        lock.lock();
+        try {
+            this.capacityChannel = capacityChannel;
+            setInputMode();
+            if (buffer().hasRemaining()) {
+                capacityChannel.update(buffer().remaining());
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Blocks while there is nothing to read and the stream is neither ended
+     * nor aborted. Callers in this class invoke it with {@code lock} held and
+     * the buffer in output mode; it leaves the buffer in output mode.
+     *
+     * @throws InterruptedIOException if the waiting thread is interrupted; the
+     *         interrupt flag is restored and the original exception is kept as
+     *         the cause.
+     */
+    private void awaitInput() throws InterruptedIOException {
+        if (!buffer().hasRemaining()) {
+            setInputMode();
+            while (buffer().position() == 0 && !endStream && !aborted) {
+                try {
+                    condition.await();
+                } catch (final InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    final InterruptedIOException ioex = new InterruptedIOException(ex.getMessage());
+                    ioex.initCause(ex);
+                    throw ioex;
+                }
+            }
+            setOutputMode();
+        }
+    }
+
+    /**
+     * Once the reader has drained the buffer, switches it back to input mode
+     * and reports the free space to the capacity channel, if one is set.
+     * Must be called with {@code lock} held and the buffer in output mode.
+     */
+    private void advertiseCapacityIfDrained() throws IOException {
+        if (!buffer().hasRemaining() && capacityChannel != null) {
+            setInputMode();
+            if (buffer().hasRemaining()) {
+                capacityChannel.update(buffer().remaining());
+            }
+        }
+    }
+
+    /**
+     * Reads one byte, blocking until data is available.
+     *
+     * @return the byte as a value in the range 0-255, or {@code -1} if the
+     *         buffer was aborted or the stream ended with no data left.
+     */
+    @Override
+    public int read() throws IOException {
+        lock.lock();
+        try {
+            setOutputMode();
+            awaitInput();
+            if (aborted) {
+                return -1;
+            }
+            if (!buffer().hasRemaining() && endStream) {
+                return -1;
+            }
+            final int b = buffer().get() & 0xff;
+            advertiseCapacityIfDrained();
+            return b;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b} starting at {@code off},
+     * blocking until at least some data is available.
+     *
+     * @return the number of bytes copied; {@code 0} when {@code len} is zero;
+     *         {@code -1} if the buffer was aborted or the stream ended with no
+     *         data left.
+     */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        // A zero-length read must not block waiting for data that is not wanted.
+        if (len == 0) {
+            return 0;
+        }
+        lock.lock();
+        try {
+            setOutputMode();
+            awaitInput();
+            if (aborted) {
+                return -1;
+            }
+            if (!buffer().hasRemaining() && endStream) {
+                return -1;
+            }
+            final int chunk = Math.min(buffer().remaining(), len);
+            buffer().get(b, off, chunk);
+            advertiseCapacityIfDrained();
+            return chunk;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Marks the end of the content stream, drops the capacity channel and
+     * wakes up any blocked reader. Calling it again has no further effect.
+     */
+    public void markEndStream() throws IOException {
+        // Cheap check first; the flag is re-checked under the lock below.
+        if (endStream) {
+            return;
+        }
+        lock.lock();
+        try {
+            if (!endStream) {
+                endStream = true;
+                capacityChannel = null;
+                condition.signalAll();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java
new file mode 100644
index 0000000..ee494f4
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/classic/SharedOutputBuffer.java
@@ -0,0 +1,165 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+
+/**
+ * Buffer shared between a producer that writes content through the blocking
+ * {@code write} methods and a consumer that drains it into a
+ * {@link DataStreamChannel} through {@link #flush(DataStreamChannel)}.
+ * Buffer access is guarded by {@code lock}.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
+
+    // Channel most recently passed to flush(); null until the first flush.
+    private volatile DataStreamChannel dataStreamChannel;
+    // Set by flush(); cleared when a direct channel write accepts no bytes.
+    private volatile boolean hasCapacity;
+
+    public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
+        super(lock, initialBufferSize);
+        this.hasCapacity = false;
+    }
+
+    public SharedOutputBuffer(final int bufferSize) {
+        this(new ReentrantLock(), bufferSize);
+    }
+
+    /**
+     * Remembers {@code channel} as the current output channel, writes any
+     * buffered content to it, ends the stream once everything has been written
+     * and {@link #writeCompleted()} was called, and wakes up blocked writers.
+     *
+     * @param channel the channel to drain buffered content into.
+     */
+    public void flush(final DataStreamChannel channel) throws IOException {
+        lock.lock();
+        try {
+            dataStreamChannel = channel;
+            hasCapacity = true;
+            setOutputMode();
+            if (buffer().hasRemaining()) {
+                dataStreamChannel.write(buffer());
+            }
+            if (!buffer().hasRemaining() && endStream) {
+                dataStreamChannel.endStream();
+            }
+            condition.signalAll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @throws InterruptedIOException if the buffer has been aborted.
+     */
+    private void ensureNotAborted() throws InterruptedIOException {
+        if (aborted) {
+            throw new InterruptedIOException("Operation aborted");
+        }
+    }
+
+    /**
+     * Writes {@code len} bytes of {@code b} starting at {@code off}. Chunks
+     * under 1024 bytes that fit are buffered; anything else waits for the
+     * buffer to drain and is then written straight to the channel.
+     *
+     * @throws InterruptedIOException if the buffer is aborted or the thread is
+     *         interrupted while waiting.
+     */
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        final ByteBuffer src = ByteBuffer.wrap(b, off, len);
+        lock.lock();
+        try {
+            ensureNotAborted();
+            setInputMode();
+            while (src.hasRemaining()) {
+                // always buffer small chunks
+                if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
+                    buffer().put(src);
+                } else {
+                    // Drain what is buffered first, or wait for the first flush
+                    // to supply a channel.
+                    if (buffer().position() > 0 || dataStreamChannel == null) {
+                        waitFlush();
+                    }
+                    if (buffer().position() == 0 && dataStreamChannel != null) {
+                        final int bytesWritten = dataStreamChannel.write(src);
+                        if (bytesWritten == 0) {
+                            // Channel accepted nothing: wait for the next flush.
+                            hasCapacity = false;
+                            waitFlush();
+                        }
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Buffers a single byte, waiting for a flush first when the buffer is full.
+     *
+     * @throws InterruptedIOException if the buffer is aborted or the thread is
+     *         interrupted while waiting.
+     */
+    @Override
+    public void write(final int b) throws IOException {
+        lock.lock();
+        try {
+            ensureNotAborted();
+            setInputMode();
+            if (!buffer().hasRemaining()) {
+                waitFlush();
+            }
+            buffer().put((byte)b);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Marks the content as complete. If a channel is known, either requests
+     * output for the data still buffered or ends the stream immediately.
+     * Calling it again has no further effect.
+     */
+    @Override
+    public void writeCompleted() throws IOException {
+        // Cheap check first; the flag is re-checked under the lock below.
+        if (endStream) {
+            return;
+        }
+        lock.lock();
+        try {
+            if (!endStream) {
+                endStream = true;
+                if (dataStreamChannel != null) {
+                    setOutputMode();
+                    if (buffer().hasRemaining()) {
+                        dataStreamChannel.requestOutput();
+                    } else {
+                        dataStreamChannel.endStream();
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Requests output from the channel (if one is known) and blocks until the
+     * buffered content has been drained and {@code hasCapacity} is set, then
+     * puts the buffer back into input mode. Callers in this class invoke it
+     * with {@code lock} held.
+     *
+     * @throws InterruptedIOException if the buffer is aborted, or if the
+     *         waiting thread is interrupted; in the latter case the interrupt
+     *         flag is restored and the original exception is kept as the cause.
+     */
+    private void waitFlush() throws InterruptedIOException {
+        setOutputMode();
+        if (dataStreamChannel != null) {
+            dataStreamChannel.requestOutput();
+        }
+        ensureNotAborted();
+        while (buffer().hasRemaining() || !hasCapacity) {
+            try {
+                condition.await();
+            } catch (final InterruptedException ex) {
+                Thread.currentThread().interrupt();
+                final InterruptedIOException ioex = new InterruptedIOException(ex.getMessage());
+                ioex.initCause(ex);
+                throw ioex;
+            }
+            ensureNotAborted();
+        }
+        setInputMode();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java b/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java
index 99b001c..0137607 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/WritableByteChannelMock.java
@@ -107,6 +107,14 @@ public class WritableByteChannelMock implements WritableByteChannel {
         this.buf.clear();
     }
 
+    /**
+     * Returns a copy of the bytes between index 0 and the buffer's current
+     * position. Works on a duplicate, so the position and limit of the
+     * backing buffer are left untouched.
+     */
+    public byte[] toByteArray() {
+        final ByteBuffer view = this.buf.duplicate();
+        view.flip();
+        final int length = view.remaining();
+        final byte[] content = new byte[length];
+        view.get(content, 0, length);
+        return content;
+    }
+
     public String dump(final Charset charset) throws CharacterCodingException {
         this.buf.flip();
         final CharBuffer charBuffer = charset.newDecoder().decode(this.buf);

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java
new file mode 100644
index 0000000..df1c9d3
--- /dev/null
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedInputBuffer.java
@@ -0,0 +1,243 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link SharedInputBuffer}: single-threaded fill/read behaviour and
+ * the hand-over between a filling thread and a reading thread.
+ */
+public class TestSharedInputBuffer {
+
+    /**
+     * Fills, reads and ends the stream on one thread. The capacity channel is
+     * never expected to be called in this sequence.
+     */
+    @Test
+    public void testBasis() throws Exception {
+
+        final Charset charset = StandardCharsets.US_ASCII;
+        final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+        inputBuffer.fill(charset.encode("1234567890"));
+        Assert.assertEquals(10, inputBuffer.length());
+
+        final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+        inputBuffer.updateCapacity(capacityChannel);
+        Mockito.verifyZeroInteractions(capacityChannel);
+
+        inputBuffer.fill(charset.encode("1234567890"));
+        inputBuffer.fill(charset.encode("1234567890"));
+        Assert.assertEquals(30, inputBuffer.length());
+
+        Mockito.verifyZeroInteractions(capacityChannel);
+
+        final byte[] tmp = new byte[20];
+        final int bytesRead1 = inputBuffer.read(tmp, 0, tmp.length);
+        Assert.assertEquals(20, bytesRead1);
+        Mockito.verifyZeroInteractions(capacityChannel);
+
+        inputBuffer.markEndStream();
+
+        Assert.assertEquals('1', inputBuffer.read());
+        Assert.assertEquals('2', inputBuffer.read());
+        final int bytesRead2 = inputBuffer.read(tmp, 0, tmp.length);
+        Assert.assertEquals(8, bytesRead2);
+        Mockito.verifyZeroInteractions(capacityChannel);
+        Assert.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
+        Assert.assertEquals(-1, inputBuffer.read(tmp, 0, tmp.length));
+        Assert.assertEquals(-1, inputBuffer.read());
+        Assert.assertEquals(-1, inputBuffer.read());
+    }
+
+    /**
+     * One thread fills ten bytes while another performs a bulk read.
+     */
+    @Test
+    public void testMultithreadingRead() throws Exception {
+
+        final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+
+        final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+        inputBuffer.updateCapacity(capacityChannel);
+        Mockito.verify(capacityChannel).update(10);
+        Mockito.reset(capacityChannel);
+
+        final ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() throws Exception {
+                    final Charset charset = StandardCharsets.US_ASCII;
+                    inputBuffer.fill(charset.encode("1234567890"));
+                    return Boolean.TRUE;
+                }
+
+            });
+            final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
+
+                @Override
+                public Integer call() throws Exception {
+                    final byte[] tmp = new byte[20];
+                    return inputBuffer.read(tmp, 0, tmp.length);
+                }
+
+            });
+
+            Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+            Assert.assertEquals(Integer.valueOf(10), task2.get(5, TimeUnit.SECONDS));
+            Mockito.verify(capacityChannel).update(10);
+        } finally {
+            // Stop the worker threads even when an assertion fails or a task hangs.
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * One thread fills a single byte while another performs a single-byte read.
+     */
+    @Test
+    public void testMultithreadingSingleRead() throws Exception {
+
+        final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+
+        final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+        inputBuffer.updateCapacity(capacityChannel);
+        Mockito.verify(capacityChannel).update(10);
+        Mockito.reset(capacityChannel);
+
+        final ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() throws Exception {
+                    final Charset charset = StandardCharsets.US_ASCII;
+                    inputBuffer.fill(charset.encode("a"));
+                    return Boolean.TRUE;
+                }
+
+            });
+            final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
+
+                @Override
+                public Integer call() throws Exception {
+                    return inputBuffer.read();
+                }
+
+            });
+
+            Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+            Assert.assertEquals(Integer.valueOf('a'), task2.get(5, TimeUnit.SECONDS));
+            Mockito.verify(capacityChannel).update(10);
+        } finally {
+            // Stop the worker threads even when an assertion fails or a task hangs.
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * One thread fills five chunks with random pauses and then ends the stream
+     * while another reads until end of stream.
+     */
+    @Test
+    public void testMultithreadingReadStream() throws Exception {
+
+        final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+
+        final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+        inputBuffer.updateCapacity(capacityChannel);
+        Mockito.verify(capacityChannel).update(10);
+        Mockito.reset(capacityChannel);
+
+        final ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() throws Exception {
+                    final Charset charset = StandardCharsets.US_ASCII;
+                    final Random rnd = new Random(System.currentTimeMillis());
+                    for (int i = 0; i < 5; i++) {
+                        inputBuffer.fill(charset.encode("1234567890"));
+                        Thread.sleep(rnd.nextInt(250));
+                    }
+                    inputBuffer.markEndStream();
+                    return Boolean.TRUE;
+                }
+
+            });
+            final Future<String> task2 = executorService.submit(new Callable<String>() {
+
+                @Override
+                public String call() throws Exception {
+                    final Charset charset = StandardCharsets.US_ASCII;
+                    final StringBuilder buf = new StringBuilder();
+                    final byte[] tmp = new byte[10];
+                    int l;
+                    while ((l = inputBuffer.read(tmp, 0, tmp.length)) != -1) {
+                        buf.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
+                    }
+                    return buf.toString();
+                }
+
+            });
+
+            Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+            Assert.assertEquals("12345678901234567890123456789012345678901234567890", task2.get(5, TimeUnit.SECONDS));
+            Mockito.verify(capacityChannel, Mockito.atLeast(1)).update(ArgumentMatchers.anyInt());
+        } finally {
+            // Stop the worker threads even when an assertion fails or a task hangs.
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * A reader blocked on an empty buffer must return -1 once the buffer is
+     * aborted from another thread.
+     */
+    @Test
+    public void testMultithreadingReadStreamAbort() throws Exception {
+
+        final SharedInputBuffer inputBuffer = new SharedInputBuffer(10);
+
+        final CapacityChannel capacityChannel = Mockito.mock(CapacityChannel.class);
+
+        inputBuffer.updateCapacity(capacityChannel);
+        Mockito.verify(capacityChannel).update(10);
+        Mockito.reset(capacityChannel);
+
+        final ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() throws Exception {
+                    Thread.sleep(1000);
+                    inputBuffer.abort();
+                    return Boolean.TRUE;
+                }
+
+            });
+            final Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
+
+                @Override
+                public Integer call() throws Exception {
+                    return inputBuffer.read();
+                }
+
+            });
+
+            Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+            Assert.assertEquals(Integer.valueOf(-1), task2.get(5, TimeUnit.SECONDS));
+            Mockito.verify(capacityChannel, Mockito.never()).update(10);
+        } finally {
+            // Stop the worker threads even when an assertion fails or a task hangs.
+            executorService.shutdownNow();
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/cafa9bb4/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java
new file mode 100644
index 0000000..278d1fd
--- /dev/null
+++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java
@@ -0,0 +1,231 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.http.nio.support.classic;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.WritableByteChannelMock;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link SharedOutputBuffer}: single-threaded buffering and flushing
+ * and the hand-over between a writing thread and a flushing thread.
+ */
+public class TestSharedOutputBuffer {
+
+    /**
+     * {@link DataStreamChannel} backed by a {@link WritableByteChannelMock}
+     * that lets a test thread wait until output has been requested.
+     */
+    static class DataStreamChannelMock implements DataStreamChannel {
+
+        private final WritableByteChannelMock channel;
+        // Guarded by this object's monitor. Remembers a request that arrived
+        // while nobody was waiting, so the wake-up cannot be lost.
+        private boolean outputRequested;
+
+        DataStreamChannelMock(final WritableByteChannelMock channel) {
+            this.channel = channel;
+        }
+
+        @Override
+        public synchronized int write(final ByteBuffer src) throws IOException {
+            return channel.write(src);
+        }
+
+        @Override
+        public synchronized void requestOutput() {
+            outputRequested = true;
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void endStream(final List<? extends Header> trailers) throws IOException {
+            channel.close();
+            outputRequested = true;
+            notifyAll();
+        }
+
+        @Override
+        public void endStream() throws IOException {
+            endStream(null);
+        }
+
+        /**
+         * Blocks until {@link #requestOutput()} or an {@code endStream} method
+         * has been called since the previous wait, then consumes the request.
+         * The loop also guards against spurious wake-ups.
+         */
+        public synchronized void awaitOutputRequest() throws InterruptedException {
+            while (!outputRequested) {
+                wait();
+            }
+            outputRequested = false;
+        }
+
+    }
+
+    /**
+     * Small writes that fit into the buffer must be buffered without touching
+     * the channel.
+     */
+    @Test
+    public void testBasis() throws Exception {
+
+        final Charset charset = StandardCharsets.US_ASCII;
+        final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
+
+        final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
+        final DataStreamChannel dataStreamChannel = Mockito.spy(new DataStreamChannelMock(channel));
+        outputBuffer.flush(dataStreamChannel);
+
+        Mockito.verifyZeroInteractions(dataStreamChannel);
+
+        Assert.assertEquals(0, outputBuffer.length());
+        Assert.assertEquals(30, outputBuffer.capacity());
+
+        final byte[] tmp = "1234567890".getBytes(charset);
+        outputBuffer.write(tmp, 0, tmp.length);
+        outputBuffer.write(tmp, 0, tmp.length);
+        outputBuffer.write('1');
+        outputBuffer.write('2');
+
+        Assert.assertEquals(22, outputBuffer.length());
+        Assert.assertEquals(8, outputBuffer.capacity());
+
+        Mockito.verifyZeroInteractions(dataStreamChannel);
+    }
+
+    /**
+     * A flush must drain everything that was buffered.
+     */
+    @Test
+    public void testFlush() throws Exception {
+
+        final Charset charset = StandardCharsets.US_ASCII;
+        final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(30);
+
+        final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
+        final DataStreamChannel dataStreamChannel = new DataStreamChannelMock(channel);
+        outputBuffer.flush(dataStreamChannel);
+
+        Assert.assertEquals(0, outputBuffer.length());
+        Assert.assertEquals(30, outputBuffer.capacity());
+
+        final byte[] tmp = "1234567890".getBytes(charset);
+        outputBuffer.write(tmp, 0, tmp.length);
+        outputBuffer.write(tmp, 0, tmp.length);
+        outputBuffer.write('1');
+        outputBuffer.write('2');
+
+        outputBuffer.flush(dataStreamChannel);
+
+        Assert.assertEquals(0, outputBuffer.length());
+        Assert.assertEquals(30, outputBuffer.capacity());
+    }
+
+    /**
+     * One thread writes more than the buffer holds while another keeps
+     * flushing; all content must reach the channel in order.
+     */
+    @Test
+    public void testMultithreadingWriteStream() throws Exception {
+
+        final Charset charset = StandardCharsets.US_ASCII;
+        final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
+
+        final WritableByteChannelMock channel = new WritableByteChannelMock(1024);
+        final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock(channel);
+
+        final ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() throws Exception {
+                    final byte[] tmp = "1234567890".getBytes(charset);
+                    outputBuffer.write(tmp, 0, tmp.length);
+                    outputBuffer.write(tmp, 0, tmp.length);
+                    outputBuffer.write('1');
+                    outputBuffer.write('2');
+                    outputBuffer.write(tmp, 0, tmp.length);
+                    outputBuffer.write(tmp, 0, tmp.length);
+                    outputBuffer.write(tmp, 0, tmp.length);
+                    outputBuffer.writeCompleted();
+                    outputBuffer.writeCompleted();
+                    return Boolean.TRUE;
+                }
+
+            });
+            final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() throws Exception {
+                    for (;;) {
+                        outputBuffer.flush(dataStreamChannel);
+                        if (outputBuffer.isEndStream()) {
+                            break;
+                        }
+                        if (!outputBuffer.hasData()) {
+                            dataStreamChannel.awaitOutputRequest();
+                        }
+                    }
+                    return Boolean.TRUE;
+                }
+
+            });
+
+            Assert.assertEquals(Boolean.TRUE, task1.get(5, TimeUnit.SECONDS));
+            Assert.assertEquals(Boolean.TRUE, task2.get(5, TimeUnit.SECONDS));
+
+            Assert.assertEquals("1234567890123456789012123456789012345678901234567890", new String(channel.toByteArray(), charset));
+        } finally {
+            // Stop the worker threads even when an assertion fails or a task hangs.
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * A writer blocked on a buffer that is never flushed must fail with
+     * {@link InterruptedIOException} once the buffer is aborted.
+     */
+    @Test
+    public void testMultithreadingWriteStreamAbort() throws Exception {
+
+        final Charset charset = StandardCharsets.US_ASCII;
+        final SharedOutputBuffer outputBuffer = new SharedOutputBuffer(20);
+
+        final ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            final Future<Boolean> task1 = executorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() throws Exception {
+                    final byte[] tmp = "1234567890".getBytes(charset);
+                    for (int i = 0; i < 20; i++) {
+                        outputBuffer.write(tmp, 0, tmp.length);
+                    }
+                    outputBuffer.writeCompleted();
+                    return Boolean.TRUE;
+                }
+
+            });
+            final Future<Boolean> task2 = executorService.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() throws Exception {
+                    Thread.sleep(200);
+                    outputBuffer.abort();
+                    return Boolean.TRUE;
+                }
+
+            });
+
+            Assert.assertEquals(Boolean.TRUE, task2.get(5, TimeUnit.SECONDS));
+            try {
+                task1.get(5, TimeUnit.SECONDS);
+                // Nothing ever flushes the 20-byte buffer, so the writer cannot
+                // finish normally; completing without an error is a failure.
+                Assert.fail("ExecutionException expected");
+            } catch (final ExecutionException ex) {
+                Assert.assertTrue(ex.getCause() instanceof InterruptedIOException);
+            }
+        } finally {
+            // Stop the worker threads even when an assertion fails or a task hangs.
+            executorService.shutdownNow();
+        }
+    }
+
+}
+