You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/08/31 15:12:15 UTC
httpcomponents-core git commit: HTTPCLIENT-1942: Add support for
Reactive Streams
Repository: httpcomponents-core
Updated Branches:
refs/heads/reactive [created] d3f7608c8
HTTPCLIENT-1942: Add support for Reactive Streams
This commit adds initial support for the Reactive Streams specification
[1]. The main part of this change is a pair of non-public classes, the
ReactiveDataProducer and ReactiveDataConsumer, which are reactive
adapters for Apache's AsyncDataProducer and AsyncDataConsumer
interfaces. Two public classes are built on top of these types:
* ReactiveEntityProducer: An AsyncEntityProducer implementation backed
by ReactiveDataProducer that allows a Publisher<ByteBuffer> to be
streamed as a request body
* ReactiveResponseConsumer: An AsyncResponseConsumer that exposes a
special callback that provides a view of the streaming HTTP response
as a Message<HttpResponse, Publisher<ByteBuffer>>, allowing the body
to be consumed by a Subscriber<ByteBuffer>
[1] http://www.reactive-streams.org/
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/d3f7608c
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/d3f7608c
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/d3f7608c
Branch: refs/heads/reactive
Commit: d3f7608c812bf7d0f9538b2234ce57624e45f3b3
Parents: 82eb044
Author: Ryan Schmitt <ry...@amazon.com>
Authored: Fri Aug 17 14:16:04 2018 -0700
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Fri Aug 31 17:03:58 2018 +0200
----------------------------------------------------------------------
httpcore5-reactive/pom.xml | 94 +++++
.../hc/core5/reactive/ReactiveDataConsumer.java | 177 ++++++++
.../hc/core5/reactive/ReactiveDataProducer.java | 170 ++++++++
.../core5/reactive/ReactiveEntityProducer.java | 123 ++++++
.../reactive/ReactiveResponseConsumer.java | 176 ++++++++
.../core5/reactive/BasicDataStreamChannel.java | 77 ++++
.../reactive/TestReactiveDataConsumer.java | 208 ++++++++++
.../reactive/TestReactiveDataProducer.java | 89 +++++
.../core5/reactive/WritableByteChannelMock.java | 125 ++++++
httpcore5-testing/pom.xml | 12 +
.../testing/reactive/ReactiveClientTest.java | 400 +++++++++++++++++++
pom.xml | 3 +-
12 files changed, 1653 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/pom.xml
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/pom.xml b/httpcore5-reactive/pom.xml
new file mode 100644
index 0000000..7babffb
--- /dev/null
+++ b/httpcore5-reactive/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ====================================================================
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ ====================================================================
+
+ This software consists of voluntary contributions made by many
+ individuals on behalf of the Apache Software Foundation. For more
+ information on the Apache Software Foundation, please see
+ <http://www.apache.org />.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>httpcore5-parent</artifactId>
+ <groupId>org.apache.httpcomponents.core5</groupId>
+ <version>5.0-beta4-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>httpcore5-reactive</artifactId>
+ <name>Apache HttpComponents Core Reactive Extensions</name>
+ <description>Apache HttpComponents Reactive Streams Bindings</description>
+ <url>http://hc.apache.org/httpcomponents-core-ga</url>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.httpcomponents.core5</groupId>
+ <artifactId>httpcore5</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ <version>1.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>2.1.9</version>
+ <scope>test</scope>
+ </dependency>
+    <dependency>
+      <!-- Sibling module of this build: track the reactor version, as the httpcore5 dependency does -->
+      <groupId>org.apache.httpcomponents.core5</groupId>
+      <artifactId>httpcore5-h2</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+ </dependencies>
+
+ <reporting>
+ <plugins>
+
+ <plugin>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <inherited>false</inherited>
+ <reportSets>
+ <reportSet>
+ <reports>
+ <report>dependencies</report>
+ <report>dependency-info</report>
+ <report>summary</report>
+ </reports>
+ </reportSet>
+ </reportSets>
+ </plugin>
+
+ </plugins>
+ </reporting>
+
+</project>
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
new file mode 100644
index 0000000..9789a43
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
@@ -0,0 +1,177 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http2.H2Error;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.apache.hc.core5.util.Args;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An asynchronous data consumer that supports Reactive Streams.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuffer> {
+
+ static final int MAX_BUFFER = 1024 * 1024;
+
+ private final AtomicLong requests = new AtomicLong(0);
+ private final AtomicInteger remainingBufferSpace = new AtomicInteger(MAX_BUFFER);
+
+ private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<>();
+ private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+ private volatile boolean cancelled = false;
+ private volatile boolean completed = false;
+ private volatile Exception exception;
+ private volatile CapacityChannel capacityChannel;
+ private volatile Subscriber<? super ByteBuffer> subscriber;
+
+ public void failed(final Exception cause) {
+ exception = cause;
+ flushToSubscriber();
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ throwIfCancelled();
+ this.capacityChannel = capacityChannel;
+ }
+
+ private void throwIfCancelled() throws IOException {
+ if (cancelled) {
+ throw new H2StreamResetException(H2Error.NO_ERROR,
+ "Downstream subscriber to ReactiveDataConsumer cancelled");
+ }
+ }
+
+ @Override
+ public int consume(final ByteBuffer byteBuffer) throws IOException {
+ if (completed) {
+ throw new IllegalStateException("Received data past end of stream");
+ }
+ throwIfCancelled();
+
+ final byte[] copy = new byte[byteBuffer.remaining()];
+ byteBuffer.get(copy);
+ remainingBufferSpace.addAndGet(-copy.length);
+ buffers.add(ByteBuffer.wrap(copy));
+
+ flushToSubscriber();
+ return remainingBufferSpace.get();
+ }
+
+ @Override
+ public void streamEnd(final List<? extends Header> trailers) {
+ completed = true;
+ flushToSubscriber();
+ }
+
+ @Override
+ public void releaseResources() {
+ this.capacityChannel = null;
+ }
+
+    /**
+     * Drains buffered data to the current subscriber for as long as there is outstanding
+     * demand, reports the freed bytes to the capacity channel, and then delivers a terminal
+     * signal if one is due. Only one caller flushes at a time: a caller that finds
+     * {@code flushInProgress} already set returns immediately.
+     */
+    private void flushToSubscriber() {
+        final Subscriber<? super ByteBuffer> s = subscriber;
+        if (flushInProgress.getAndSet(true)) {
+            return;
+        }
+        try {
+            if (s == null) {
+                return;
+            }
+            if (exception != null) {
+                subscriber = null;
+                s.onError(exception);
+                return;
+            }
+            int windowScalingIncrement = 0;
+            ByteBuffer next;
+            while (requests.get() > 0 && ((next = buffers.poll()) != null)) {
+                final int bytesFreed = next.remaining();
+                remainingBufferSpace.addAndGet(bytesFreed);
+                s.onNext(next);
+                requests.decrementAndGet();
+                windowScalingIncrement += bytesFreed;
+            }
+            // Read the volatile field once: releaseResources() may null it concurrently.
+            final CapacityChannel channel = capacityChannel;
+            if (channel != null && windowScalingIncrement > 0) {
+                try {
+                    channel.update(windowScalingIncrement);
+                } catch (final IOException ex) {
+                    // Signal the error directly. Calling failed(ex) here would re-enter this
+                    // method, hit the flushInProgress guard held by this very call, and return
+                    // without notifying the subscriber until some later flush happened.
+                    exception = ex;
+                    subscriber = null;
+                    s.onError(ex);
+                    return;
+                }
+            }
+            if (completed && buffers.isEmpty()) {
+                subscriber = null;
+                s.onComplete();
+            }
+        } finally {
+            flushInProgress.set(false);
+        }
+    }
+
+    /**
+     * Registers the subscriber and hands it a {@link Subscription} through which it signals
+     * demand or cancels.
+     *
+     * @param subscriber the downstream subscriber; must not be {@code null}.
+     */
+    @Override
+    public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
+        this.subscriber = Args.notNull(subscriber, "subscriber");
+        subscriber.onSubscribe(new Subscription() {
+            @Override
+            public void request(final long increment) {
+                if (increment <= 0) {
+                    failed(new IllegalArgumentException("The number of elements requested must be strictly positive"));
+                    return;
+                }
+                // Saturate at Long.MAX_VALUE instead of letting the counter wrap negative:
+                // a negative value would make the "requests.get() > 0" check in
+                // flushToSubscriber() fail and stall the stream.
+                long current;
+                long updated;
+                do {
+                    current = requests.get();
+                    updated = current + increment;
+                    if (updated < 0) {
+                        updated = Long.MAX_VALUE;
+                    }
+                } while (!requests.compareAndSet(current, updated));
+                flushToSubscriber();
+            }
+
+            @Override
+            public void cancel() {
+                ReactiveDataConsumer.this.cancelled = true;
+                ReactiveDataConsumer.this.subscriber = null;
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
new file mode 100644
index 0000000..33d7773
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
@@ -0,0 +1,170 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.AsyncDataProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http2.H2Error;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.apache.hc.core5.util.Args;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * An asynchronous data producer that supports Reactive Streams.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBuffer> {
+
+ private static final int BUFFER_WINDOW_SIZE = 5;
+
+ private final AtomicReference<DataStreamChannel> requestChannel = new AtomicReference<>();
+ private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
+ private final AtomicBoolean complete = new AtomicBoolean(false);
+ private final Publisher<ByteBuffer> publisher;
+ private final AtomicReference<Subscription> subscription = new AtomicReference<>(null);
+ private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization
+
+ public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
+ this.publisher = Args.notNull(publisher, "publisher");
+ }
+
+    /**
+     * Accepts the subscription and requests the initial window of {@code BUFFER_WINDOW_SIZE}
+     * buffers.
+     *
+     * @param subscription the subscription offered by the publisher.
+     * @throws IllegalStateException if a subscription is already held; the redundant
+     *         subscription is cancelled and the one already held is left in place.
+     */
+    @Override
+    public void onSubscribe(final Subscription subscription) {
+        // compareAndSet rather than getAndSet, so a redundant onSubscribe cannot displace
+        // the subscription that is already held.
+        if (!this.subscription.compareAndSet(null, subscription)) {
+            subscription.cancel();
+            throw new IllegalStateException("Already subscribed");
+        }
+
+        subscription.request(BUFFER_WINDOW_SIZE);
+    }
+
+ @Override
+ public void onNext(final ByteBuffer byteBuffer) {
+ final byte[] copy = new byte[byteBuffer.remaining()];
+ byteBuffer.get(copy);
+ synchronized (buffers) {
+ buffers.add(ByteBuffer.wrap(copy));
+ }
+ signalReadiness();
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ subscription.set(null);
+ exception.set(throwable);
+ signalReadiness();
+ }
+
+ @Override
+ public void onComplete() {
+ subscription.set(null);
+ complete.set(true);
+ signalReadiness();
+ }
+
+ private void signalReadiness() {
+ final DataStreamChannel channel = requestChannel.get();
+ if (channel == null) {
+ throw new IllegalStateException("Output channel is not set");
+ }
+ channel.requestOutput();
+ }
+
+ @Override
+ public int available() {
+ if (exception.get() != null || complete.get()) {
+ return 1;
+ } else {
+ synchronized (buffers) {
+ int sum = 0;
+ for (final ByteBuffer buffer : buffers) {
+ sum += buffer.remaining();
+ }
+ return sum;
+ }
+ }
+ }
+
+    /**
+     * Writes buffered data to the channel. The first call stores the channel and subscribes
+     * to the publisher. Later calls write queued buffers until the channel stops accepting
+     * data, end the stream once the publisher has completed and the queue is empty, or
+     * throw if the publisher reported an error.
+     *
+     * @param channel the channel to write to.
+     * @throws IOException if the publisher signalled an error (as an
+     *         {@link H2StreamResetException} with the error as its cause) or the channel
+     *         write fails.
+     */
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        // Atomic first-call initialisation: only the caller that installs the channel
+        // subscribes, so the publisher is never subscribed to twice.
+        if (requestChannel.compareAndSet(null, channel)) {
+            publisher.subscribe(this);
+        }
+
+        final Throwable t = exception.get();
+        final Subscription s = subscription.get();
+        int buffersToReplenish = 0;
+        try {
+            synchronized (buffers) {
+                if (t != null) {
+                    final H2StreamResetException ex = new H2StreamResetException(H2Error.NO_ERROR,
+                            "Request publisher threw an exception");
+                    ex.initCause(t);
+                    throw ex;
+                } else if (this.complete.get() && buffers.isEmpty()) {
+                    channel.endStream();
+                } else {
+                    while (buffers.size() > 0) {
+                        final ByteBuffer nextBuffer = buffers.remove();
+                        channel.write(nextBuffer);
+                        if (nextBuffer.remaining() > 0) {
+                            // Partial write: put the remainder back at the head and stop.
+                            buffers.push(nextBuffer);
+                            break;
+                        } else if (s != null) {
+                            // We defer the #request call until after we release the buffer lock.
+                            buffersToReplenish++;
+                        }
+                    }
+                }
+            }
+        } finally {
+            if (s != null && buffersToReplenish > 0) {
+                s.request(buffersToReplenish);
+            }
+        }
+    }
+
+ @Override
+ public void releaseResources() {
+ final Subscription s = subscription.getAndSet(null);
+ if (s != null) {
+ s.cancel();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveEntityProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveEntityProducer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveEntityProducer.java
new file mode 100644
index 0000000..2fc4a98
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveEntityProducer.java
@@ -0,0 +1,123 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.reactivestreams.Publisher;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+/**
+ * An {@link AsyncEntityProducer} that subscribes to a {@code Publisher}
+ * instance, as defined by the Reactive Streams specification.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class ReactiveEntityProducer implements AsyncEntityProducer {
+
+ private final ReactiveDataProducer reactiveDataProducer;
+
+ private final long contentLength;
+ private final String contentType;
+ private final String contentEncoding;
+
+ /**
+ * Creates a new {@code ReactiveEntityProducer} with the given parameters.
+ *
+ * @param publisher the publisher of the entity stream.
+ * @param contentLength the length of the entity, or -1 if unknown (implies chunked encoding).
+ * @param contentType the {@code Content-Type} of the entity, or null if none.
+ * @param contentEncoding the {@code Content-Encoding} of the entity, or null if none.
+ */
+ public ReactiveEntityProducer(
+ final Publisher<ByteBuffer> publisher,
+ final long contentLength,
+ final String contentType,
+ final String contentEncoding
+ ) {
+ this.reactiveDataProducer = new ReactiveDataProducer(publisher);
+ this.contentLength = contentLength;
+ this.contentType = contentType;
+ this.contentEncoding = contentEncoding;
+ }
+
+ @Override
+ public int available() {
+ return reactiveDataProducer.available();
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ reactiveDataProducer.produce(channel);
+ }
+
+ @Override
+ public void releaseResources() {
+ reactiveDataProducer.releaseResources();
+ }
+
+ @Override
+ public boolean isRepeatable() {
+ return false;
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ releaseResources();
+ }
+
+ @Override
+ public long getContentLength() {
+ return contentLength;
+ }
+
+ @Override
+ public String getContentType() {
+ return contentType;
+ }
+
+ @Override
+ public String getContentEncoding() {
+ return contentEncoding;
+ }
+
+ @Override
+ public boolean isChunked() {
+ return contentLength == -1;
+ }
+
+ @Override
+ public Set<String> getTrailerNames() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
new file mode 100644
index 0000000..d0d746f
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java
@@ -0,0 +1,176 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.util.Args;
+import org.reactivestreams.Publisher;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+
+/**
+ * An {@link AsyncResponseConsumer} that publishes the response body through
+ * a {@link Publisher}, as defined by the Reactive Streams specification. The
+ * response is represented as a {@link Message} consisting of a {@link
+ * HttpResponse} representing the headers and a {@link Publisher} representing
+ * the response body as an asynchronous stream of {@link ByteBuffer} instances.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.SAFE)
+public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Void> {
+
+ private final ReactiveDataConsumer reactiveDataConsumer = new ReactiveDataConsumer();
+ private final List<Header> trailers = Collections.synchronizedList(new ArrayList<Header>());
+ private final BasicFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseFuture;
+
+ private volatile BasicFuture<Void> responseCompletion;
+ private volatile HttpResponse informationResponse;
+ private volatile EntityDetails entityDetails;
+
+ /**
+ * Creates a {@code ReactiveResponseConsumer}.
+ */
+ public ReactiveResponseConsumer() {
+ this.responseFuture = new BasicFuture<>(null);
+ }
+
+ /**
+ * Creates a {@code ReactiveResponseConsumer} that will call back the supplied {@link FutureCallback} with a
+ * streamable response.
+ *
+ * @param responseCallback the callback to invoke when the response is available for consumption.
+ */
+ public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publisher<ByteBuffer>>> responseCallback) {
+ this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
+ }
+
+ public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
+ return responseFuture;
+ }
+
+ /**
+ * Returns the intermediate (1xx) HTTP response if one was received.
+ *
+ * @return the information response, or {@code null} if none.
+ */
+ public HttpResponse getInformationResponse() {
+ return informationResponse;
+ }
+
+ /**
+ * Returns the response entity details.
+ *
+ * @return the entity details, or {@code null} if none.
+ */
+ public EntityDetails getEntityDetails() {
+ return entityDetails;
+ }
+
+ /**
+ * Returns the trailers received at the end of the response.
+ *
+ * @return a non-null list of zero or more trailers.
+ */
+ public List<Header> getTrailers() {
+ return trailers;
+ }
+
+ @Override
+ public void consumeResponse(
+ final HttpResponse response,
+ final EntityDetails entityDetails,
+ final HttpContext httpContext,
+ final FutureCallback<Void> resultCallback
+ ) {
+ this.entityDetails = entityDetails;
+ this.responseCompletion = new BasicFuture<>(resultCallback);
+ this.responseFuture.completed(new Message<HttpResponse, Publisher<ByteBuffer>>(response, reactiveDataConsumer));
+ }
+
+ @Override
+ public void informationResponse(final HttpResponse response, final HttpContext httpContext) {
+ this.informationResponse = response;
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ reactiveDataConsumer.failed(cause);
+ responseFuture.failed(cause);
+ if (responseCompletion != null) {
+ responseCompletion.failed(cause);
+ }
+ }
+
+ @Override
+ public Void getResult() {
+ return null;
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ reactiveDataConsumer.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ public int consume(final ByteBuffer src) throws IOException {
+ return reactiveDataConsumer.consume(src);
+ }
+
+    /**
+     * Records any trailers, signals end-of-stream to the body publisher and completes the
+     * result future created in {@code consumeResponse}.
+     *
+     * @param trailers the trailers received with the end of the stream, or {@code null}.
+     */
+    @Override
+    public void streamEnd(final List<? extends Header> trailers) {
+        if (trailers != null) {
+            this.trailers.addAll(trailers);
+        }
+        reactiveDataConsumer.streamEnd(trailers);
+        // responseCompletion is only assigned in consumeResponse(); guard against null the
+        // same way failed() and releaseResources() do.
+        final BasicFuture<Void> completion = responseCompletion;
+        if (completion != null) {
+            completion.completed(null);
+        }
+    }
+
+ @Override
+ public void releaseResources() {
+ reactiveDataConsumer.releaseResources();
+ responseFuture.cancel();
+ if (responseCompletion != null) {
+ responseCompletion.cancel();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/BasicDataStreamChannel.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/BasicDataStreamChannel.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/BasicDataStreamChannel.java
new file mode 100644
index 0000000..6510f04
--- /dev/null
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/BasicDataStreamChannel.java
@@ -0,0 +1,77 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BasicDataStreamChannel implements DataStreamChannel {
+
+ private final WritableByteChannel byteChannel;
+ private List<Header> trailers;
+
+ public BasicDataStreamChannel(final WritableByteChannel byteChannel) {
+ this.byteChannel = byteChannel;
+ }
+
+ @Override
+ public void requestOutput() {
+ }
+
+ @Override
+ public int write(final ByteBuffer src) throws IOException {
+ return byteChannel.write(src);
+ }
+
+ @Override
+ public void endStream() throws IOException {
+ if (byteChannel.isOpen()) {
+ byteChannel.close();
+ }
+ }
+
+ @Override
+ public void endStream(final List<? extends Header> trailers) throws IOException {
+ endStream();
+ if (trailers != null) {
+ this.trailers = new ArrayList<>();
+ this.trailers.addAll(trailers);
+ }
+ }
+
+ public List<Header> getTrailers() {
+ return trailers;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
new file mode 100644
index 0000000..bd380fe
--- /dev/null
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
@@ -0,0 +1,208 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import io.reactivex.Flowable;
+import io.reactivex.Notification;
+import io.reactivex.Observable;
+import io.reactivex.Single;
+import io.reactivex.functions.Consumer;
+
+public class TestReactiveDataConsumer {
+ /**
+  * Pushes three single-byte buffers followed by end-of-stream into the consumer and
+  * checks that the subscriber receives them in order and then sees completion.
+  */
+ @Test
+ public void testStreamThatEndsNormally() throws Exception {
+     final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+
+     final CountDownLatch finished = new CountDownLatch(1);
+     final List<ByteBuffer> received = Collections.synchronizedList(new ArrayList<ByteBuffer>());
+
+     Observable.fromPublisher(consumer)
+         .materialize()
+         .forEach(new Consumer<Notification<ByteBuffer>>() {
+             @Override
+             public void accept(final Notification<ByteBuffer> notification) throws Exception {
+                 if (notification.isOnNext()) {
+                     received.add(notification.getValue());
+                 } else if (notification.isOnComplete()) {
+                     finished.countDown();
+                 } else {
+                     // An error notification is not expected in this scenario.
+                     throw new IllegalArgumentException();
+                 }
+             }
+         });
+
+     for (final char digit : new char[]{ '1', '2', '3' }) {
+         consumer.consume(ByteBuffer.wrap(new byte[]{ (byte) digit }));
+     }
+     consumer.streamEnd(null);
+
+     Assert.assertTrue("Stream did not finish before timeout", finished.await(1, TimeUnit.SECONDS));
+     Assert.assertEquals(3, received.size());
+     for (int i = 0; i < 3; i++) {
+         Assert.assertEquals(ByteBuffer.wrap(new byte[]{ (byte) ('1' + i) }), received.get(i));
+     }
+ }
+
+ /**
+  * Fails the consumer before any data arrives and checks that the very first
+  * notification seen by the subscriber carries the same exception instance.
+  */
+ @Test
+ public void testStreamThatEndsWithError() {
+     final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+     final Single<List<Notification<ByteBuffer>>> notifications = Observable.fromPublisher(consumer)
+         .materialize()
+         .toList();
+
+     final Exception failure = new RuntimeException();
+     consumer.failed(failure);
+
+     final Notification<ByteBuffer> first = notifications.blockingGet().get(0);
+     Assert.assertSame(failure, first.getError());
+ }
+
+ /**
+  * Cancels the subscription as soon as it is handed out; the following
+  * {@code consume} call is expected to throw {@link H2StreamResetException}.
+  */
+ @Test(expected = H2StreamResetException.class)
+ public void testCancellation() throws Exception {
+     final Subscriber<ByteBuffer> cancellingSubscriber = new Subscriber<ByteBuffer>() {
+         @Override
+         public void onSubscribe(final Subscription subscription) {
+             subscription.cancel();
+         }
+
+         @Override
+         public void onNext(final ByteBuffer byteBuffer) {
+         }
+
+         @Override
+         public void onError(final Throwable throwable) {
+         }
+
+         @Override
+         public void onComplete() {
+         }
+     };
+
+     final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+     consumer.subscribe(cancellingSubscriber);
+
+     consumer.consume(ByteBuffer.wrap(new byte[1024]));
+ }
+
+ /**
+  * Buffers four 1 KiB chunks without any demand, then requests items and checks
+  * the increment reported to the {@link CapacityChannel} after each request.
+  */
+ @Test
+ public void testCapacityIncrements() throws Exception {
+     final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+     final ByteBuffer chunk = ByteBuffer.wrap(new byte[1024]);
+
+     // -1 is a sentinel meaning "update() has not been called".
+     final AtomicInteger lastIncrement = new AtomicInteger(-1);
+     consumer.updateCapacity(new CapacityChannel() {
+         @Override
+         public void update(final int increment) {
+             lastIncrement.set(increment);
+         }
+     });
+     Assert.assertEquals("CapacityChannel#update should not have been invoked yet", -1, lastIncrement.get());
+
+     final AtomicInteger received = new AtomicInteger(0);
+     final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
+     consumer.subscribe(new Subscriber<ByteBuffer>() {
+         @Override
+         public void onSubscribe(final Subscription s) {
+             subscriptionRef.set(s);
+         }
+
+         @Override
+         public void onNext(final ByteBuffer byteBuffer) {
+             received.incrementAndGet();
+         }
+
+         @Override
+         public void onError(final Throwable throwable) {
+         }
+
+         @Override
+         public void onComplete() {
+         }
+     });
+
+     // Each buffered chunk shrinks the capacity returned by consume() by 1 KiB.
+     for (int consumed = 1; consumed <= 4; consumed++) {
+         Assert.assertEquals(ReactiveDataConsumer.MAX_BUFFER - (consumed * 1024), consumer.consume(chunk.duplicate()));
+     }
+
+     final Subscription subscription = subscriptionRef.get();
+
+     subscription.request(1);
+     Assert.assertEquals(1024, lastIncrement.get());
+
+     subscription.request(2);
+     Assert.assertEquals(2 * 1024, lastIncrement.get());
+
+     subscription.request(99);
+     Assert.assertEquals(1024, lastIncrement.get());
+ }
+
+ /**
+  * Buffers three chunks and the end-of-stream signal before anyone subscribes,
+  * then checks that a late subscriber still receives all three items.
+  */
+ @Test
+ public void testFullResponseBuffering() throws Exception {
+     // Due to inherent race conditions, it is possible for the entire response to be buffered and completed
+     // before the Subscriber shows up. This must be handled correctly.
+     final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+     final ByteBuffer data = ByteBuffer.wrap(new byte[1024]);
+
+     consumer.consume(data.duplicate());
+     consumer.consume(data.duplicate());
+     consumer.consume(data.duplicate());
+     consumer.streamEnd(null);
+
+     // JUnit's contract is assertEquals(expected, actual); keep that order so failure messages read correctly.
+     Assert.assertEquals(3L, Flowable.fromPublisher(consumer).count().blockingGet().longValue());
+ }
+
+ /**
+  * Buffers three chunks and then a failure before anyone subscribes; a late
+  * subscriber is expected to see exactly one notification, carrying that failure.
+  */
+ @Test
+ public void testErrorBuffering() throws Exception {
+     final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
+     final ByteBuffer chunk = ByteBuffer.wrap(new byte[1024]);
+     final RuntimeException failure = new RuntimeException();
+
+     for (int i = 0; i < 3; i++) {
+         consumer.consume(chunk.duplicate());
+     }
+     consumer.failed(failure);
+
+     final Notification<ByteBuffer> onlyNotification = Flowable.fromPublisher(consumer)
+         .materialize()
+         .singleOrError()
+         .blockingGet();
+     Assert.assertSame(failure, onlyNotification.getError());
+ }
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
new file mode 100644
index 0000000..1117439
--- /dev/null
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
@@ -0,0 +1,89 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.reactivex.Flowable;
+
+public class TestReactiveDataProducer {
+ /**
+  * Publishes two buffers and checks that the first {@code produce} call writes
+  * both of them while the second one closes the channel without writing more.
+  */
+ @Test
+ public void testStreamThatEndsNormally() throws Exception {
+     final ByteBuffer firstChunk = ByteBuffer.wrap(new byte[]{ '1', '2', '3' });
+     final ByteBuffer secondChunk = ByteBuffer.wrap(new byte[]{ '4', '5', '6' });
+     final Flowable<ByteBuffer> source = Flowable.just(firstChunk, secondChunk);
+     final ReactiveDataProducer producer = new ReactiveDataProducer(source);
+
+     final WritableByteChannelMock sink = new WritableByteChannelMock(1024);
+     final DataStreamChannel streamChannel = new BasicDataStreamChannel(sink);
+
+     // First pass: all published data is written, the stream stays open.
+     producer.produce(streamChannel);
+     Assert.assertTrue(sink.isOpen());
+     Assert.assertEquals("123456", sink.dump(StandardCharsets.US_ASCII));
+
+     // Second pass: nothing left to write, the stream gets ended.
+     producer.produce(streamChannel);
+     Assert.assertFalse(sink.isOpen());
+     Assert.assertEquals("", sink.dump(StandardCharsets.US_ASCII));
+ }
+
+ /**
+  * Publishes six single-byte buffers followed by an error. The first
+  * {@code produce} call is expected to write "12345"; the second one must throw
+  * {@link H2StreamResetException} wrapping the published exception and write nothing more.
+  */
+ @Test
+ public void testStreamThatEndsWithError() throws Exception {
+     final Flowable<ByteBuffer> publisher = Flowable.concatArray(
+         Flowable.just(
+             ByteBuffer.wrap(new byte[]{ '1' }),
+             ByteBuffer.wrap(new byte[]{ '2' }),
+             ByteBuffer.wrap(new byte[]{ '3' }),
+             ByteBuffer.wrap(new byte[]{ '4' }),
+             ByteBuffer.wrap(new byte[]{ '5' }),
+             ByteBuffer.wrap(new byte[]{ '6' })),
+         Flowable.<ByteBuffer>error(new RuntimeException())
+     );
+     final ReactiveDataProducer producer = new ReactiveDataProducer(publisher);
+
+     final WritableByteChannelMock byteChannel = new WritableByteChannelMock(1024);
+     final DataStreamChannel streamChannel = new BasicDataStreamChannel(byteChannel);
+
+     producer.produce(streamChannel);
+     Assert.assertEquals("12345", byteChannel.dump(StandardCharsets.US_ASCII));
+
+     try {
+         producer.produce(streamChannel);
+         // The message names the exception type that the catch clause below actually expects.
+         Assert.fail("Expected H2StreamResetException");
+     } catch (final H2StreamResetException ex) {
+         Assert.assertTrue("Expected published exception to be rethrown", ex.getCause() instanceof RuntimeException);
+         Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/WritableByteChannelMock.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/WritableByteChannelMock.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/WritableByteChannelMock.java
new file mode 100644
index 0000000..679181d
--- /dev/null
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/WritableByteChannelMock.java
@@ -0,0 +1,125 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactive;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+/**
+ * In-memory {@link WritableByteChannel} for tests. Written bytes are collected in a
+ * growable buffer; an optional capacity limit makes {@link #write(ByteBuffer)} accept
+ * only part of the data until {@link #flush()} or {@link #reset()} is called.
+ */
+public class WritableByteChannelMock implements WritableByteChannel {
+
+    private final int capacityLimit;
+
+    private int capacityUsed;
+    private ByteBuffer buf;
+    private boolean closed;
+
+    /**
+     * @param initialSize initial size of the backing buffer, which grows on demand.
+     * @param capacityLimit maximum number of bytes accepted between calls to
+     *   {@link #flush()} or {@link #reset()}; the limit is only applied when greater than zero.
+     */
+    public WritableByteChannelMock(final int initialSize, final int capacityLimit) {
+        this.buf = ByteBuffer.allocate(initialSize);
+        this.capacityLimit = capacityLimit;
+    }
+
+    /** Creates a channel without a capacity limit. */
+    public WritableByteChannelMock(final int initialSize) {
+        this(initialSize, 0);
+    }
+
+    /** Replaces the backing buffer with a larger one, carrying over the bytes written so far. */
+    private void expandCapacity(final int capacity) {
+        final ByteBuffer oldbuffer = this.buf;
+        this.buf = ByteBuffer.allocate(capacity);
+        oldbuffer.flip();
+        this.buf.put(oldbuffer);
+    }
+
+    private void ensureCapacity(final int requiredCapacity) {
+        if (requiredCapacity > this.buf.capacity()) {
+            expandCapacity(requiredCapacity);
+        }
+    }
+
+    /**
+     * Copies bytes from {@code src} into the backing buffer.
+     *
+     * @return the number of bytes accepted; with a capacity limit this may be less than
+     *   {@code src.remaining()}, and zero once the limit is exhausted.
+     * @throws ClosedChannelException if the channel has been closed.
+     */
+    @Override
+    public int write(final ByteBuffer src) throws IOException {
+        if (this.closed) {
+            throw new ClosedChannelException();
+        }
+        final int len = src.remaining();
+        ensureCapacity(this.buf.position() + len);
+        if (this.capacityLimit > 0) {
+            final int chunk = Math.min(this.capacityLimit - this.capacityUsed, len);
+            if (chunk > 0) {
+                // Temporarily narrow the source limit so that only 'chunk' bytes are copied.
+                final int limit = src.limit();
+                src.limit(src.position() + chunk);
+                this.buf.put(src);
+                src.limit(limit);
+                this.capacityUsed += chunk;
+                return chunk;
+            }
+            return 0;
+        }
+        this.buf.put(src);
+        return len;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return !this.closed;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.closed = true;
+    }
+
+    /** Resets the used-capacity counter; buffered content is kept. */
+    public void flush() {
+        this.capacityUsed = 0;
+    }
+
+    /** Resets the used-capacity counter and discards all buffered content. */
+    public void reset() {
+        this.capacityUsed = 0;
+        this.buf.clear();
+    }
+
+    /** Returns a copy of the bytes currently buffered without consuming them. */
+    public byte[] toByteArray() {
+        final ByteBuffer dup = this.buf.duplicate();
+        dup.flip();
+        final byte[] bytes = new byte[dup.remaining()];
+        dup.get(bytes);
+        return bytes;
+    }
+
+    /**
+     * Decodes the buffered bytes with the given charset and removes the decoded bytes
+     * from the buffer.
+     *
+     * @throws CharacterCodingException if the buffered bytes cannot be decoded; the
+     *   buffer is still returned to write mode so the channel remains usable.
+     */
+    public String dump(final Charset charset) throws CharacterCodingException {
+        this.buf.flip();
+        try {
+            final CharBuffer charBuffer = charset.newDecoder().decode(this.buf);
+            return charBuffer.toString();
+        } finally {
+            // Always switch back to write mode; without this a decoding failure would
+            // leave the buffer flipped and subsequent writes would overwrite or overflow.
+            this.buf.compact();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-testing/pom.xml
----------------------------------------------------------------------
diff --git a/httpcore5-testing/pom.xml b/httpcore5-testing/pom.xml
index 7cf41d7..e9e1f1c 100644
--- a/httpcore5-testing/pom.xml
+++ b/httpcore5-testing/pom.xml
@@ -50,6 +50,12 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents.core5</groupId>
+ <artifactId>httpcore5-reactive</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>compile</scope>
@@ -73,6 +79,12 @@
<optional>true</optional>
</dependency>
<dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>2.1.9</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
new file mode 100644
index 0000000..21c4be4
--- /dev/null
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
@@ -0,0 +1,400 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.testing.reactive;
+
+import io.reactivex.Flowable;
+import io.reactivex.Observable;
+import io.reactivex.functions.Action;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http2.H2Error;
+import org.apache.hc.core5.http2.H2StreamResetException;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactive.ReactiveEntityProducer;
+import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
+import org.apache.hc.core5.reactor.ExceptionEvent;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
+import org.apache.hc.core5.testing.nio.EchoHandler;
+import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
+import org.apache.hc.core5.testing.nio.LoggingHttp2StreamListener;
+import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
+import org.apache.hc.core5.testing.nio.LoggingIOSessionListener;
+import org.apache.hc.core5.util.Timeout;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+@RunWith(Parameterized.class)
+public class ReactiveClientTest {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ /** Supplies the HTTP version policies this test class is run with, one per parameter set. */
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection<Object[]> protocols() {
+     final Object[][] policies = {
+         { HttpVersionPolicy.FORCE_HTTP_1 },
+         { HttpVersionPolicy.FORCE_HTTP_2 }
+     };
+     return Arrays.asList(policies);
+ }
+ private static final Timeout TIMEOUT = Timeout.ofSeconds(3000);
+ private static final Random RANDOM = new Random();
+
+ private final HttpVersionPolicy versionPolicy;
+
+ /**
+  * @param httpVersionPolicy the version policy applied to both the test server and the test client.
+  */
+ public ReactiveClientTest(final HttpVersionPolicy httpVersionPolicy) {
+     versionPolicy = httpVersionPolicy;
+ }
+
+ private HttpAsyncServer server;
+
+ /**
+  * Creates the echo server before each test (it is not started here) and closes it
+  * afterwards, logging any exceptions the server recorded.
+  */
+ @Rule
+ public ExternalResource serverResource = new ExternalResource() {
+
+     @Override
+     protected void before() throws Throwable {
+         log.debug("Starting up test server");
+         final IOReactorConfig reactorConfig = IOReactorConfig.custom()
+             .setSoTimeout(TIMEOUT)
+             .build();
+         server = H2ServerBootstrap.bootstrap()
+             .setVersionPolicy(versionPolicy)
+             .setIOReactorConfig(reactorConfig)
+             .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
+             .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
+             .setStreamListener(LoggingHttp2StreamListener.INSTANCE)
+             .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
+             .register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+                 @Override
+                 public AsyncServerExchangeHandler get() {
+                     return new EchoHandler(10 * 1024 * 1024);
+                 }
+
+             })
+             .create();
+     }
+
+     @Override
+     protected void after() {
+         log.debug("Shutting down test server");
+         if (server == null) {
+             return;
+         }
+         try {
+             server.close(CloseMode.GRACEFUL);
+             final List<ExceptionEvent> exceptionLog = server.getExceptionLog();
+             server = null;
+             for (final ExceptionEvent event : exceptionLog) {
+                 final Throwable cause = event.getCause();
+                 log.error("Unexpected " + cause.getClass() + " at " + event.getTimestamp(), cause);
+             }
+         } catch (final Exception ignore) {
+             // Best-effort shutdown: failures while closing are deliberately ignored.
+         }
+     }
+
+ };
+
+ private HttpAsyncRequester requester;
+
+ /**
+  * Creates the requester before each test (it is not started here) and closes it
+  * afterwards, logging any exceptions the requester recorded.
+  */
+ @Rule
+ public ExternalResource clientResource = new ExternalResource() {
+
+     @Override
+     protected void before() throws Throwable {
+         log.debug("Starting up test client");
+         final IOReactorConfig reactorConfig = IOReactorConfig.custom()
+             .setSoTimeout(TIMEOUT)
+             .build();
+         requester = H2RequesterBootstrap.bootstrap()
+             .setVersionPolicy(versionPolicy)
+             .setIOReactorConfig(reactorConfig)
+             .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
+             .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
+             .setStreamListener(LoggingHttp2StreamListener.INSTANCE)
+             .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
+             .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
+             .create();
+     }
+
+     @Override
+     protected void after() {
+         log.debug("Shutting down test client");
+         if (requester == null) {
+             return;
+         }
+         try {
+             requester.close(CloseMode.GRACEFUL);
+             final List<ExceptionEvent> exceptionLog = requester.getExceptionLog();
+             requester = null;
+             for (final ExceptionEvent event : exceptionLog) {
+                 final Throwable cause = event.getCause();
+                 log.error("Unexpected " + cause.getClass() + " at " + event.getTimestamp(), cause);
+             }
+         } catch (final Exception ignore) {
+             // Best-effort shutdown: failures while closing are deliberately ignored.
+         }
+     }
+
+ };
+
+ /**
+  * Posts 1 KiB of random data to the echo handler and checks that the
+  * concatenated response body equals the request body.
+  */
+ @Test(timeout = 5_000)
+ public void testSimpleRequest() throws Exception {
+     final InetSocketAddress address = startClientAndServer();
+
+     final byte[] input = new byte[1024];
+     RANDOM.nextBytes(input);
+     final Publisher<ByteBuffer> requestBody = Flowable.just(ByteBuffer.wrap(input));
+     final ReactiveEntityProducer producer = new ReactiveEntityProducer(requestBody, input.length, null, null);
+     final BasicRequestProducer request = getRequestProducer(address, producer);
+
+     final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+     requester.execute(request, consumer, Timeout.ofSeconds(2), null);
+
+     final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
+     final List<ByteBuffer> responseChunks = Observable.fromPublisher(response.getBody()).toList().blockingGet();
+
+     // Concatenate the response chunks into a single byte array.
+     final ByteArrayOutputStream collected = new ByteArrayOutputStream();
+     final WritableByteChannel collector = Channels.newChannel(collected);
+     for (final ByteBuffer chunk : responseChunks) {
+         collector.write(chunk);
+     }
+     collector.close();
+
+     Assert.assertArrayEquals(input, collected.toByteArray());
+ }
+
+ /** Builds a POST request to {@code localhost} on the given address's port with the given entity. */
+ private BasicRequestProducer getRequestProducer(final InetSocketAddress address, final ReactiveEntityProducer producer) {
+     final URI target = URI.create("http://localhost:" + address.getPort());
+     return new BasicRequestProducer("POST", target, producer);
+ }
+
+ /**
+  * Streams 500 pseudo-random chunks of up to 32 KiB each to the echo handler and
+  * compares total length and digest of the request body with those of the response body.
+  */
+ @Test(timeout = 20_000)
+ public void testLongRunningRequest() throws Exception {
+     final InetSocketAddress address = startClientAndServer();
+
+     final AtomicLong sentLength = new AtomicLong(0L);
+     final AtomicReference<MessageDigest> sentDigest = new AtomicReference<>(newDigest());
+     final Publisher<ByteBuffer> requestBody = Flowable.rangeLong(1, 500)
+         .map(new Function<Long, ByteBuffer>() {
+             @Override
+             public ByteBuffer apply(final Long seed) {
+                 // Seeded generator: chunk size and content are determined by the seed.
+                 final Random random = new Random(seed);
+                 final byte[] bytes = new byte[1 + random.nextInt(32 * 1024)];
+                 sentLength.addAndGet(bytes.length);
+                 random.nextBytes(bytes);
+                 return ByteBuffer.wrap(bytes);
+             }
+         })
+         .doOnNext(new Consumer<ByteBuffer>() {
+             @Override
+             public void accept(final ByteBuffer byteBuffer) {
+                 // Digest a duplicate so the buffer handed downstream keeps its position.
+                 sentDigest.get().update(byteBuffer.duplicate());
+             }
+         });
+     final ReactiveEntityProducer producer = new ReactiveEntityProducer(requestBody, -1, null, null);
+     final BasicRequestProducer request = getRequestProducer(address, producer);
+
+     final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+     requester.execute(request, consumer, Timeout.ofSeconds(2), null);
+     final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
+
+     final AtomicLong receivedLength = new AtomicLong(0);
+     final AtomicReference<MessageDigest> receivedDigest = new AtomicReference<>(newDigest());
+     Flowable.fromPublisher(response.getBody())
+         .blockingForEach(new Consumer<ByteBuffer>() {
+             @Override
+             public void accept(final ByteBuffer byteBuffer) {
+                 receivedLength.addAndGet(byteBuffer.remaining());
+                 receivedDigest.get().update(byteBuffer);
+             }
+         });
+
+     Assert.assertEquals(sentLength.get(), receivedLength.get());
+     Assert.assertArrayEquals(sentDigest.get().digest(), receivedDigest.get().digest());
+ }
+
+ /**
+  * Uses a request publisher that fails immediately and checks that the exchange
+  * future fails with an {@link H2StreamResetException} caused by that same exception.
+  */
+ @Test(timeout = 5_000)
+ public void testRequestError() throws Exception {
+     final InetSocketAddress address = startClientAndServer();
+
+     final RuntimeException exceptionThrown = new RuntimeException("Test");
+     final Publisher<ByteBuffer> failingBody = Flowable.error(exceptionThrown);
+     final ReactiveEntityProducer producer = new ReactiveEntityProducer(failingBody, 100, null, null);
+     final BasicRequestProducer request = getRequestProducer(address, producer);
+
+     final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+     final Future<Void> exchange = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
+
+     try {
+         exchange.get();
+         Assert.fail("Expected exception");
+     } catch (final ExecutionException ex) {
+         final Throwable cause = ex.getCause();
+         Assert.assertTrue(cause instanceof H2StreamResetException);
+         Assert.assertSame(exceptionThrown, cause.getCause());
+     }
+ }
+
+ /**
+  * Uses a request publisher that never emits, so the exchange can only end through
+  * the one-second timeout. Checks that the publisher gets cancelled and that the
+  * failure cause matches the protocol version under test.
+  */
+ @Test(timeout = 5_000)
+ public void testRequestTimeout() throws Exception {
+     final InetSocketAddress address = startClientAndServer();
+     final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
+     final Publisher<ByteBuffer> publisher = Flowable.<ByteBuffer>never()
+         .doOnCancel(new Action() {
+             @Override
+             public void run() {
+                 requestPublisherWasCancelled.set(true);
+             }
+         });
+     final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+     final BasicRequestProducer request = getRequestProducer(address, producer);
+
+     final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+     final Future<Void> future = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
+
+     try {
+         future.get();
+         // Without this the test would pass vacuously if the exchange completed normally.
+         Assert.fail("Expected exception");
+     } catch (final ExecutionException ex) {
+         Assert.assertTrue(requestPublisherWasCancelled.get());
+         final Throwable cause = ex.getCause();
+         if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_1) {
+             Assert.assertTrue("Expected SocketTimeoutException, but got " + cause.getClass().getName(),
+                 cause instanceof SocketTimeoutException);
+         } else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
+             Assert.assertTrue(String.format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()),
+                 cause instanceof H2StreamResetException);
+             Assert.assertEquals(H2Error.NO_ERROR.getCode(), ((H2StreamResetException) cause).getCode());
+         } else {
+             Assert.fail("Unknown HttpVersionPolicy: " + versionPolicy);
+         }
+     }
+ }
+
+ /**
+  * Streams a very long request body, takes only the first three response buffers and
+  * then checks that both the response and the request publishers were cancelled and
+  * that the exchange future failed with an {@link H2StreamResetException}.
+  */
+ @Test(timeout = 5_000)
+ public void testResponseCancellation() throws Exception {
+     final InetSocketAddress address = startClientAndServer();
+
+     final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
+     final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
+     final Publisher<ByteBuffer> requestBody = Flowable.rangeLong(Long.MIN_VALUE, Long.MAX_VALUE)
+         .map(new Function<Long, ByteBuffer>() {
+             @Override
+             public ByteBuffer apply(final Long seed) throws Exception {
+                 final Random random = new Random(seed);
+                 final byte[] bytes = new byte[1 + random.nextInt(1024)];
+                 random.nextBytes(bytes);
+                 return ByteBuffer.wrap(bytes);
+             }
+         })
+         .doOnCancel(new Action() {
+             @Override
+             public void run() throws Exception {
+                 requestPublisherWasCancelled.set(true);
+             }
+         })
+         .doOnError(new Consumer<Throwable>() {
+             @Override
+             public void accept(final Throwable throwable) throws Exception {
+                 requestStreamError.set(throwable);
+             }
+         });
+     final ReactiveEntityProducer producer = new ReactiveEntityProducer(requestBody, -1, null, null);
+     final BasicRequestProducer request = getRequestProducer(address, producer);
+
+     final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+     final Future<Void> exchange = requester.execute(request, consumer, Timeout.ofSeconds(1), null);
+     final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
+
+     // take(3) cancels the upstream subscription once three buffers have been received.
+     final AtomicBoolean responsePublisherWasCancelled = new AtomicBoolean(false);
+     final List<ByteBuffer> firstThree = Flowable.fromPublisher(response.getBody())
+         .doOnCancel(new Action() {
+             @Override
+             public void run() throws Exception {
+                 responsePublisherWasCancelled.set(true);
+             }
+         })
+         .take(3)
+         .toList()
+         .blockingGet();
+     Assert.assertEquals(3, firstThree.size());
+     Assert.assertTrue("The response subscription should have been cancelled", responsePublisherWasCancelled.get());
+
+     try {
+         exchange.get();
+         Assert.fail("Expected exception");
+     } catch (final ExecutionException | CancellationException ex) {
+         Assert.assertTrue(ex.getCause() instanceof H2StreamResetException);
+         Assert.assertTrue(requestPublisherWasCancelled.get());
+         Assert.assertNull(requestStreamError.get());
+     }
+ }
+
+ /** Returns a fresh MD5 digest, used by the tests to compare request and response bodies. */
+ private static MessageDigest newDigest() throws NoSuchAlgorithmException {
+     final String algorithm = "MD5";
+     return MessageDigest.getInstance(algorithm);
+ }
+
+ /**
+  * Starts the server, binds it to a port chosen by the system, starts the requester
+  * and returns the address the server is listening on.
+  */
+ private InetSocketAddress startClientAndServer() throws InterruptedException, ExecutionException {
+     server.start();
+     // Port 0 lets the operating system pick a free port.
+     final InetSocketAddress anyFreePort = new InetSocketAddress(0);
+     final ListenerEndpoint endpoint = server.listen(anyFreePort).get();
+     final InetSocketAddress boundAddress = (InetSocketAddress) endpoint.getAddress();
+     requester.start();
+     return boundAddress;
+ }
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d3f7608c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 80b7a3a..76e771f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
<modules>
<module>httpcore5</module>
<module>httpcore5-h2</module>
+ <module>httpcore5-reactive</module>
<module>httpcore5-osgi</module>
<module>httpcore5-testing</module>
</modules>
@@ -258,7 +259,7 @@
<exclude>**/.checkstyle</exclude>
<exclude>**/.pmd</exclude>
<exclude>**/*.iml</exclude>
- <exclude>**/.externalToolBuilders/**</exclude>
+ <exclude>**/.externalToolBuilders/**</exclude>
<exclude>maven-eclipse.xml</exclude>
<exclude>src/docbkx/resources/**</exclude>
<exclude>src/test/resources/*.truststore</exclude>