You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/09/09 13:53:56 UTC
httpcomponents-core git commit: HTTPCORE-551: Reactive Streams server
side APIs [Forced Update!]
Repository: httpcomponents-core
Updated Branches:
refs/heads/master 3925e5b17 -> 98caba56e (forced update)
HTTPCORE-551: Reactive Streams server side APIs
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/98caba56
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/98caba56
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/98caba56
Branch: refs/heads/master
Commit: 98caba56e15a1c8fe28bb269c48f9092bcfd9e45
Parents: b4d73bf
Author: Ryan Schmitt <ry...@amazon.com>
Authored: Sat Sep 1 12:58:12 2018 -0700
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Sun Sep 9 15:53:09 2018 +0200
----------------------------------------------------------------------
.../hc/core5/reactive/ReactiveDataProducer.java | 4 +
.../reactive/ReactiveRequestProcessor.java | 65 +++++++++
.../reactive/ReactiveServerExchangeHandler.java | 141 +++++++++++++++++++
.../testing/reactive/ReactiveClientTest.java | 40 +++++-
4 files changed, 247 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/98caba56/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
index f917a11..890f182 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
@@ -63,6 +63,10 @@ final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBu
this.publisher = Args.notNull(publisher, "publisher");
}
+ void setChannel(final DataStreamChannel channel) {
+ requestChannel.set(channel);
+ }
+
@Override
public void onSubscribe(final Subscription subscription) {
if (this.subscription.getAndSet(subscription) != null) {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/98caba56/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveRequestProcessor.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveRequestProcessor.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveRequestProcessor.java
new file mode 100644
index 0000000..e639be9
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveRequestProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.reactivestreams.Publisher;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * @since 5.0
+ */
+public interface ReactiveRequestProcessor {
+
+ /**
+ * Processes the actual HTTP request. The handler can choose to send
+ * response messages immediately inside the call or asynchronously
+ * at some later point.
+ *
+ * @param request the actual request.
+ * @param entityDetails the request entity details or {@code null} if the request
+ * does not enclose an entity.
+ * @param responseChannel the response channel.
+ * @param context the actual execution context.
+ * @param requestBody a reactive stream representing the request body.
+ * @param responseBodyCallback a callback to invoke with the response body, if any.
+ */
+ void processRequest(
+ HttpRequest request,
+ EntityDetails entityDetails,
+ ResponseChannel responseChannel,
+ HttpContext context,
+ Publisher<ByteBuffer> requestBody,
+ Callback<Publisher<ByteBuffer>> responseBodyCallback) throws HttpException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/98caba56/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveServerExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveServerExchangeHandler.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveServerExchangeHandler.java
new file mode 100644
index 0000000..78f48fd
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveServerExchangeHandler.java
@@ -0,0 +1,141 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.reactivestreams.Publisher;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An implementation of {@link AsyncServerExchangeHandler} designed to work with reactive streams.
+ *
+ * @since 5.0
+ */
+public final class ReactiveServerExchangeHandler implements AsyncServerExchangeHandler {
+
+ private final ReactiveRequestProcessor requestProcessor;
+ private final AtomicReference<ReactiveDataProducer> responseProducer = new AtomicReference<>(null);
+ private final ReactiveDataConsumer requestConsumer;
+ private volatile DataStreamChannel channel;
+
+ /**
+ * Creates a {@code ReactiveServerExchangeHandler}.
+ *
+ * @param requestProcessor the {@link ReactiveRequestProcessor} instance to
+ * invoke when the request is ready to be handled.
+ */
+ public ReactiveServerExchangeHandler(final ReactiveRequestProcessor requestProcessor) {
+ this.requestProcessor = requestProcessor;
+ this.requestConsumer = new ReactiveDataConsumer();
+ }
+
+ @Override
+ public void handleRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel,
+ final HttpContext context
+ ) throws HttpException, IOException {
+ final Callback<Publisher<ByteBuffer>> callback = new Callback<Publisher<ByteBuffer>>() {
+ @Override
+ public void execute(final Publisher<ByteBuffer> result) {
+ final ReactiveDataProducer producer = new ReactiveDataProducer(result);
+ if (channel != null) {
+ producer.setChannel(channel);
+ }
+ responseProducer.set(producer);
+ result.subscribe(producer);
+ }
+ };
+ requestProcessor.processRequest(request, entityDetails, responseChannel, context, requestConsumer, callback);
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ requestConsumer.failed(cause);
+ final ReactiveDataProducer p = responseProducer.get();
+ if (p != null) {
+ p.onError(cause);
+ }
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ requestConsumer.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ public int consume(final ByteBuffer src) throws IOException {
+ return requestConsumer.consume(src);
+ }
+
+ @Override
+ public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ requestConsumer.streamEnd(trailers);
+ }
+
+ @Override
+ public int available() {
+ final ReactiveDataProducer p = responseProducer.get();
+ if (p == null) {
+ return 0;
+ } else {
+ return p.available();
+ }
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ this.channel = channel;
+ final ReactiveDataProducer p = responseProducer.get();
+ if (p != null) {
+ p.produce(channel);
+ }
+ }
+
+ @Override
+ public void releaseResources() {
+ final ReactiveDataProducer p = responseProducer.get();
+ if (p != null) {
+ p.releaseResources();
+ }
+ requestConsumer.releaseResources();
+ }
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/98caba56/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
index 9b41129..8a398d1 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
@@ -27,6 +27,7 @@
package org.apache.hc.core5.testing.reactive;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
@@ -46,25 +47,36 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.BasicEntityDetails;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactive.ReactiveEntityProducer;
+import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
+import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
import org.apache.hc.core5.reactor.ExceptionEvent;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
-import org.apache.hc.core5.testing.nio.EchoHandler;
import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
import org.apache.hc.core5.testing.nio.LoggingHttp2StreamListener;
import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
@@ -109,6 +121,28 @@ public class ReactiveClientTest {
private HttpAsyncServer server;
+ private static final class ReactiveEchoProcessor implements ReactiveRequestProcessor {
+ @Override
+ public void processRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel,
+ final HttpContext context,
+ final Publisher<ByteBuffer> requestBody,
+ final Callback<Publisher<ByteBuffer>> responseBodyFuture
+ ) throws HttpException, IOException {
+ if (new BasicHeader("Expect", "100-continue").equals(request.getSingleHeader("Expect"))) {
+ responseChannel.sendInformation(new BasicHttpResponse(100), context);
+ }
+
+ responseChannel.sendResponse(
+ new BasicHttpResponse(200),
+ new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
+ context);
+ responseBodyFuture.execute(requestBody);
+ }
+ }
+
@Rule
public ExternalResource serverResource = new ExternalResource() {
@@ -129,7 +163,7 @@ public class ReactiveClientTest {
@Override
public AsyncServerExchangeHandler get() {
- return new EchoHandler(10 * 1024 * 1024);
+ return new ReactiveServerExchangeHandler(new ReactiveEchoProcessor());
}
})
@@ -234,7 +268,7 @@ public class ReactiveClientTest {
final InetSocketAddress address = startClientAndServer();
final AtomicLong requestLength = new AtomicLong(0L);
final AtomicReference<MessageDigest> requestDigest = new AtomicReference<>(newDigest());
- final Publisher<ByteBuffer> publisher = Flowable.rangeLong(1, 500)
+ final Publisher<ByteBuffer> publisher = Flowable.rangeLong(1, 10)
.map(new Function<Long, ByteBuffer>() {
@Override
public ByteBuffer apply(final Long seed) {