You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/03/15 09:38:55 UTC

[1/2] httpcomponents-core git commit: Added examples of full-duplex HTTP/1.1 and HTTP/2 message exchanges

Repository: httpcomponents-core
Updated Branches:
  refs/heads/master c838db897 -> 54f64e291


Added examples of full-duplex HTTP/1.1 and HTTP/2 message exchanges


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/54f64e29
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/54f64e29
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/54f64e29

Branch: refs/heads/master
Commit: 54f64e29109a499a4d190024e98fd83e3008a0c1
Parents: 46ac284
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Thu Mar 15 10:30:39 2018 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Mar 15 10:36:32 2018 +0100

----------------------------------------------------------------------
 .../examples/Http2FullDuplexClientExample.java  | 197 +++++++++++++++
 .../examples/Http2FullDuplexServerExample.java  | 243 +++++++++++++++++++
 .../Http2MultiStreamExecutionExample.java       |   8 +
 .../examples/AsyncFullDuplexClientExample.java  | 197 +++++++++++++++
 .../examples/AsyncFullDuplexServerExample.java  | 226 +++++++++++++++++
 .../AsyncPipelinedRequestExecutionExample.java  |   4 +-
 .../examples/AsyncRequestExecutionExample.java  |   4 +-
 .../examples/ClassicGetExecutionExample.java    |   4 +-
 .../examples/ClassicPostExecutionExample.java   |   4 +-
 9 files changed, 879 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexClientExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexClientExample.java b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexClientExample.java
new file mode 100644
index 0000000..b0c055c
--- /dev/null
+++ b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexClientExample.java
@@ -0,0 +1,197 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.examples;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.BasicResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.frame.RawFrame;
+import org.apache.hc.core5.http2.impl.nio.Http2StreamListener;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Example of full-duplex, streaming HTTP message exchanges with an asynchronous HTTP/2 requester.
+ */
+public class Http2FullDuplexClientExample {
+
+    public static void main(String[] args) throws Exception {
+
+        IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+                .setSoTimeout(5, TimeUnit.SECONDS)
+                .build();
+
+        // Create and start requester
+        H2Config h2Config = H2Config.custom()
+                .setPushEnabled(false)
+                .setMaxConcurrentStreams(100)
+                .build();
+        final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap()
+                .setIOReactorConfig(ioReactorConfig)
+                .setH2Config(h2Config)
+                .setStreamListener(new Http2StreamListener() {
+
+                    @Override
+                    public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+                        for (int i = 0; i < headers.size(); i++) {
+                            System.out.println(connection + " (" + streamId + ") << " + headers.get(i));
+                        }
+                    }
+
+                    @Override
+                    public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+                        for (int i = 0; i < headers.size(); i++) {
+                            System.out.println(connection + " (" + streamId + ") >> " + headers.get(i));
+                        }
+                    }
+
+                    @Override
+                    public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+                    }
+
+                    @Override
+                    public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+                    }
+
+                    @Override
+                    public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+                    }
+
+                    @Override
+                    public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+                    }
+
+                })
+                .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("HTTP requester shutting down");
+                requester.shutdown(ShutdownType.GRACEFUL);
+            }
+        });
+        requester.start();
+
+        final URI requestUri = new URI("http://http2bin.org/post");
+        final BasicRequestProducer requestProducer = new BasicRequestProducer(
+                "POST", requestUri, new BasicAsyncEntityProducer("stuff", ContentType.TEXT_PLAIN));
+        final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(
+                new StringAsyncEntityConsumer());
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        requester.execute(new AsyncClientExchangeHandler() {
+
+            @Override
+            public void releaseResources() {
+                requestProducer.releaseResources();
+                responseConsumer.releaseResources();
+                latch.countDown();
+            }
+
+            @Override
+            public void cancel() {
+                System.out.println(requestUri + " cancelled");
+            }
+
+            @Override
+            public void failed(final Exception cause) {
+                System.out.println(requestUri + "->" + cause);
+            }
+
+            @Override
+            public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
+                requestProducer.sendRequest(channel);
+            }
+
+            @Override
+            public int available() {
+                return requestProducer.available();
+            }
+
+            @Override
+            public void produce(final DataStreamChannel channel) throws IOException {
+                requestProducer.produce(channel);
+            }
+
+            @Override
+            public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
+                System.out.println(requestUri + "->" + response.getCode());
+            }
+
+            @Override
+            public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+                System.out.println(requestUri + "->" + response.getCode());
+                responseConsumer.consumeResponse(response, entityDetails, null);
+            }
+
+            @Override
+            public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                responseConsumer.updateCapacity(capacityChannel);
+            }
+
+            @Override
+            public int consume(final ByteBuffer src) throws IOException {
+                return responseConsumer.consume(src);
+            }
+
+            @Override
+            public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+                responseConsumer.streamEnd(trailers);
+            }
+
+        }, Timeout.ofSeconds(30), HttpCoreContext.create());
+
+        latch.await();
+        System.out.println("Shutting down I/O reactor");
+        requester.initiateShutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexServerExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexServerExample.java b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexServerExample.java
new file mode 100644
index 0000000..7c7785a
--- /dev/null
+++ b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexServerExample.java
@@ -0,0 +1,243 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.examples;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.frame.RawFrame;
+import org.apache.hc.core5.http2.impl.nio.Http2StreamListener;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.util.TimeValue;
+
+/**
+ * Example of full-duplex, streaming HTTP message exchanges with an asynchronous embedded HTTP/2 server: the "/echo" handler writes request content back to the response as it arrives.
+ */
+public class Http2FullDuplexServerExample {
+
+    public static void main(String[] args) throws Exception {
+        int port = 8080; // default; overridden by the first command-line argument
+        if (args.length >= 1) {
+            port = Integer.parseInt(args[0]);
+        }
+
+        IOReactorConfig config = IOReactorConfig.custom()
+                .setSoTimeout(15, TimeUnit.SECONDS)
+                .setTcpNoDelay(true)
+                .build();
+
+        H2Config h2Config = H2Config.custom()
+                .setPushEnabled(true)
+                .setMaxConcurrentStreams(100)
+                .build();
+
+        final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
+                .setIOReactorConfig(config)
+                .setH2Config(h2Config)
+                .setStreamListener(new Http2StreamListener() { // prints header blocks; frame and flow-control callbacks are left empty
+
+                    @Override
+                    public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+                        for (int i = 0; i < headers.size(); i++) {
+                            System.out.println(connection + " (" + streamId + ") << " + headers.get(i)); // "<<" marks incoming headers
+                        }
+                    }
+
+                    @Override
+                    public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+                        for (int i = 0; i < headers.size(); i++) {
+                            System.out.println(connection + " (" + streamId + ") >> " + headers.get(i)); // ">>" marks outgoing headers
+                        }
+                    }
+
+                    @Override
+                    public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) { // intentionally empty
+                    }
+
+                    @Override
+                    public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) { // intentionally empty
+                    }
+
+                    @Override
+                    public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { // intentionally empty
+                    }
+
+                    @Override
+                    public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { // intentionally empty
+                    }
+
+                })
+                .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
+
+                    @Override
+                    public AsyncServerExchangeHandler get() { // every call returns a new handler with its own buffer
+                        return new AsyncServerExchangeHandler() {
+
+                            ByteBuffer buffer = ByteBuffer.allocate(2048); // request bytes not yet echoed; left in write mode between calls
+                            CapacityChannel inputCapacityChannel; // kept only while the buffer has no free space (see updateCapacity)
+                            DataStreamChannel outputDataChannel; // assigned on the first produce() call
+                            boolean endStream; // set once streamEnd() has been called for the request
+
+                            private void ensureCapacity(final int chunk) { // grows the buffer so that at least 'chunk' more bytes fit
+                                if (buffer.remaining() < chunk) {
+                                    final ByteBuffer oldBuffer = buffer;
+                                    oldBuffer.flip();
+                                    buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048)); // pending bytes plus max(chunk, 2048) of free space
+                                    buffer.put(oldBuffer);
+                                }
+                            }
+
+                            @Override
+                            public void handleRequest(
+                                    final HttpRequest request,
+                                    final EntityDetails entityDetails,
+                                    final ResponseChannel responseChannel,
+                                    final HttpContext context) throws HttpException, IOException {
+                                final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+                                responseChannel.sendResponse(response, entityDetails); // the request's entity details are passed along with the response
+                            }
+
+                            @Override
+                            public int consume(final ByteBuffer src) throws IOException {
+                                if (buffer.position() == 0) { // nothing buffered: try writing straight to the output channel
+                                    if (outputDataChannel != null) {
+                                        outputDataChannel.write(src);
+                                    }
+                                }
+                                if (src.hasRemaining()) { // buffer whatever was not written directly
+                                    ensureCapacity(src.remaining());
+                                    buffer.put(src);
+                                    if (outputDataChannel != null) {
+                                        outputDataChannel.requestOutput(); // ask for produce() to be invoked
+                                    }
+                                }
+                                return buffer.remaining(); // free space left in the buffer
+                            }
+
+                            @Override
+                            public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                                if (buffer.hasRemaining()) {
+                                    capacityChannel.update(buffer.remaining()); // report the free space immediately
+                                    inputCapacityChannel = null;
+                                } else {
+                                    inputCapacityChannel = capacityChannel; // buffer full: remember the channel so produce() can report space later
+                                }
+                            }
+
+                            @Override
+                            public void streamEnd(final List<? extends Header> trailers) throws IOException {
+                                endStream = true;
+                                if (buffer.position() == 0) { // nothing left to echo: end the response right away
+                                    if (outputDataChannel != null) {
+                                        outputDataChannel.endStream();
+                                    }
+                                } else { // buffered bytes remain: let produce() write them and end the stream
+                                    if (outputDataChannel != null) {
+                                        outputDataChannel.requestOutput();
+                                    }
+                                }
+                            }
+
+                            @Override
+                            public int available() {
+                                return buffer.position(); // number of buffered bytes waiting to be written
+                            }
+
+                            @Override
+                            public void produce(final DataStreamChannel channel) throws IOException {
+                                outputDataChannel = channel;
+                                buffer.flip(); // switch to read mode to drain the buffer
+                                if (buffer.hasRemaining()) {
+                                    channel.write(buffer);
+                                }
+                                buffer.compact(); // back to write mode, keeping any unwritten bytes
+                                if (buffer.position() == 0 && endStream) { // everything echoed and the request has ended
+                                    channel.endStream();
+                                }
+                                final CapacityChannel capacityChannel = inputCapacityChannel;
+                                if (capacityChannel != null && buffer.hasRemaining()) { // a channel was remembered and space is available again
+                                    capacityChannel.update(buffer.remaining());
+                                }
+                            }
+
+                            @Override
+                            public void failed(final Exception cause) {
+                                if (!(cause instanceof SocketException)) { // socket errors are not reported
+                                    cause.printStackTrace(System.out);
+                                }
+                            }
+
+                            @Override
+                            public void releaseResources() { // nothing to release
+                            }
+
+                        };
+                    }
+
+                })
+                .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() { // shuts the server down when the JVM exits
+            @Override
+            public void run() {
+                System.out.println("HTTP server shutting down");
+                server.shutdown(ShutdownType.GRACEFUL);
+            }
+        });
+
+        server.start();
+        Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port));
+        ListenerEndpoint listenerEndpoint = future.get(); // blocks until the listener endpoint is available
+        System.out.println("Listening on " + listenerEndpoint.getAddress()); // println so later listener output starts on its own line
+        server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java
index ec85ce1..8f314fb 100644
--- a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java
+++ b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java
@@ -29,6 +29,7 @@ package org.apache.hc.core5.http.examples;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.Header;
@@ -46,6 +47,7 @@ import org.apache.hc.core5.http2.frame.RawFrame;
 import org.apache.hc.core5.http2.impl.nio.Http2StreamListener;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
 import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.util.Timeout;
 
 /**
@@ -56,11 +58,17 @@ public class Http2MultiStreamExecutionExample {
     public static void main(String[] args) throws Exception {
 
         // Create and start requester
+        IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+                .setSoTimeout(5, TimeUnit.SECONDS)
+                .build();
+
         H2Config h2Config = H2Config.custom()
                 .setPushEnabled(false)
                 .setMaxConcurrentStreams(100)
                 .build();
+
         final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap()
+                .setIOReactorConfig(ioReactorConfig)
                 .setH2Config(h2Config)
                 .setStreamListener(new Http2StreamListener() {
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexClientExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexClientExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexClientExample.java
new file mode 100644
index 0000000..a25e873
--- /dev/null
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexClientExample.java
@@ -0,0 +1,197 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.examples;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.HttpProcessors;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.BasicResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Example of full-duplex, streaming HTTP message exchanges with an asynchronous HTTP/1.1 requester: sends one POST with a short text entity and prints message heads and status codes.
+ */
+public class AsyncFullDuplexClientExample {
+
+    public static void main(String[] args) throws Exception {
+
+        IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+                .setSoTimeout(5, TimeUnit.SECONDS) // 5 second socket timeout
+                .build();
+
+        // Create and start requester
+        final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
+                .setIOReactorConfig(ioReactorConfig)
+                .setHttpProcessor(HttpProcessors.customClient(null).addLast(new HttpRequestInterceptor() {
+
+                    // Remove the 'Expect' header to disable the 'Expect: Continue' handshake, which some servers cannot handle well
+                    @Override
+                    public void process(
+                            final HttpRequest request, final EntityDetails entity, final HttpContext context) throws HttpException, IOException {
+                        request.removeHeaders(HttpHeaders.EXPECT);
+                    }
+
+                }).build())
+                .setStreamListener(new Http1StreamListener() { // prints request/response heads and exchange completion to stdout
+
+                    @Override
+                    public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+                        System.out.println(connection + " " + new RequestLine(request));
+
+                    }
+
+                    @Override
+                    public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+                        System.out.println(connection + " " + new StatusLine(response));
+                    }
+
+                    @Override
+                    public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+                        if (keepAlive) {
+                            System.out.println(connection + " exchange completed (connection kept alive)");
+                        } else {
+                            System.out.println(connection + " exchange completed (connection closed)");
+                        }
+                    }
+
+                })
+                .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() { // shuts the requester down when the JVM exits
+            @Override
+            public void run() {
+                System.out.println("HTTP requester shutting down");
+                requester.shutdown(ShutdownType.GRACEFUL);
+            }
+        });
+        requester.start();
+
+        final URI requestUri = new URI("http://httpbin.org/post");
+        final BasicRequestProducer requestProducer = new BasicRequestProducer(
+                "POST", requestUri, new BasicAsyncEntityProducer("stuff", ContentType.TEXT_PLAIN));
+        final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(
+                new StringAsyncEntityConsumer());
+
+        final CountDownLatch latch = new CountDownLatch(1); // counted down in releaseResources()
+        requester.execute(new AsyncClientExchangeHandler() { // hand-written handler that delegates to requestProducer / responseConsumer
+
+            @Override
+            public void releaseResources() {
+                requestProducer.releaseResources();
+                responseConsumer.releaseResources();
+                latch.countDown(); // lets main() proceed to shutdown
+            }
+
+            @Override
+            public void cancel() {
+                System.out.println(requestUri + " cancelled");
+            }
+
+            @Override
+            public void failed(final Exception cause) {
+                System.out.println(requestUri + "->" + cause);
+            }
+
+            @Override
+            public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
+                requestProducer.sendRequest(channel);
+            }
+
+            @Override
+            public int available() {
+                return requestProducer.available();
+            }
+
+            @Override
+            public void produce(final DataStreamChannel channel) throws IOException {
+                requestProducer.produce(channel);
+            }
+
+            @Override
+            public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
+                System.out.println(requestUri + "->" + response.getCode()); // only the status code is printed
+            }
+
+            @Override
+            public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+                System.out.println(requestUri + "->" + response.getCode());
+                responseConsumer.consumeResponse(response, entityDetails, null);
+            }
+
+            @Override
+            public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                responseConsumer.updateCapacity(capacityChannel);
+            }
+
+            @Override
+            public int consume(final ByteBuffer src) throws IOException {
+                return responseConsumer.consume(src);
+            }
+
+            @Override
+            public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+                responseConsumer.streamEnd(trailers);
+            }
+
+        }, Timeout.ofSeconds(30), HttpCoreContext.create());
+
+        latch.await(1, TimeUnit.MINUTES); // waits at most one minute for releaseResources() to count the latch down
+        System.out.println("Shutting down I/O reactor");
+        requester.initiateShutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexServerExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexServerExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexServerExample.java
new file mode 100644
index 0000000..17e0355
--- /dev/null
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexServerExample.java
@@ -0,0 +1,226 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.examples;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.util.TimeValue;
+
+/**
+ * Example of full-duplex, streaming HTTP message exchanges with an asynchronous embedded HTTP/1.1 server.
+ * <p>
+ * Requests to {@code /echo} are answered immediately and the request content is echoed back as it arrives.
+ * The listening port defaults to 8080 and can be overridden with the first command line argument.
+ */
+public class AsyncFullDuplexServerExample {
+
+    public static void main(final String[] args) throws Exception {
+        final int port = args.length >= 1 ? Integer.parseInt(args[0]) : 8080;
+
+        final IOReactorConfig config = IOReactorConfig.custom()
+                .setSoTimeout(15, TimeUnit.SECONDS)
+                .setTcpNoDelay(true)
+                .build();
+
+        final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
+                .setIOReactorConfig(config)
+                .setStreamListener(new Http1StreamListener() {
+
+                    @Override
+                    public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+                        System.out.println(connection + " " + new RequestLine(request));
+                    }
+
+                    @Override
+                    public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+                        System.out.println(connection + " " + new StatusLine(response));
+                    }
+
+                    @Override
+                    public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+                        if (keepAlive) {
+                            System.out.println(connection + " exchange completed (connection kept alive)");
+                        } else {
+                            System.out.println(connection + " exchange completed (connection closed)");
+                        }
+                    }
+
+                })
+                .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
+
+                    @Override
+                    public AsyncServerExchangeHandler get() {
+                        return new AsyncServerExchangeHandler() {
+
+                            ByteBuffer buffer = ByteBuffer.allocate(2048); // pending echo data; left in write mode between calls
+                            CapacityChannel inputCapacityChannel; // remembered by updateCapacity() while the buffer is full
+                            DataStreamChannel outputDataChannel; // assigned on the first produce() call, null until then
+                            boolean endStream; // set once streamEnd() has been called
+
+                            // Grows the buffer, keeping pending bytes, when fewer than 'chunk' bytes of space remain.
+                            private void ensureCapacity(final int chunk) {
+                                if (buffer.remaining() < chunk) {
+                                    final ByteBuffer oldBuffer = buffer;
+                                    oldBuffer.flip();
+                                    buffer = ByteBuffer.allocate(oldBuffer.remaining() + Math.max(chunk, 2048));
+                                    buffer.put(oldBuffer);
+                                }
+                            }
+
+                            @Override
+                            public void handleRequest(
+                                    final HttpRequest request,
+                                    final EntityDetails entityDetails,
+                                    final ResponseChannel responseChannel,
+                                    final HttpContext context) throws HttpException, IOException {
+                                final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+                                responseChannel.sendResponse(response, entityDetails); // response reuses the request's entity details
+                            }
+
+                            @Override
+                            public int consume(final ByteBuffer src) throws IOException {
+                                // Nothing pending: try to pass the data straight through to the output channel.
+                                if (buffer.position() == 0 && outputDataChannel != null) {
+                                    outputDataChannel.write(src);
+                                }
+                                // Whatever was not written is buffered until produce() drains it.
+                                if (src.hasRemaining()) {
+                                    ensureCapacity(src.remaining());
+                                    buffer.put(src);
+                                    if (outputDataChannel != null) {
+                                        outputDataChannel.requestOutput();
+                                    }
+                                }
+                                return buffer.remaining();
+                            }
+
+                            @Override
+                            public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                                if (buffer.hasRemaining()) {
+                                    capacityChannel.update(buffer.remaining());
+                                    inputCapacityChannel = null;
+                                } else {
+                                    inputCapacityChannel = capacityChannel; // buffer full: produce() reports capacity later
+                                }
+                            }
+
+                            @Override
+                            public void streamEnd(final List<? extends Header> trailers) throws IOException {
+                                endStream = true;
+                                if (outputDataChannel != null) {
+                                    if (buffer.position() == 0) {
+                                        outputDataChannel.endStream();
+                                    } else {
+                                        // Pending bytes go out first; produce() ends the stream once the buffer is empty.
+                                        outputDataChannel.requestOutput();
+                                    }
+                                }
+                            }
+
+                            @Override
+                            public int available() {
+                                return buffer.position();
+                            }
+
+                            @Override
+                            public void produce(final DataStreamChannel channel) throws IOException {
+                                outputDataChannel = channel;
+                                buffer.flip(); // read mode: expose the pending bytes
+                                if (buffer.hasRemaining()) {
+                                    channel.write(buffer);
+                                }
+                                buffer.compact(); // back to write mode, keeping whatever was not written
+                                if (buffer.position() == 0 && endStream) {
+                                    channel.endStream();
+                                }
+                                final CapacityChannel capacityChannel = inputCapacityChannel;
+                                if (capacityChannel != null && buffer.hasRemaining()) {
+                                    capacityChannel.update(buffer.remaining());
+                                }
+                            }
+
+                            @Override
+                            public void failed(final Exception cause) {
+                                if (!(cause instanceof SocketException)) {
+                                    cause.printStackTrace(System.out);
+                                }
+                            }
+
+                            @Override
+                            public void releaseResources() {
+                            }
+
+                        };
+                    }
+
+                })
+                .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("HTTP server shutting down");
+                server.shutdown(ShutdownType.GRACEFUL);
+            }
+        });
+
+        server.start();
+        final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port));
+        final ListenerEndpoint listenerEndpoint = future.get();
+        System.out.println("Listening on " + listenerEndpoint.getAddress());
+        server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java
index e122c4d..3ed156b 100644
--- a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java
@@ -79,9 +79,9 @@ public class AsyncPipelinedRequestExecutionExample {
                     @Override
                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
                         if (keepAlive) {
-                            System.out.println(connection + " can be kept alive");
+                            System.out.println(connection + " exchange completed (connection kept alive)");
                         } else {
-                            System.out.println(connection + " cannot be kept alive");
+                            System.out.println(connection + " exchange completed (connection closed)");
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
index 2d65fb3..5dcf555 100644
--- a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
@@ -77,9 +77,9 @@ public class AsyncRequestExecutionExample {
                     @Override
                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
                         if (keepAlive) {
-                            System.out.println(connection + " can be kept alive");
+                            System.out.println(connection + " exchange completed (connection kept alive)");
                         } else {
-                            System.out.println(connection + " cannot be kept alive");
+                            System.out.println(connection + " exchange completed (connection closed)");
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java
index 71a8867..18da4dc 100644
--- a/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java
@@ -69,9 +69,9 @@ public class ClassicGetExecutionExample {
                     @Override
                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
                         if (keepAlive) {
-                            System.out.println(connection + " can be kept alive");
+                            System.out.println(connection + " exchange completed (connection kept alive)");
                         } else {
-                            System.out.println(connection + " cannot be kept alive");
+                            System.out.println(connection + " exchange completed (connection closed)");
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java
index 0d95e84..d81f616 100644
--- a/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java
@@ -76,9 +76,9 @@ public class ClassicPostExecutionExample {
                     @Override
                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
                         if (keepAlive) {
-                            System.out.println(connection + " can be kept alive");
+                            System.out.println(connection + " exchange completed (connection kept alive)");
                         } else {
-                            System.out.println(connection + " cannot be kept alive");
+                            System.out.println(connection + " exchange completed (connection closed)");
                         }
                     }
 


[2/2] httpcomponents-core git commit: Bugfix: fixed AbstractAsyncResponseConsumer to handle null callback parameter

Posted by ol...@apache.org.
Bugfix: fixed AbstractAsyncResponseConsumer to handle null callback parameter


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/46ac284d
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/46ac284d
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/46ac284d

Branch: refs/heads/master
Commit: 46ac284df0fc9c6eefe832c4a0db7eaba08e9e94
Parents: c838db8
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Thu Mar 15 10:29:27 2018 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Mar 15 10:36:32 2018 +0100

----------------------------------------------------------------------
 .../support/AbstractAsyncResponseConsumer.java  | 42 +++++++++++++++-----
 1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/46ac284d/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
index c77ef51..9752c73 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.UnsupportedCharsetException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.ContentType;
@@ -45,12 +46,14 @@ import org.apache.hc.core5.util.Args;
 public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncResponseConsumer<T> {
 
     private final AsyncEntityConsumer<E> entityConsumer;
-
-    private volatile T result;
+    private final AtomicReference<T> resultRef;
+    private final AtomicReference<Exception> exceptionRef;
 
     public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> entityConsumer) {
         Args.notNull(entityConsumer, "Entity consumer");
         this.entityConsumer = entityConsumer;
+        this.resultRef = new AtomicReference<>(null);
+        this.exceptionRef = new AtomicReference<>(null);
     }
 
     protected abstract T buildResult(HttpResponse response, E entity, ContentType contentType);
@@ -68,27 +71,41 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
                     final ContentType contentType;
                     try {
                         contentType = ContentType.parse(entityDetails.getContentType());
-                        result = buildResult(response, entity, contentType);
-                        resultCallback.completed(result);
+                        final T result = buildResult(response, entity, contentType);
+                        resultRef.compareAndSet(null, result);
+                        if (resultCallback != null) {
+                            resultCallback.completed(result);
+                        }
                     } catch (final UnsupportedCharsetException ex) {
-                        resultCallback.failed(ex);
+                        exceptionRef.compareAndSet(null, ex);
+                        if (resultCallback != null) {
+                            resultCallback.failed(ex);
+                        }
                     }
                 }
 
                 @Override
                 public void failed(final Exception ex) {
-                    resultCallback.failed(ex);
+                    exceptionRef.compareAndSet(null, ex);
+                    if (resultCallback != null) {
+                        resultCallback.failed(ex);
+                    }
                 }
 
                 @Override
                 public void cancelled() {
-                    resultCallback.cancelled();
+                    if (resultCallback != null) {
+                        resultCallback.cancelled();
+                    }
                 }
 
             });
         } else {
-            result = buildResult(response, null, null);
-            resultCallback.completed(result);
+            final T result = buildResult(response, null, null);
+            resultRef.compareAndSet(null, result);
+            if (resultCallback != null) {
+                resultCallback.completed(result);
+            }
             entityConsumer.releaseResources();
         }
 
@@ -111,11 +128,16 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
 
     @Override
     public T getResult() {
-        return result;
+        return resultRef.get();
+    }
+
+    public Exception getException() {
+        return exceptionRef.get();
     }
 
     @Override
     public final void failed(final Exception cause) {
+        exceptionRef.compareAndSet(null, cause);
         releaseResources();
     }