You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/09/28 09:03:52 UTC

httpcomponents-core git commit: HTTPCLIENT-1942: Add example of full-duplex reactive message exchange [Forced Update!]

Repository: httpcomponents-core
Updated Branches:
  refs/heads/master be6f5fa16 -> 86ee1ec21 (forced update)


HTTPCLIENT-1942: Add example of full-duplex reactive message exchange


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/86ee1ec2
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/86ee1ec2
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/86ee1ec2

Branch: refs/heads/master
Commit: 86ee1ec211abf8ea814743449fef88756e68ac11
Parents: 4404061
Author: Ryan Schmitt <ry...@amazon.com>
Authored: Thu Sep 27 16:28:23 2018 -0700
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Fri Sep 28 11:03:13 2018 +0200

----------------------------------------------------------------------
 .../ReactiveFullDuplexClientExample.java        | 159 +++++++++++++++++++
 .../ReactiveFullDuplexServerExample.java        | 147 +++++++++++++++++
 2 files changed, 306 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/86ee1ec2/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexClientExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexClientExample.java b/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexClientExample.java
new file mode 100644
index 0000000..d71e1a4
--- /dev/null
+++ b/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexClientExample.java
@@ -0,0 +1,159 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive.examples;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactive.ReactiveEntityProducer;
+import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+import org.reactivestreams.Publisher;
+
+import io.reactivex.Flowable;
+import io.reactivex.Notification;
+import io.reactivex.Observable;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
+
+/**
+ * Example of full-duplex HTTP/1.1 message exchanges using reactive streaming. This demo will stream randomly
+ * generated text to the server via a POST request, while writing the response stream's events to standard output.
+ * This demo works out-of-the-box with {@link ReactiveFullDuplexServerExample}.
+ */
+public class ReactiveFullDuplexClientExample {
+
+    public static void main(String[] args) throws Exception {
+        String endpoint = "http://localhost:8080/echo";
+        if (args.length >= 1) {
+            endpoint = args[0];
+        }
+
+        // Create and start requester
+        final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
+            .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build())
+            .setStreamListener(new Http1StreamListener() {
+                @Override
+                public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+                    System.out.println(connection + " " + new RequestLine(request));
+
+                }
+
+                @Override
+                public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+                    System.out.println(connection + " " + new StatusLine(response));
+                }
+
+                @Override
+                public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+                    if (keepAlive) {
+                        System.out.println(connection + " exchange completed (connection kept alive)");
+                    } else {
+                        System.out.println(connection + " exchange completed (connection closed)");
+                    }
+                }
+
+            })
+            .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("HTTP requester shutting down");
+                requester.close(CloseMode.GRACEFUL);
+            }
+        });
+        requester.start();
+
+        final Random random = new Random();
+        final Flowable<ByteBuffer> publisher = Flowable.range(1, 100)
+            .map(new Function<Integer, ByteBuffer>() {
+                @Override
+                public ByteBuffer apply(final Integer ignored) {
+                    final String str = random.nextDouble() + "\n";
+                    return ByteBuffer.wrap(str.getBytes(UTF_8));
+                }
+            });
+        final AsyncEntityProducer reactiveEntityProducer = new ReactiveEntityProducer(publisher, -1,
+            ContentType.TEXT_PLAIN.toString(), null);
+        final URI requestUri = new URI(endpoint);
+        final BasicRequestProducer requestProducer = new BasicRequestProducer(
+            "POST", requestUri, reactiveEntityProducer);
+
+        final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+        final Future<Void> responseComplete = requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null);
+        final Message<HttpResponse, Publisher<ByteBuffer>> streamingResponse = consumer.getResponseFuture().get();
+
+        System.out.println(streamingResponse.getHead());
+        for (final Header header : streamingResponse.getHead().getAllHeaders()) {
+            System.out.println(header.toString());
+        }
+        System.out.println();
+
+        Observable.fromPublisher(streamingResponse.getBody())
+            .map(new Function<ByteBuffer, String>() {
+                @Override
+                public String apply(final ByteBuffer byteBuffer) {
+                    final byte[] string = new byte[byteBuffer.remaining()];
+                    byteBuffer.get(string);
+                    return new String(string);
+                }
+            })
+            .materialize()
+            .forEach(new Consumer<Notification<String>>() {
+                @Override
+                public void accept(final Notification<String> byteBufferNotification) {
+                    System.out.println(byteBufferNotification.toString());
+                }
+            });
+
+        responseComplete.get(1, TimeUnit.MINUTES);
+        System.out.println("Shutting down I/O reactor");
+        requester.initiateShutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/86ee1ec2/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexServerExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexServerExample.java b/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexServerExample.java
new file mode 100644
index 0000000..49372e7
--- /dev/null
+++ b/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexServerExample.java
@@ -0,0 +1,147 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive.examples;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.impl.BasicEntityDetails;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
+import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.util.TimeValue;
+import org.reactivestreams.Publisher;
+
+/**
+ * Example of full-duplex HTTP/1.1 message exchanges using reactive streaming. This demo server works out-of-the-box
+ * with {@link ReactiveFullDuplexClientExample}; it can also be invoked interactively using telnet.
+ */
+public class ReactiveFullDuplexServerExample {
+    public static void main(String[] args) throws Exception {
+        int port = 8080;
+        if (args.length >= 1) {
+            port = Integer.parseInt(args[0]);
+        }
+
+        IOReactorConfig config = IOReactorConfig.custom()
+            .setSoTimeout(15, TimeUnit.SECONDS)
+            .setTcpNoDelay(true)
+            .build();
+
+        final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
+            .setIOReactorConfig(config)
+            .setStreamListener(new Http1StreamListener() {
+                @Override
+                public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+                    System.out.println(connection + " " + new RequestLine(request));
+
+                }
+
+                @Override
+                public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+                    System.out.println(connection + " " + new StatusLine(response));
+                }
+
+                @Override
+                public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+                    if (keepAlive) {
+                        System.out.println(connection + " exchange completed (connection kept alive)");
+                    } else {
+                        System.out.println(connection + " exchange completed (connection closed)");
+                    }
+                }
+
+            })
+            .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
+                @Override
+                public AsyncServerExchangeHandler get() {
+                    return new ReactiveServerExchangeHandler(new ReactiveRequestProcessor() {
+                        @Override
+                        public void processRequest(
+                            final HttpRequest request,
+                            final EntityDetails entityDetails,
+                            final ResponseChannel responseChannel,
+                            final HttpContext context,
+                            final Publisher<ByteBuffer> requestBody,
+                            final Callback<Publisher<ByteBuffer>> responseBodyFuture
+                        ) throws HttpException, IOException {
+                            if (new BasicHeader("Expect", "100-continue").equals(request.getSingleHeader("Expect"))) {
+                                responseChannel.sendInformation(new BasicHttpResponse(100), context);
+                            }
+
+                            responseChannel.sendResponse(
+                                new BasicHttpResponse(200),
+                                new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
+                                context);
+
+                            // Simply using the request publisher as the response publisher will
+                            // cause the server to echo the request body.
+                            responseBodyFuture.execute(requestBody);
+                        }
+                    });
+                }
+            })
+            .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("HTTP server shutting down");
+                server.close(CloseMode.GRACEFUL);
+            }
+        });
+
+        server.start();
+        Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port));
+        ListenerEndpoint listenerEndpoint = future.get();
+        System.out.print("Listening on " + listenerEndpoint.getAddress());
+        server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
+    }
+}