You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/09/09 13:53:56 UTC

httpcomponents-core git commit: HTTPCORE-551: Reactive Streams server side APIs [Forced Update!]

Repository: httpcomponents-core
Updated Branches:
  refs/heads/master 3925e5b17 -> 98caba56e (forced update)


HTTPCORE-551: Reactive Streams server side APIs


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/98caba56
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/98caba56
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/98caba56

Branch: refs/heads/master
Commit: 98caba56e15a1c8fe28bb269c48f9092bcfd9e45
Parents: b4d73bf
Author: Ryan Schmitt <ry...@amazon.com>
Authored: Sat Sep 1 12:58:12 2018 -0700
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Sun Sep 9 15:53:09 2018 +0200

----------------------------------------------------------------------
 .../hc/core5/reactive/ReactiveDataProducer.java |   4 +
 .../reactive/ReactiveRequestProcessor.java      |  65 +++++++++
 .../reactive/ReactiveServerExchangeHandler.java | 141 +++++++++++++++++++
 .../testing/reactive/ReactiveClientTest.java    |  40 +++++-
 4 files changed, 247 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/98caba56/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
index f917a11..890f182 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
@@ -63,6 +63,10 @@ final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBu
         this.publisher = Args.notNull(publisher, "publisher");
     }
 
+    void setChannel(final DataStreamChannel channel) {
+        requestChannel.set(channel);
+    }
+
     @Override
     public void onSubscribe(final Subscription subscription) {
         if (this.subscription.getAndSet(subscription) != null) {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/98caba56/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveRequestProcessor.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveRequestProcessor.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveRequestProcessor.java
new file mode 100644
index 0000000..e639be9
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveRequestProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.reactivestreams.Publisher;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * @since 5.0
+ */
+public interface ReactiveRequestProcessor {
+
+    /**
+     * Processes the actual HTTP request. The handler can choose to send
+     * response messages immediately inside the call or asynchronously
+     * at some later point.
+     *
+     * @param request the actual request.
+     * @param entityDetails the request entity details or {@code null} if the request
+     *                      does not enclose an entity.
+     * @param responseChannel the response channel.
+     * @param context the actual execution context.
+     * @param requestBody a reactive stream representing the request body.
+     * @param responseBodyCallback a callback to invoke with the response body, if any.
+     */
+    void processRequest(
+            HttpRequest request,
+            EntityDetails entityDetails,
+            ResponseChannel responseChannel,
+            HttpContext context,
+            Publisher<ByteBuffer> requestBody,
+            Callback<Publisher<ByteBuffer>> responseBodyCallback) throws HttpException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/98caba56/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveServerExchangeHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveServerExchangeHandler.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveServerExchangeHandler.java
new file mode 100644
index 0000000..78f48fd
--- /dev/null
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveServerExchangeHandler.java
@@ -0,0 +1,141 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.reactivestreams.Publisher;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An implementation of {@link AsyncServerExchangeHandler} designed to work with reactive streams.
+ *
+ * @since 5.0
+ */
+public final class ReactiveServerExchangeHandler implements AsyncServerExchangeHandler {
+
+    private final ReactiveRequestProcessor requestProcessor;
+    private final AtomicReference<ReactiveDataProducer> responseProducer = new AtomicReference<>(null);
+    private final ReactiveDataConsumer requestConsumer;
+    private volatile DataStreamChannel channel;
+
+    /**
+     * Creates a {@code ReactiveServerExchangeHandler}.
+     *
+     * @param requestProcessor the {@link ReactiveRequestProcessor} instance to
+     *                         invoke when the request is ready to be handled.
+     */
+    public ReactiveServerExchangeHandler(final ReactiveRequestProcessor requestProcessor) {
+        this.requestProcessor = requestProcessor;
+        this.requestConsumer = new ReactiveDataConsumer();
+    }
+
+    @Override
+    public void handleRequest(
+            final HttpRequest request,
+            final EntityDetails entityDetails,
+            final ResponseChannel responseChannel,
+            final HttpContext context
+    ) throws HttpException, IOException {
+        final Callback<Publisher<ByteBuffer>> callback = new Callback<Publisher<ByteBuffer>>() {
+            @Override
+            public void execute(final Publisher<ByteBuffer> result) {
+                final ReactiveDataProducer producer = new ReactiveDataProducer(result);
+                if (channel != null) {
+                    producer.setChannel(channel);
+                }
+                responseProducer.set(producer);
+                result.subscribe(producer);
+            }
+        };
+        requestProcessor.processRequest(request, entityDetails, responseChannel, context, requestConsumer, callback);
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        requestConsumer.failed(cause);
+        final ReactiveDataProducer p = responseProducer.get();
+        if (p != null) {
+            p.onError(cause);
+        }
+    }
+
+    @Override
+    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        requestConsumer.updateCapacity(capacityChannel);
+    }
+
+    @Override
+    public int consume(final ByteBuffer src) throws IOException {
+        return requestConsumer.consume(src);
+    }
+
+    @Override
+    public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        requestConsumer.streamEnd(trailers);
+    }
+
+    @Override
+    public int available() {
+        final ReactiveDataProducer p = responseProducer.get();
+        if (p == null) {
+            return 0;
+        } else {
+            return p.available();
+        }
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        this.channel = channel;
+        final ReactiveDataProducer p = responseProducer.get();
+        if (p != null) {
+            p.produce(channel);
+        }
+    }
+
+    @Override
+    public void releaseResources() {
+        final ReactiveDataProducer p = responseProducer.get();
+        if (p != null) {
+            p.releaseResources();
+        }
+        requestConsumer.releaseResources();
+    }
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/98caba56/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
index 9b41129..8a398d1 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
@@ -27,6 +27,7 @@
 package org.apache.hc.core5.testing.reactive;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.URI;
@@ -46,25 +47,36 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.HttpStreamResetException;
 import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.BasicEntityDetails;
 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http2.HttpVersionPolicy;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
+import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
+import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
 import org.apache.hc.core5.reactor.ExceptionEvent;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.ListenerEndpoint;
 import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
-import org.apache.hc.core5.testing.nio.EchoHandler;
 import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
 import org.apache.hc.core5.testing.nio.LoggingHttp2StreamListener;
 import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
@@ -109,6 +121,28 @@ public class ReactiveClientTest {
 
     private HttpAsyncServer server;
 
+    private static final class ReactiveEchoProcessor implements ReactiveRequestProcessor {
+        @Override
+        public void processRequest(
+                final HttpRequest request,
+                final EntityDetails entityDetails,
+                final ResponseChannel responseChannel,
+                final HttpContext context,
+                final Publisher<ByteBuffer> requestBody,
+                final Callback<Publisher<ByteBuffer>> responseBodyFuture
+        ) throws HttpException, IOException {
+            if (new BasicHeader("Expect", "100-continue").equals(request.getSingleHeader("Expect"))) {
+                responseChannel.sendInformation(new BasicHttpResponse(100), context);
+            }
+
+            responseChannel.sendResponse(
+                    new BasicHttpResponse(200),
+                    new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
+                    context);
+            responseBodyFuture.execute(requestBody);
+        }
+    }
+
     @Rule
     public ExternalResource serverResource = new ExternalResource() {
 
@@ -129,7 +163,7 @@ public class ReactiveClientTest {
 
                     @Override
                     public AsyncServerExchangeHandler get() {
-                        return new EchoHandler(10 * 1024 * 1024);
+                        return new ReactiveServerExchangeHandler(new ReactiveEchoProcessor());
                     }
 
                 })
@@ -234,7 +268,7 @@ public class ReactiveClientTest {
         final InetSocketAddress address = startClientAndServer();
         final AtomicLong requestLength = new AtomicLong(0L);
         final AtomicReference<MessageDigest> requestDigest = new AtomicReference<>(newDigest());
-        final Publisher<ByteBuffer> publisher = Flowable.rangeLong(1, 500)
+        final Publisher<ByteBuffer> publisher = Flowable.rangeLong(1, 10)
             .map(new Function<Long, ByteBuffer>() {
                 @Override
                 public ByteBuffer apply(final Long seed) {