You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/08/31 15:12:15 UTC

httpcomponents-core git commit: HTTPCLIENT-1942: Add support for Reactive Streams

Repository: httpcomponents-core
Updated Branches:
  refs/heads/reactive [created] d3f7608c8


HTTPCLIENT-1942: Add support for Reactive Streams

This commit adds initial support for the Reactive Streams specification
[1]. The main part of this change is a pair of non-public classes, the
ReactiveDataProducer and ReactiveDataConsumer, which are reactive
adapters for Apache's AsyncDataProducer and AsyncDataConsumer
interfaces. Two public classes are built on top of these types:

* ReactiveEntityProducer: An AsyncEntityProducer implementation backed
  by ReactiveDataProducer that allows a Publisher<ByteBuffer> to be
  streamed as a request body
* ReactiveResponseConsumer: An AsyncResponseConsumer that exposes a
  special callback that provides a view of the streaming HTTP response
  as a Message<HttpResponse, Publisher<ByteBuffer>>, allowing the body
  to be consumed by a Subscriber<ByteBuffer>

[1] http://www.reactive-streams.org/


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/d3f7608c
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/d3f7608c
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/d3f7608c

Branch: refs/heads/reactive
Commit: d3f7608c812bf7d0f9538b2234ce57624e45f3b3
Parents: 82eb044
Author: Ryan Schmitt <ry...@amazon.com>
Authored: Fri Aug 17 14:16:04 2018 -0700
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Fri Aug 31 17:03:58 2018 +0200

----------------------------------------------------------------------
 httpcore5-reactive/pom.xml                      |  94 +++++
 .../hc/core5/reactive/ReactiveDataConsumer.java | 177 ++++++++
 .../hc/core5/reactive/ReactiveDataProducer.java | 170 ++++++++
 .../core5/reactive/ReactiveEntityProducer.java  | 123 ++++++
 .../reactive/ReactiveResponseConsumer.java      | 176 ++++++++
 .../core5/reactive/BasicDataStreamChannel.java  |  77 ++++
 .../reactive/TestReactiveDataConsumer.java      | 208 ++++++++++
 .../reactive/TestReactiveDataProducer.java      |  89 +++++
 .../core5/reactive/WritableByteChannelMock.java | 125 ++++++
 httpcore5-testing/pom.xml                       |  12 +
 .../testing/reactive/ReactiveClientTest.java    | 400 +++++++++++++++++++
 pom.xml                                         |   3 +-
 12 files changed, 1653 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/pom.xml
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/pom.xml b/httpcore5-reactive/pom.xml
new file mode 100644
index 0000000..7babffb
--- /dev/null
+++ b/httpcore5-reactive/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ====================================================================
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  ====================================================================
+
+  This software consists of voluntary contributions made by many
+  individuals on behalf of the Apache Software Foundation.  For more
+  information on the Apache Software Foundation, please see
+  <http://www.apache.org />.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>httpcore5-parent</artifactId>
+    <groupId>org.apache.httpcomponents.core5</groupId>
+    <version>5.0-beta4-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>httpcore5-reactive</artifactId>
+  <name>Apache HttpComponents Core Reactive Extensions</name>
+  <description>Apache HttpComponents Reactive Streams Bindings</description>
+  <url>http://hc.apache.org/httpcomponents-core-ga</url>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.httpcomponents.core5</groupId>
+      <artifactId>httpcore5</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.reactivestreams</groupId>
+      <artifactId>reactive-streams</artifactId>
+      <version>1.0.2</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.reactivex.rxjava2</groupId>
+      <artifactId>rxjava</artifactId>
+      <version>2.1.9</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents.core5</groupId>
+      <artifactId>httpcore5-h2</artifactId>
+      <version>5.0-beta3-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+
+  <reporting>
+    <plugins>
+
+      <plugin>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+        <inherited>false</inherited>
+        <reportSets>
+          <reportSet>
+            <reports>
+              <report>dependencies</report>
+              <report>dependency-info</report>
+              <report>summary</report>
+            </reports>
+          </reportSet>
+        </reportSets>
+      </plugin>
+
+    </plugins>
+  </reporting>
+
+</project>

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
new file mode 100644
index 0000000..9789a43
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
@@ -0,0 +1,177 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http2.H2Error;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.apache.hc.core5.util.Args;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An asynchronous data consumer that supports Reactive Streams.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuffer> {
+
+    static final int MAX_BUFFER = 1024 * 1024;
+
+    private final AtomicLong requests = new AtomicLong(0);
+    private final AtomicInteger remainingBufferSpace = new AtomicInteger(MAX_BUFFER);
+
+    private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<>();
+    private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+    private volatile boolean cancelled = false;
+    private volatile boolean completed = false;
+    private volatile Exception exception;
+    private volatile CapacityChannel capacityChannel;
+    private volatile Subscriber<? super ByteBuffer> subscriber;
+
+    public void failed(final Exception cause) {
+        exception = cause;
+        flushToSubscriber();
+    }
+
+    @Override
+    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        throwIfCancelled();
+        this.capacityChannel = capacityChannel;
+    }
+
+    private void throwIfCancelled() throws IOException {
+        if (cancelled) {
+            throw new H2StreamResetException(H2Error.NO_ERROR,
+                "Downstream subscriber to ReactiveDataConsumer cancelled");
+        }
+    }
+
+    @Override
+    public int consume(final ByteBuffer byteBuffer) throws IOException {
+        if (completed) {
+            throw new IllegalStateException("Received data past end of stream");
+        }
+        throwIfCancelled();
+
+        final byte[] copy = new byte[byteBuffer.remaining()];
+        byteBuffer.get(copy);
+        remainingBufferSpace.addAndGet(-copy.length);
+        buffers.add(ByteBuffer.wrap(copy));
+
+        flushToSubscriber();
+        return remainingBufferSpace.get();
+    }
+
+    @Override
+    public void streamEnd(final List<? extends Header> trailers) {
+        completed = true;
+        flushToSubscriber();
+    }
+
+    @Override
+    public void releaseResources() {
+        this.capacityChannel = null;
+    }
+
+    private void flushToSubscriber() {
+        final Subscriber<? super ByteBuffer> s = subscriber;
+        if (flushInProgress.getAndSet(true)) {
+            return;
+        }
+        try {
+            if (s == null) {
+                return;
+            }
+            if (exception != null) {
+                subscriber = null;
+                s.onError(exception);
+                return;
+            }
+            int windowScalingIncrement = 0;
+            ByteBuffer next;
+            while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
+                final int bytesFreed = next.remaining();
+                remainingBufferSpace.addAndGet(bytesFreed);
+                s.onNext(next);
+                requests.decrementAndGet();
+                windowScalingIncrement += bytesFreed;
+            }
+            if (capacityChannel != null && windowScalingIncrement > 0) {
+                try {
+                    capacityChannel.update(windowScalingIncrement);
+                } catch (final IOException ex) {
+                    failed(ex);
+                    return;
+                }
+            }
+            if (completed && buffers.isEmpty()) {
+                subscriber = null;
+                s.onComplete();
+            }
+        } finally {
+            flushInProgress.set(false);
+        }
+    }
+
+    @Override
+    public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
+        this.subscriber = Args.notNull(subscriber, "subscriber");
+        subscriber.onSubscribe(new Subscription() {
+            @Override
+            public void request(final long increment) {
+                if (increment <= 0) {
+                    failed(new IllegalArgumentException("The number of elements requested must be strictly positive"));
+                    return;
+                }
+                requests.addAndGet(increment);
+                flushToSubscriber();
+            }
+
+            @Override
+            public void cancel() {
+                ReactiveDataConsumer.this.cancelled = true;
+                ReactiveDataConsumer.this.subscriber = null;
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
new file mode 100644
index 0000000..33d7773
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
@@ -0,0 +1,170 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.AsyncDataProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http2.H2Error;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.apache.hc.core5.util.Args;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * An asynchronous data producer that supports Reactive Streams.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
+
+    private static final int BUFFER_WINDOW_SIZE = 5;
+
+    private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
+    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
+    private final AtomicBoolean complete = new AtomicBoolean(false);
+    private final Publisher<ByteBuffer> publisher;
+    private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
+    private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
+
+    public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
+        this.publisher = Args.notNull(publisher, "publisher");
+    }
+
+    @Override
+    public void onSubscribe(final Subscription subscription) {
+        if (this.subscription.getAndSet(subscription) != null) {
+            throw new IllegalStateException("Already subscribed");
+        }
+
+        subscription.request(BUFFER_WINDOW_SIZE);
+    }
+
+    @Override
+    public void onNext(final ByteBuffer byteBuffer) {
+        final byte[] copy = new byte[byteBuffer.remaining()];
+        byteBuffer.get(copy);
+        synchronized (buffers) {
+            buffers.add(ByteBuffer.wrap(copy));
+        }
+        signalReadiness();
+    }
+
+    @Override
+    public void onError(final Throwable throwable) {
+        subscription.set(null);
+        exception.set(throwable);
+        signalReadiness();
+    }
+
+    @Override
+    public void onComplete() {
+        subscription.set(null);
+        complete.set(true);
+        signalReadiness();
+    }
+
+    private void signalReadiness() {
+        final DataStreamChannel channel = requestChannel.get();
+        if (channel == null) {
+            throw new IllegalStateException("Output channel is not set");
+        }
+        channel.requestOutput();
+    }
+
+    @Override
+    public int available() {
+        if (exception.get() != null || complete.get()) {
+            return 1;
+        } else {
+            synchronized (buffers) {
+                int sum = 0;
+                for (final ByteBuffer buffer : buffers) {
+                    sum += buffer.remaining();
+                }
+                return sum;
+            }
+        }
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        if (requestChannel.get() == null) {
+            requestChannel.set(channel);
+            publisher.subscribe(this);
+        }
+
+        final Throwable t = exception.get();
+        final Subscription s = subscription.get();
+        int buffersToReplenish = 0;
+        try {
+            synchronized (buffers) {
+                if (t != null) {
+                    final H2StreamResetException ex = new H2StreamResetException(H2Error.NO_ERROR,
+                        "Request publisher threw an exception");
+                    ex.initCause(t);
+                    throw ex;
+                } else if (this.complete.get() && buffers.isEmpty()) {
+                    channel.endStream();
+                } else {
+                    while (buffers.size() > 0) {
+                        final ByteBuffer nextBuffer = buffers.remove();
+                        channel.write(nextBuffer);
+                        if (nextBuffer.remaining() > 0) {
+                            buffers.push(nextBuffer);
+                            break;
+                        } else if (s != null) {
+                            // We defer the #request call until after we release the buffer lock.
+                            buffersToReplenish++;
+                        }
+                    }
+                }
+            }
+        } finally {
+            if (s != null && buffersToReplenish > 0) {
+                s.request(buffersToReplenish);
+            }
+        }
+    }
+
+    @Override
+    public void releaseResources() {
+        final Subscription s = subscription.getAndSet(null);
+        if (s != null) {
+            s.cancel();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveEntityProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveEntityProducer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveEntityProducer.java
new file mode 100644
index 0000000..2fc4a98
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveEntityProducer.java
@@ -0,0 +1,123 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.reactivestreams.Publisher;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+/**
+ * An {@link AsyncEntityProducer} that subscribes to a {@code Publisher}
+ * instance, as defined by the Reactive Streams specification.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class ReactiveEntityProducer implements AsyncEntityProducer {
+
+    private final ReactiveDataProducer reactiveDataProducer;
+
+    private final long contentLength;
+    private final String contentType;
+    private final String contentEncoding;
+
+    /**
+     * Creates a new {@code ReactiveEntityProducer} with the given parameters.
+     *
+     * @param publisher the publisher of the entity stream.
+     * @param contentLength the length of the entity, or -1 if unknown (implies chunked encoding).
+     * @param contentType the {@code Content-Type} of the entity, or null if none.
+     * @param contentEncoding the {@code Content-Encoding} of the entity, or null if none.
+     */
+    public ReactiveEntityProducer(
+        final Publisher<ByteBuffer> publisher,
+        final long contentLength,
+        final String contentType,
+        final String contentEncoding
+    ) {
+        this.reactiveDataProducer = new ReactiveDataProducer(publisher);
+        this.contentLength = contentLength;
+        this.contentType = contentType;
+        this.contentEncoding = contentEncoding;
+    }
+
+    @Override
+    public int available() {
+        return reactiveDataProducer.available();
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        reactiveDataProducer.produce(channel);
+    }
+
+    @Override
+    public void releaseResources() {
+        reactiveDataProducer.releaseResources();
+    }
+
+    @Override
+    public boolean isRepeatable() {
+        return false;
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        releaseResources();
+    }
+
+    @Override
+    public long getContentLength() {
+        return contentLength;
+    }
+
+    @Override
+    public String getContentType() {
+        return contentType;
+    }
+
+    @Override
+    public String getContentEncoding() {
+        return contentEncoding;
+    }
+
+    @Override
+    public boolean isChunked() {
+        return contentLength == -1;
+    }
+
+    @Override
+    public Set<String> getTrailerNames() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
new file mode 100644
index 0000000..d0d746f
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
@@ -0,0 +1,176 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.reactivestreams.Publisher;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+
+/**
+ * An {@link AsyncResponseConsumer} that publishes the response body through
+ * a {@link Publisher}, as defined by the Reactive Streams specification. The
+ * response is represented as a {@link Message} consisting of a {@link
+ * HttpResponse} representing the headers and a {@link Publisher} representing
+ * the response body as an asynchronous stream of {@link ByteBuffer} instances.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Void> {
+
+    private final ReactiveDataConsumer reactiveDataConsumer = new ReactiveDataConsumer();
+    private final List<Header> trailers = Collections.synchronizedList(new ArrayList<Header>());
+    private final BasicFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseFuture;
+
+    private volatile BasicFuture<Void> responseCompletion;
+    private volatile HttpResponse informationResponse;
+    private volatile EntityDetails entityDetails;
+
+    /**
+     * Creates a {@code ReactiveResponseConsumer}.
+     */
+    public ReactiveResponseConsumer() {
+        this.responseFuture = new BasicFuture<>(null);
+    }
+
+    /**
+     * Creates a {@code ReactiveResponseConsumer} that will call back the supplied {@link FutureCallback} with a
+     * streamable response.
+     *
+     * @param responseCallback the callback to invoke when the response is available for consumption.
+     */
+    public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> responseCallback) {
+        this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
+    }
+
+    public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
+        return responseFuture;
+    }
+
+    /**
+     * Returns the intermediate (1xx) HTTP response if one was received.
+     *
+     * @return the information response, or {@code null} if none.
+     */
+    public HttpResponse getInformationResponse() {
+        return informationResponse;
+    }
+
+    /**
+     * Returns the response entity details.
+     *
+     * @return the entity details, or {@code null} if none.
+     */
+    public EntityDetails getEntityDetails() {
+        return entityDetails;
+    }
+
+    /**
+     * Returns the trailers received at the end of the response.
+     *
+     * @return a non-null list of zero or more trailers.
+     */
+    public List<Header> getTrailers() {
+        return trailers;
+    }
+
+    @Override
+    public void consumeResponse(
+        final HttpResponse response,
+        final EntityDetails entityDetails,
+        final HttpContext httpContext,
+        final FutureCallback<Void> resultCallback
+    ) {
+        this.entityDetails = entityDetails;
+        this.responseCompletion = new BasicFuture<>(resultCallback);
+        this.responseFuture.completed(new Message<HttpResponse, Publisher<ByteBuffer>>(response, reactiveDataConsumer));
+    }
+
+    @Override
+    public void informationResponse(final HttpResponse response, final HttpContext httpContext) {
+        this.informationResponse = response;
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        reactiveDataConsumer.failed(cause);
+        responseFuture.failed(cause);
+        if (responseCompletion != null) {
+            responseCompletion.failed(cause);
+        }
+    }
+
+    @Override
+    public Void getResult() {
+        return null;
+    }
+
+    @Override
+    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        reactiveDataConsumer.updateCapacity(capacityChannel);
+    }
+
+    @Override
+    public int consume(final ByteBuffer src) throws IOException {
+        return reactiveDataConsumer.consume(src);
+    }
+
+    @Override
+    public void streamEnd(final List<? extends Header> trailers) {
+        if (trailers != null) {
+            this.trailers.addAll(trailers);
+        }
+        reactiveDataConsumer.streamEnd(trailers);
+        responseCompletion.completed(null);
+    }
+
+    @Override
+    public void releaseResources() {
+        reactiveDataConsumer.releaseResources();
+        responseFuture.cancel();
+        if (responseCompletion != null) {
+            responseCompletion.cancel();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/BasicDataStreamChannel.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/BasicDataStreamChannel.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/BasicDataStreamChannel.java
new file mode 100644
index 0000000..6510f04
--- /dev/null
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/BasicDataStreamChannel.java
@@ -0,0 +1,77 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BasicDataStreamChannel implements DataStreamChannel {
+
+    private final WritableByteChannel byteChannel;
+    private List<Header> trailers;
+
+    public BasicDataStreamChannel(final WritableByteChannel byteChannel) {
+        this.byteChannel = byteChannel;
+    }
+
+    @Override
+    public void requestOutput() {
+    }
+
+    @Override
+    public int write(final ByteBuffer src) throws IOException {
+        return byteChannel.write(src);
+    }
+
+    @Override
+    public void endStream() throws IOException {
+        if (byteChannel.isOpen()) {
+            byteChannel.close();
+        }
+    }
+
+    @Override
+    public void endStream(final List<? extends Header> trailers) throws IOException {
+        endStream();
+        if (trailers != null) {
+            this.trailers = new ArrayList<>();
+            this.trailers.addAll(trailers);
+        }
+    }
+
+    public List<Header> getTrailers() {
+        return trailers;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
new file mode 100644
index 0000000..bd380fe
--- /dev/null
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
@@ -0,0 +1,208 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import io.reactivex.Flowable;
+import io.reactivex.Notification;
+import io.reactivex.Observable;
+import io.reactivex.Single;
+import io.reactivex.functions.Consumer;
+
+public class TestReactiveDataConsumer {
+    @Test
+    public void testStreamThatEndsNormally() throws Exception {
+        final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+
+        final List<ByteBuffer> output = Collections.synchronizedList(new ArrayList<ByteBuffer>());
+
+        final CountDownLatch complete = new CountDownLatch(1);
+        Observable.fromPublisher(consumer)
+            .materialize()
+            .forEach(new Consumer<Notification<ByteBuffer>>() {
+                @Override
+                public void accept(final Notification<ByteBuffer> byteBufferNotification) throws Exception {
+                    if (byteBufferNotification.isOnComplete()) {
+                        complete.countDown();
+                    } else if (byteBufferNotification.isOnNext()) {
+                        output.add(byteBufferNotification.getValue());
+                    } else {
+                        throw new IllegalArgumentException();
+                    }
+                }
+            });
+
+        consumer.consume(ByteBuffer.wrap(new byte[]{ '1' }));
+        consumer.consume(ByteBuffer.wrap(new byte[]{ '2' }));
+        consumer.consume(ByteBuffer.wrap(new byte[]{ '3' }));
+        consumer.streamEnd(null);
+
+        Assert.assertTrue("Stream did not finish before timeout", complete.await(1, TimeUnit.SECONDS));
+        Assert.assertEquals(3, output.size());
+        Assert.assertEquals(ByteBuffer.wrap(new byte[]{ '1' }), output.get(0));
+        Assert.assertEquals(ByteBuffer.wrap(new byte[]{ '2' }), output.get(1));
+        Assert.assertEquals(ByteBuffer.wrap(new byte[]{ '3' }), output.get(2));
+    }
+
+    @Test
+    public void testStreamThatEndsWithError() {
+        final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+        final Single<List<Notification<ByteBuffer>>> single = Observable.fromPublisher(consumer)
+            .materialize()
+            .toList();
+
+        final Exception ex = new RuntimeException();
+        consumer.failed(ex);
+
+        Assert.assertSame(ex, single.blockingGet().get(0).getError());
+    }
+
+    @Test(expected = H2StreamResetException.class)
+    public void testCancellation() throws Exception {
+        final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+        consumer.subscribe(new Subscriber<ByteBuffer>() {
+            @Override
+            public void onSubscribe(final Subscription s) {
+                s.cancel();
+            }
+
+            @Override
+            public void onNext(final ByteBuffer byteBuffer) {
+            }
+
+            @Override
+            public void onError(final Throwable throwable) {
+            }
+
+            @Override
+            public void onComplete() {
+            }
+        });
+
+        consumer.consume(ByteBuffer.wrap(new byte[1024]));
+    }
+
+    @Test
+    public void testCapacityIncrements() throws Exception {
+        final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+        final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
+
+        final AtomicInteger lastIncrement = new AtomicInteger(-1);
+        final CapacityChannel channel = new CapacityChannel() {
+            @Override
+            public void update(final int increment) {
+                lastIncrement.set(increment);
+            }
+        };
+        consumer.updateCapacity(channel);
+        Assert.assertEquals("CapacityChannel#update should not have been invoked yet", -1, lastIncrement.get());
+
+        final AtomicInteger received = new AtomicInteger(0);
+        final AtomicReference<Subscription> subscription = new AtomicReference<>();
+        consumer.subscribe(new Subscriber<ByteBuffer>() {
+            @Override
+            public void onSubscribe(final Subscription s) {
+                subscription.set(s);
+            }
+
+            @Override
+            public void onNext(final ByteBuffer byteBuffer) {
+                received.incrementAndGet();
+            }
+
+            @Override
+            public void onError(final Throwable throwable) {
+            }
+
+            @Override
+            public void onComplete() {
+            }
+        });
+
+        Assert.assertEquals(ReactiveDataConsumer.MAX_BUFFER - (1 * 1024), consumer.consume(data.duplicate()));
+        Assert.assertEquals(ReactiveDataConsumer.MAX_BUFFER - (2 * 1024), consumer.consume(data.duplicate()));
+        Assert.assertEquals(ReactiveDataConsumer.MAX_BUFFER - (3 * 1024), consumer.consume(data.duplicate()));
+        Assert.assertEquals(ReactiveDataConsumer.MAX_BUFFER - (4 * 1024), consumer.consume(data.duplicate()));
+
+        subscription.get().request(1);
+        Assert.assertEquals(1024, lastIncrement.get());
+
+        subscription.get().request(2);
+        Assert.assertEquals(2 * 1024, lastIncrement.get());
+
+        subscription.get().request(99);
+        Assert.assertEquals(1024, lastIncrement.get());
+    }
+
+    @Test
+    public void testFullResponseBuffering() throws Exception {
+        // Due to inherent race conditions, is possible for the entire response to be buffered and completed before
+        // the Subscriber shows up. This must be handled correctly.
+        final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+        final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
+
+        consumer.consume(data.duplicate());
+        consumer.consume(data.duplicate());
+        consumer.consume(data.duplicate());
+        consumer.streamEnd(null);
+
+        Assert.assertEquals(Flowable.fromPublisher(consumer).count().blockingGet().longValue(), 3L);
+    }
+
+    @Test
+    public void testErrorBuffering() throws Exception {
+        final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+        final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
+
+        final RuntimeException ex = new RuntimeException();
+        consumer.consume(data.duplicate());
+        consumer.consume(data.duplicate());
+        consumer.consume(data.duplicate());
+        consumer.failed(ex);
+
+        final Notification<ByteBuffer> result = Flowable.fromPublisher(consumer)
+            .materialize()
+            .singleOrError()
+            .blockingGet();
+        Assert.assertSame(ex, result.getError());
+    }
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
new file mode 100644
index 0000000..1117439
--- /dev/null
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
@@ -0,0 +1,89 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.reactivex.Flowable;
+
+public class TestReactiveDataProducer {
+    @Test
+    public void testStreamThatEndsNormally() throws Exception {
+        final Flowable<ByteBuffer> publisher = Flowable.just(
+            ByteBuffer.wrap(new byte[]{ '1', '2', '3' }),
+            ByteBuffer.wrap(new byte[]{ '4', '5', '6' }));
+        final ReactiveDataProducer producer = new ReactiveDataProducer(publisher);
+
+        final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
+        final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
+
+        producer.produce(streamChannel);
+
+        Assert.assertTrue(byteChannel.isOpen());
+        Assert.assertEquals("123456", byteChannel.dump(StandardCharsets.US_ASCII));
+
+        producer.produce(streamChannel);
+
+        Assert.assertFalse(byteChannel.isOpen());
+        Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
+    }
+
+    @Test
+    public void testStreamThatEndsWithError() throws Exception {
+        final Flowable<ByteBuffer> publisher = Flowable.concatArray(
+            Flowable.just(
+                ByteBuffer.wrap(new byte[]{ '1' }),
+                ByteBuffer.wrap(new byte[]{ '2' }),
+                ByteBuffer.wrap(new byte[]{ '3' }),
+                ByteBuffer.wrap(new byte[]{ '4' }),
+                ByteBuffer.wrap(new byte[]{ '5' }),
+                ByteBuffer.wrap(new byte[]{ '6' })),
+            Flowable.<ByteBuffer>error(new RuntimeException())
+        );
+        final ReactiveDataProducer producer = new ReactiveDataProducer(publisher);
+
+        final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
+        final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
+
+        producer.produce(streamChannel);
+        Assert.assertEquals("12345", byteChannel.dump(StandardCharsets.US_ASCII));
+
+        try {
+            producer.produce(streamChannel);
+            Assert.fail("Expected ProtocolException");
+        } catch (final H2StreamResetException ex) {
+            Assert.assertTrue("Expected published exception to be rethrown", ex.getCause() instanceof RuntimeException);
+            Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/WritableByteChannelMock.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/WritableByteChannelMock.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/WritableByteChannelMock.java
new file mode 100644
index 0000000..679181d
--- /dev/null
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/WritableByteChannelMock.java
@@ -0,0 +1,125 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactive;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+public class WritableByteChannelMock implements WritableByteChannel {
+
+    private final int capacityLimit;
+
+    private int capacityUsed;
+    private ByteBuffer buf;
+    private boolean closed;
+
+    public WritableByteChannelMock(final int initialSize, final int capacityLimit) {
+        this.buf = ByteBuffer.allocate(initialSize);
+        this.capacityLimit = capacityLimit;
+    }
+
+    public WritableByteChannelMock(final int initialSize) {
+        this(initialSize, 0);
+    }
+
+    private void expandCapacity(final int capacity) {
+        final ByteBuffer oldbuffer = this.buf;
+        this.buf = ByteBuffer.allocate(capacity);
+        oldbuffer.flip();
+        this.buf.put(oldbuffer);
+    }
+
+    private void ensureCapacity(final int requiredCapacity) {
+        if (requiredCapacity > this.buf.capacity()) {
+            expandCapacity(requiredCapacity);
+        }
+    }
+
+    @Override
+    public int write(final ByteBuffer src) throws IOException {
+        if (this.closed) {
+            throw new ClosedChannelException();
+        }
+        final int len = src.remaining();
+        ensureCapacity(this.buf.position() + len);
+        if (this.capacityLimit > 0) {
+            final int chunk = Math.min(this.capacityLimit - this.capacityUsed, len);
+            if (chunk > 0) {
+                final int limit = src.limit();
+                src.limit(src.position() + chunk);
+                this.buf.put(src);
+                src.limit(limit);
+                this.capacityUsed += chunk;
+                return chunk;
+            }
+            return 0;
+        }
+        this.buf.put(src);
+        return len;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return !this.closed;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.closed = true;
+    }
+
+    public void flush() {
+        this.capacityUsed = 0;
+    }
+
+    public void reset() {
+        this.capacityUsed = 0;
+        this.buf.clear();
+    }
+
+    public byte[] toByteArray() {
+        final ByteBuffer dup = this.buf.duplicate();
+        dup.flip();
+        final byte[] bytes = new byte[dup.remaining()];
+        dup.get(bytes);
+        return bytes;
+    }
+
+    public String dump(final Charset charset) throws CharacterCodingException {
+        this.buf.flip();
+        final CharBuffer charBuffer = charset.newDecoder().decode(this.buf);
+        this.buf.compact();
+        return charBuffer.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-testing/pom.xml
----------------------------------------------------------------------
diff --git a/httpcore5-testing/pom.xml b/httpcore5-testing/pom.xml
index 7cf41d7..e9e1f1c 100644
--- a/httpcore5-testing/pom.xml
+++ b/httpcore5-testing/pom.xml
@@ -50,6 +50,12 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.httpcomponents.core5</groupId>
+      <artifactId>httpcore5-reactive</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
       <scope>compile</scope>
@@ -73,6 +79,12 @@
       <optional>true</optional>
     </dependency>
     <dependency>
+      <groupId>io.reactivex.rxjava2</groupId>
+      <artifactId>rxjava</artifactId>
+      <version>2.1.9</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
new file mode 100644
index 0000000..21c4be4
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
@@ -0,0 +1,400 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.testing.reactive;
+
+import io.reactivex.Flowable;
+import io.reactivex.Observable;
+import io.reactivex.functions.Action;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http2.H2Error;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactive.ReactiveEntityProducer;
+import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
+import org.apache.hc.core5.reactor.ExceptionEvent;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
+import org.apache.hc.core5.testing.nio.EchoHandler;
+import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
+import org.apache.hc.core5.testing.nio.LoggingHttp2StreamListener;
+import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
+import org.apache.hc.core5.testing.nio.LoggingIOSessionListener;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+@RunWith(Parameterized.class)
+public class ReactiveClientTest {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> protocols() {
+        return Arrays.asList(new Object[][]{
+            { HttpVersionPolicy.FORCE_HTTP_1 },
+            { HttpVersionPolicy.FORCE_HTTP_2 }
+        });
+    }
+    private static final Timeout TIMEOUT = Timeout.ofSeconds(3000);
+    private static final Random RANDOM = new Random();
+
+    private final HttpVersionPolicy versionPolicy;
+
+    public ReactiveClientTest(final HttpVersionPolicy httpVersionPolicy) {
+        this.versionPolicy = httpVersionPolicy;
+    }
+
+    private HttpAsyncServer server;
+
+    @Rule
+    public ExternalResource serverResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            log.debug("Starting up test server");
+            server = H2ServerBootstrap.bootstrap()
+                .setVersionPolicy(versionPolicy)
+                .setIOReactorConfig(
+                    IOReactorConfig.custom()
+                        .setSoTimeout(TIMEOUT)
+                        .build())
+                .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
+                .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
+                .setStreamListener(LoggingHttp2StreamListener.INSTANCE)
+                .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
+                .register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+                    @Override
+                    public AsyncServerExchangeHandler get() {
+                        return new EchoHandler(10 * 1024 * 1024);
+                    }
+
+                })
+                .create();
+        }
+
+        @Override
+        protected void after() {
+            log.debug("Shutting down test server");
+            if (server != null) {
+                try {
+                    server.close(CloseMode.GRACEFUL);
+                    final List<ExceptionEvent> exceptionLog = server.getExceptionLog();
+                    server = null;
+                    if (!exceptionLog.isEmpty()) {
+                        for (final ExceptionEvent event: exceptionLog) {
+                            final Throwable cause = event.getCause();
+                            log.error("Unexpected " + cause.getClass() + " at " + event.getTimestamp(), cause);
+                        }
+                    }
+                } catch (final Exception ignore) {
+                }
+            }
+        }
+
+    };
+
+    private HttpAsyncRequester requester;
+
+    @Rule
+    public ExternalResource clientResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            log.debug("Starting up test client");
+            requester = H2RequesterBootstrap.bootstrap()
+                .setVersionPolicy(versionPolicy)
+                .setIOReactorConfig(IOReactorConfig.custom()
+                    .setSoTimeout(TIMEOUT)
+                    .build())
+                .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
+                .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
+                .setStreamListener(LoggingHttp2StreamListener.INSTANCE)
+                .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
+                .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
+                .create();
+        }
+
+        @Override
+        protected void after() {
+            log.debug("Shutting down test client");
+            if (requester != null) {
+                try {
+                    requester.close(CloseMode.GRACEFUL);
+                    final List<ExceptionEvent> exceptionLog = requester.getExceptionLog();
+                    requester = null;
+                    if (!exceptionLog.isEmpty()) {
+                        for (final ExceptionEvent event: exceptionLog) {
+                            final Throwable cause = event.getCause();
+                            log.error("Unexpected " + cause.getClass() + " at " + event.getTimestamp(), cause);
+                        }
+                    }
+                } catch (final Exception ignore) {
+                }
+            }
+        }
+
+    };
+
+    @Test(timeout = 5_000)
+    public void testSimpleRequest() throws Exception {
+        final InetSocketAddress address = startClientAndServer();
+        final byte[] input = new byte[1024];
+        RANDOM.nextBytes(input);
+        final Publisher<ByteBuffer> publisher = Flowable.just(ByteBuffer.wrap(input));
+        final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, input.length, null, null);
+
+        final BasicRequestProducer request = getRequestProducer(address, producer);
+
+        final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+        requester.execute(request, consumer, Timeout.ofSeconds(2), null);
+
+        final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
+
+        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        final WritableByteChannel writableByteChannel = Channels.newChannel(byteArrayOutputStream);
+        for (final ByteBuffer byteBuffer : Observable.fromPublisher(response.getBody()).toList().blockingGet()) {
+            writableByteChannel.write(byteBuffer);
+        }
+        writableByteChannel.close();
+        final byte[] output = byteArrayOutputStream.toByteArray();
+        Assert.assertArrayEquals(input, output);
+    }
+
+    private BasicRequestProducer getRequestProducer(final InetSocketAddress address, final ReactiveEntityProducer producer) {
+        return new BasicRequestProducer("POST",
+            URI.create("http://localhost:" + address.getPort()), producer);
+    }
+
+    @Test(timeout = 20_000)
+    public void testLongRunningRequest() throws Exception {
+        final InetSocketAddress address = startClientAndServer();
+        final AtomicLong requestLength = new AtomicLong(0L);
+        final AtomicReference<MessageDigest> requestDigest = new AtomicReference<>(newDigest());
+        final Publisher<ByteBuffer> publisher = Flowable.rangeLong(1, 500)
+            .map(new Function<Long, ByteBuffer>() {
+                @Override
+                public ByteBuffer apply(final Long seed) {
+                    final Random random = new Random(seed);
+                    final byte[] bytes = new byte[1 + random.nextInt(32 * 1024)];
+                    requestLength.addAndGet(bytes.length);
+                    random.nextBytes(bytes);
+                    return ByteBuffer.wrap(bytes);
+                }
+            })
+            .doOnNext(new Consumer<ByteBuffer>() {
+                @Override
+                public void accept(final ByteBuffer byteBuffer) {
+                    requestDigest.get().update(byteBuffer.duplicate());
+                }
+            });
+        final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+        final BasicRequestProducer request = getRequestProducer(address, producer);
+
+        final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+        requester.execute(request, consumer, Timeout.ofSeconds(2), null);
+        final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
+
+        final AtomicLong responseLength = new AtomicLong(0);
+        final AtomicReference<MessageDigest> responseDigest = new AtomicReference<>(newDigest());
+        Flowable.fromPublisher(response.getBody())
+            .blockingForEach(new Consumer<ByteBuffer>() {
+                @Override
+                public void accept(final ByteBuffer byteBuffer) {
+                    responseLength.addAndGet(byteBuffer.remaining());
+                    responseDigest.get().update(byteBuffer);
+                }
+            });
+        Assert.assertEquals(requestLength.get(), responseLength.get());
+        Assert.assertArrayEquals(requestDigest.get().digest(), responseDigest.get().digest());
+    }
+
+    @Test(timeout = 5_000)
+    public void testRequestError() throws Exception {
+        final InetSocketAddress address = startClientAndServer();
+        final RuntimeException exceptionThrown = new RuntimeException("Test");
+        final Publisher<ByteBuffer> publisher = Flowable.error(exceptionThrown);
+        final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, 100, null, null);
+
+        final BasicRequestProducer request = getRequestProducer(address, producer);
+
+        final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+
+        final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
+
+        try {
+            future.get();
+            Assert.fail("Expected exception");
+        } catch (final ExecutionException ex) {
+            Assert.assertTrue(ex.getCause() instanceof H2StreamResetException);
+            Assert.assertSame(exceptionThrown, ex.getCause().getCause());
+        }
+    }
+
+    @Test(timeout = 5_000)
+    public void testRequestTimeout() throws Exception {
+        final InetSocketAddress address = startClientAndServer();
+        final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
+        final Publisher<ByteBuffer> publisher = Flowable.<ByteBuffer>never()
+            .doOnCancel(new Action() {
+                @Override
+                public void run() {
+                    requestPublisherWasCancelled.set(true);
+                }
+            });
+        final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+        final BasicRequestProducer request = getRequestProducer(address, producer);
+
+        final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+        final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
+
+        try {
+            future.get();
+        } catch (final ExecutionException ex) {
+            Assert.assertTrue(requestPublisherWasCancelled.get());
+            final Throwable cause = ex.getCause();
+            if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_1) {
+                Assert.assertTrue("Expected SocketTimeoutException, but got " + cause.getClass().getName(),
+                    cause instanceof SocketTimeoutException);
+            } else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
+                Assert.assertTrue(String.format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()),
+                    cause instanceof H2StreamResetException);
+                Assert.assertEquals(H2Error.NO_ERROR.getCode(), ((H2StreamResetException) cause).getCode());
+            } else {
+                Assert.fail("Unknown HttpVersionPolicy: " + versionPolicy);
+            }
+        }
+    }
+
+    @Test(timeout = 5_000)
+    public void testResponseCancellation() throws Exception {
+        final InetSocketAddress address = startClientAndServer();
+        final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
+        final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
+        final Publisher<ByteBuffer> publisher = Flowable.rangeLong(Long.MIN_VALUE, Long.MAX_VALUE)
+            .map(new Function<Long, ByteBuffer>() {
+                @Override
+                public ByteBuffer apply(final Long seed) throws Exception {
+                    final Random random = new Random(seed);
+                    final byte[] bytes = new byte[1 + random.nextInt(1024)];
+                    random.nextBytes(bytes);
+                    return ByteBuffer.wrap(bytes);
+                }
+            })
+            .doOnCancel(new Action() {
+                @Override
+                public void run() throws Exception {
+                    requestPublisherWasCancelled.set(true);
+                }
+            })
+            .doOnError(new Consumer<Throwable>() {
+                @Override
+                public void accept(final Throwable throwable) throws Exception {
+                    requestStreamError.set(throwable);
+                }
+            });
+        final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+        final BasicRequestProducer request = getRequestProducer(address, producer);
+
+        final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+        final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
+        final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
+
+        final AtomicBoolean responsePublisherWasCancelled = new AtomicBoolean(false);
+        final List<ByteBuffer> outputBuffers = Flowable.fromPublisher(response.getBody())
+            .doOnCancel(new Action() {
+                @Override
+                public void run() throws Exception {
+                    responsePublisherWasCancelled.set(true);
+                }
+            })
+            .take(3)
+            .toList()
+            .blockingGet();
+        Assert.assertEquals(3, outputBuffers.size());
+        Assert.assertTrue("The response subscription should have been cancelled", responsePublisherWasCancelled.get());
+        try {
+            future.get();
+            Assert.fail("Expected exception");
+        } catch (final ExecutionException | CancellationException ex) {
+            Assert.assertTrue(ex.getCause() instanceof H2StreamResetException);
+            Assert.assertTrue(requestPublisherWasCancelled.get());
+            Assert.assertNull(requestStreamError.get());
+        }
+    }
+
+    private static MessageDigest newDigest() throws NoSuchAlgorithmException {
+        return MessageDigest.getInstance("MD5");
+    }
+
+    private InetSocketAddress startClientAndServer() throws InterruptedException, ExecutionException {
+        server.start();
+        final ListenerEndpoint listener = server.listen(new InetSocketAddress(0)).get();
+        final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+        requester.start();
+        return address;
+    }
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 80b7a3a..76e771f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
   <modules>
     <module>httpcore5</module>
     <module>httpcore5-h2</module>
+    <module>httpcore5-reactive</module>
     <module>httpcore5-osgi</module>
     <module>httpcore5-testing</module>
   </modules>
@@ -258,7 +259,7 @@
             <exclude>**/.checkstyle</exclude>
             <exclude>**/.pmd</exclude>
             <exclude>**/*.iml</exclude>
-            <exclude>**/.externalToolBuilders/**</exclude>         
+            <exclude>**/.externalToolBuilders/**</exclude>
             <exclude>maven-eclipse.xml</exclude>
             <exclude>src/docbkx/resources/**</exclude>
             <exclude>src/test/resources/*.truststore</exclude>