You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/09/28 09:03:52 UTC
httpcomponents-core git commit: HTTPCLIENT-1942: Add example of
full-duplex reactive message exchange [Forced Update!]
Repository: httpcomponents-core
Updated Branches:
refs/heads/master be6f5fa16 -> 86ee1ec21 (forced update)
HTTPCLIENT-1942: Add example of full-duplex reactive message exchange
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/86ee1ec2
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/86ee1ec2
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/86ee1ec2
Branch: refs/heads/master
Commit: 86ee1ec211abf8ea814743449fef88756e68ac11
Parents: 4404061
Author: Ryan Schmitt <ry...@amazon.com>
Authored: Thu Sep 27 16:28:23 2018 -0700
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Fri Sep 28 11:03:13 2018 +0200
----------------------------------------------------------------------
.../ReactiveFullDuplexClientExample.java | 159 +++++++++++++++++++
.../ReactiveFullDuplexServerExample.java | 147 +++++++++++++++++
2 files changed, 306 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/86ee1ec2/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexClientExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexClientExample.java b/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexClientExample.java
new file mode 100644
index 0000000..d71e1a4
--- /dev/null
+++ b/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexClientExample.java
@@ -0,0 +1,159 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive.examples;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactive.ReactiveEntityProducer;
+import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+import org.reactivestreams.Publisher;
+
+import io.reactivex.Flowable;
+import io.reactivex.Notification;
+import io.reactivex.Observable;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
+
+/**
+ * Example of full-duplex HTTP/1.1 message exchanges using reactive streaming. This demo will stream randomly
+ * generated text to the server via a POST request, while writing the response stream's events to standard output.
+ * This demo works out-of-the-box with {@link ReactiveFullDuplexServerExample}.
+ */
+public class ReactiveFullDuplexClientExample {
+    /**
+     * Runs the demo: POSTs 100 random lines and prints the head and stream events of the response.
+     * @param args optional; {@code args[0]} replaces the default endpoint {@code http://localhost:8080/echo}
+     * @throws Exception if the endpoint URI is invalid, the exchange fails, or the response is not complete in time
+     */
+    public static void main(String[] args) throws Exception {
+        String endpoint = "http://localhost:8080/echo";
+        if (args.length >= 1) {
+            endpoint = args[0];
+        }
+
+        // Create and start requester; the listener traces message heads and exchange completion to stdout
+        final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
+                .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build())
+                .setStreamListener(new Http1StreamListener() {
+                    @Override
+                    public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+                        System.out.println(connection + " " + new RequestLine(request));
+                    }
+
+                    @Override
+                    public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+                        System.out.println(connection + " " + new StatusLine(response));
+                    }
+
+                    @Override
+                    public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+                        System.out.println(connection + (keepAlive
+                                ? " exchange completed (connection kept alive)"
+                                : " exchange completed (connection closed)"));
+                    }
+                })
+                .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("HTTP requester shutting down");
+                requester.close(CloseMode.GRACEFUL);
+            }
+        });
+        requester.start();
+
+        // Request body: 100 chunks, each a random double followed by a newline, encoded as UTF-8
+        final Random random = new Random();
+        final Flowable<ByteBuffer> publisher = Flowable.range(1, 100)
+                .map(new Function<Integer, ByteBuffer>() {
+                    @Override
+                    public ByteBuffer apply(final Integer ignored) {
+                        final String str = random.nextDouble() + "\n";
+                        return ByteBuffer.wrap(str.getBytes(UTF_8));
+                    }
+                });
+        final AsyncEntityProducer reactiveEntityProducer = new ReactiveEntityProducer(publisher, -1,
+                ContentType.TEXT_PLAIN.toString(), null);
+        final URI requestUri = new URI(endpoint);
+        final BasicRequestProducer requestProducer = new BasicRequestProducer(
+                "POST", requestUri, reactiveEntityProducer);
+
+        final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
+        final Future<Void> responseComplete = requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null);
+        final Message<HttpResponse, Publisher<ByteBuffer>> streamingResponse = consumer.getResponseFuture().get();
+        System.out.println(streamingResponse.getHead());
+        for (final Header header : streamingResponse.getHead().getAllHeaders()) {
+            System.out.println(header.toString());
+        }
+        System.out.println();
+
+        // Print every signal of the response stream; materialize() hands each one over as a Notification
+        Observable.fromPublisher(streamingResponse.getBody())
+                .map(new Function<ByteBuffer, String>() {
+                    @Override
+                    public String apply(final ByteBuffer byteBuffer) {
+                        final byte[] string = new byte[byteBuffer.remaining()];
+                        byteBuffer.get(string);
+                        return new String(string, UTF_8); // explicit charset, matching the request encoding
+                    }
+                })
+                .materialize()
+                .forEach(new Consumer<Notification<String>>() {
+                    @Override
+                    public void accept(final Notification<String> byteBufferNotification) {
+                        System.out.println(byteBufferNotification.toString());
+                    }
+                });
+
+        responseComplete.get(1, TimeUnit.MINUTES);
+        System.out.println("Shutting down I/O reactor");
+        requester.initiateShutdown();
+    }
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/86ee1ec2/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexServerExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexServerExample.java b/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexServerExample.java
new file mode 100644
index 0000000..49372e7
--- /dev/null
+++ b/httpcore5-reactive/src/examples/org/apache/hc/core5/reactive/examples/ReactiveFullDuplexServerExample.java
@@ -0,0 +1,147 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.reactive.examples;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.impl.BasicEntityDetails;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
+import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.util.TimeValue;
+import org.reactivestreams.Publisher;
+
+/**
+ * Example of full-duplex HTTP/1.1 message exchanges using reactive streaming. This demo server works out-of-the-box
+ * with {@link ReactiveFullDuplexClientExample}; it can also be invoked interactively using telnet.
+ */
+public class ReactiveFullDuplexServerExample {
+    /**
+     * Starts an HTTP/1.1 server whose {@code /echo} handler streams each request body back as the response.
+     * @param args optional; {@code args[0]} replaces the default listening port {@code 8080}
+     * @throws Exception if the port argument is not an integer or waiting on the listener or shutdown fails
+     */
+    public static void main(String[] args) throws Exception {
+        int port = 8080;
+        if (args.length >= 1) {
+            port = Integer.parseInt(args[0]);
+        }
+
+        final IOReactorConfig config = IOReactorConfig.custom()
+                .setSoTimeout(15, TimeUnit.SECONDS)
+                .setTcpNoDelay(true)
+                .build();
+
+        final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
+                .setIOReactorConfig(config)
+                .setStreamListener(new Http1StreamListener() {
+                    @Override
+                    public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+                        System.out.println(connection + " " + new RequestLine(request));
+                    }
+
+                    @Override
+                    public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+                        System.out.println(connection + " " + new StatusLine(response));
+                    }
+
+                    @Override
+                    public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+                        System.out.println(connection + (keepAlive
+                                ? " exchange completed (connection kept alive)"
+                                : " exchange completed (connection closed)"));
+                    }
+                })
+                .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
+                    @Override
+                    public AsyncServerExchangeHandler get() {
+                        return new ReactiveServerExchangeHandler(new ReactiveRequestProcessor() {
+                            @Override
+                            public void processRequest(
+                                    final HttpRequest request,
+                                    final EntityDetails entityDetails,
+                                    final ResponseChannel responseChannel,
+                                    final HttpContext context,
+                                    final Publisher<ByteBuffer> requestBody,
+                                    final Callback<Publisher<ByteBuffer>> responseBodyFuture
+                            ) throws HttpException, IOException {
+                                // Compare the header value, ignoring case, rather than relying on BasicHeader.equals()
+                                final org.apache.hc.core5.http.Header expect = request.getSingleHeader("Expect");
+                                if (expect != null && "100-continue".equalsIgnoreCase(expect.getValue())) {
+                                    responseChannel.sendInformation(new BasicHttpResponse(100), context);
+                                }
+                                responseChannel.sendResponse(new BasicHttpResponse(200),
+                                        new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM), context);
+
+                                // Simply using the request publisher as the response publisher will
+                                // cause the server to echo the request body.
+                                responseBodyFuture.execute(requestBody);
+                            }
+                        });
+                    }
+                })
+                .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("HTTP server shutting down");
+                server.close(CloseMode.GRACEFUL);
+            }
+        });
+
+        server.start();
+        final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port));
+        final ListenerEndpoint listenerEndpoint = future.get();
+        System.out.println("Listening on " + listenerEndpoint.getAddress());
+        server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
+    }
+}