You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/03/15 09:38:55 UTC
[1/2] httpcomponents-core git commit: Added examples of full-duplex
HTTP/1.1 and HTTP/2 message exchanges
Repository: httpcomponents-core
Updated Branches:
refs/heads/master c838db897 -> 54f64e291
Added examples of full-duplex HTTP/1.1 and HTTP/2 message exchanges
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/54f64e29
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/54f64e29
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/54f64e29
Branch: refs/heads/master
Commit: 54f64e29109a499a4d190024e98fd83e3008a0c1
Parents: 46ac284
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Thu Mar 15 10:30:39 2018 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Mar 15 10:36:32 2018 +0100
----------------------------------------------------------------------
.../examples/Http2FullDuplexClientExample.java | 197 +++++++++++++++
.../examples/Http2FullDuplexServerExample.java | 243 +++++++++++++++++++
.../Http2MultiStreamExecutionExample.java | 8 +
.../examples/AsyncFullDuplexClientExample.java | 197 +++++++++++++++
.../examples/AsyncFullDuplexServerExample.java | 226 +++++++++++++++++
.../AsyncPipelinedRequestExecutionExample.java | 4 +-
.../examples/AsyncRequestExecutionExample.java | 4 +-
.../examples/ClassicGetExecutionExample.java | 4 +-
.../examples/ClassicPostExecutionExample.java | 4 +-
9 files changed, 879 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexClientExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexClientExample.java b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexClientExample.java
new file mode 100644
index 0000000..b0c055c
--- /dev/null
+++ b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexClientExample.java
@@ -0,0 +1,197 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.examples;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.BasicResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.frame.RawFrame;
+import org.apache.hc.core5.http2.impl.nio.Http2StreamListener;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Example of full-duplex, streaming HTTP message exchanges with an asynchronous HTTP/2 requester. It executes a single POST carrying a short text body, prints the response status code and the HTTP/2 header traffic, then initiates shutdown of the requester.
+ */
+public class Http2FullDuplexClientExample {
+
+ public static void main(String[] args) throws Exception { // builds the requester, runs one exchange, waits for it to finish, then initiates shutdown
+
+ IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+ .setSoTimeout(5, TimeUnit.SECONDS) // socket timeout of 5 seconds
+ .build();
+
+ // Create and start requester
+ H2Config h2Config = H2Config.custom()
+ .setPushEnabled(false) // push is switched off in this client configuration
+ .setMaxConcurrentStreams(100)
+ .build();
+ final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap()
+ .setIOReactorConfig(ioReactorConfig)
+ .setH2Config(h2Config)
+ .setStreamListener(new Http2StreamListener() { // listener prints header lists; all other callbacks below are empty
+
+ @Override
+ public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+ for (int i = 0; i < headers.size(); i++) {
+ System.out.println(connection + " (" + streamId + ") << " + headers.get(i)); // "<<" marks headers reported as input
+ }
+ }
+
+ @Override
+ public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+ for (int i = 0; i < headers.size(); i++) {
+ System.out.println(connection + " (" + streamId + ") >> " + headers.get(i)); // ">>" marks headers reported as output
+ }
+ }
+
+ @Override
+ public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ } // no-op
+
+ @Override
+ public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ } // no-op
+
+ @Override
+ public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ } // no-op
+
+ @Override
+ public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ } // no-op
+
+ })
+ .create();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() { // on JVM exit, request a graceful shutdown of the requester
+ @Override
+ public void run() {
+ System.out.println("HTTP requester shutting down");
+ requester.shutdown(ShutdownType.GRACEFUL);
+ }
+ });
+ requester.start();
+
+ final URI requestUri = new URI("http://http2bin.org/post");
+ final BasicRequestProducer requestProducer = new BasicRequestProducer( // POST request with the text/plain body "stuff"
+ "POST", requestUri, new BasicAsyncEntityProducer("stuff", ContentType.TEXT_PLAIN));
+ final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>( // receives the response; entity handled by a StringAsyncEntityConsumer
+ new StringAsyncEntityConsumer());
+
+ final CountDownLatch latch = new CountDownLatch(1); // counted down in releaseResources() below
+ requester.execute(new AsyncClientExchangeHandler() { // handler delegates the request side to requestProducer and the response side to responseConsumer
+
+ @Override
+ public void releaseResources() {
+ requestProducer.releaseResources();
+ responseConsumer.releaseResources();
+ latch.countDown(); // lets main() proceed past latch.await()
+ }
+
+ @Override
+ public void cancel() {
+ System.out.println(requestUri + " cancelled");
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ System.out.println(requestUri + "->" + cause); // only reports the failure; the latch is not touched here
+ }
+
+ @Override
+ public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
+ requestProducer.sendRequest(channel);
+ }
+
+ @Override
+ public int available() {
+ return requestProducer.available();
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ requestProducer.produce(channel);
+ }
+
+ @Override
+ public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
+ System.out.println(requestUri + "->" + response.getCode()); // prints the status code only; nothing is forwarded to responseConsumer
+ }
+
+ @Override
+ public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+ System.out.println(requestUri + "->" + response.getCode());
+ responseConsumer.consumeResponse(response, entityDetails, null); // NOTE(review): third argument is null - presumably "no result callback"; confirm against BasicResponseConsumer
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ responseConsumer.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ public int consume(final ByteBuffer src) throws IOException {
+ return responseConsumer.consume(src);
+ }
+
+ @Override
+ public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ responseConsumer.streamEnd(trailers);
+ }
+
+ }, Timeout.ofSeconds(30), HttpCoreContext.create()); // 30-second timeout and a fresh HttpCoreContext are passed to execute()
+
+ latch.await(); // waits without a time limit until releaseResources() counts the latch down
+ System.out.println("Shutting down I/O reactor");
+ requester.initiateShutdown();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexServerExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexServerExample.java b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexServerExample.java
new file mode 100644
index 0000000..7c7785a
--- /dev/null
+++ b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2FullDuplexServerExample.java
@@ -0,0 +1,243 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.examples;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.http2.frame.RawFrame;
+import org.apache.hc.core5.http2.impl.nio.Http2StreamListener;
+import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.util.TimeValue;
+
+/**
+ * Example of full-duplex, streaming HTTP message exchanges with an asynchronous embedded HTTP/2 server. The handler registered under "/echo" writes the request bytes it consumes back to the response data channel, buffering whatever is not written directly.
+ */
+public class Http2FullDuplexServerExample {
+
+ public static void main(String[] args) throws Exception { // configures the server, starts it, binds the listener and waits for shutdown
+ int port = 8080; // default port; replaced by the first command-line argument when one is given
+ if (args.length >= 1) {
+ port = Integer.parseInt(args[0]);
+ }
+
+ IOReactorConfig config = IOReactorConfig.custom()
+ .setSoTimeout(15, TimeUnit.SECONDS) // socket timeout of 15 seconds
+ .setTcpNoDelay(true)
+ .build();
+
+ H2Config h2Config = H2Config.custom()
+ .setPushEnabled(true)
+ .setMaxConcurrentStreams(100)
+ .build();
+
+ final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
+ .setIOReactorConfig(config)
+ .setH2Config(h2Config)
+ .setStreamListener(new Http2StreamListener() { // listener prints header lists; all other callbacks below are empty
+
+ @Override
+ public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+ for (int i = 0; i < headers.size(); i++) {
+ System.out.println(connection + " (" + streamId + ") << " + headers.get(i)); // "<<" marks headers reported as input
+ }
+ }
+
+ @Override
+ public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
+ for (int i = 0; i < headers.size(); i++) {
+ System.out.println(connection + " (" + streamId + ") >> " + headers.get(i)); // ">>" marks headers reported as output
+ }
+ }
+
+ @Override
+ public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ } // no-op
+
+ @Override
+ public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
+ } // no-op
+
+ @Override
+ public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ } // no-op
+
+ @Override
+ public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
+ } // no-op
+
+ })
+ .register("/echo", new Supplier<AsyncServerExchangeHandler>() { // each get() call returns a new handler with its own buffer and state
+
+ @Override
+ public AsyncServerExchangeHandler get() {
+ return new AsyncServerExchangeHandler() {
+
+ ByteBuffer buffer = ByteBuffer.allocate(2048); // holds request bytes not yet written out; kept in write mode between calls
+ CapacityChannel inputCapacityChannel; // set by updateCapacity() when the buffer had no free space
+ DataStreamChannel outputDataChannel; // set on the first produce() call; null until then
+ boolean endStream; // set to true by streamEnd()
+
+ private void ensureCapacity(final int chunk) { // grows the buffer when fewer than 'chunk' bytes are free
+ if (buffer.remaining() < chunk) {
+ final ByteBuffer oldBuffer = buffer;
+ oldBuffer.flip(); // read mode, so the buffered content can be copied
+ buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048)); // old content plus the larger of chunk and 2048
+ buffer.put(oldBuffer);
+ }
+ }
+
+ @Override
+ public void handleRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel,
+ final HttpContext context) throws HttpException, IOException {
+ final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+ responseChannel.sendResponse(response, entityDetails); // the request's entityDetails are passed along with the 200 response
+ }
+
+ @Override
+ public int consume(final ByteBuffer src) throws IOException {
+ if (buffer.position() == 0) { // nothing buffered: try to write the incoming bytes straight to the output channel
+ if (outputDataChannel != null) {
+ outputDataChannel.write(src);
+ }
+ }
+ if (src.hasRemaining()) { // bytes not written above are copied into the buffer
+ ensureCapacity(src.remaining());
+ buffer.put(src);
+ if (outputDataChannel != null) {
+ outputDataChannel.requestOutput(); // presumably asks for a later produce() call - confirm against DataStreamChannel docs
+ }
+ }
+ return buffer.remaining(); // free space left in the buffer
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ if (buffer.hasRemaining()) { // free space exists: report it immediately
+ capacityChannel.update(buffer.remaining());
+ inputCapacityChannel = null;
+ } else { // buffer full: keep the channel so produce() can report capacity later
+ inputCapacityChannel = capacityChannel;
+ }
+ }
+
+ @Override
+ public void streamEnd(final List<? extends Header> trailers) throws IOException {
+ endStream = true;
+ if (buffer.position() == 0) { // nothing buffered: end the output stream now if the channel is known
+ if (outputDataChannel != null) {
+ outputDataChannel.endStream();
+ }
+ } else { // buffered bytes remain: produce() ends the stream once they are written
+ if (outputDataChannel != null) {
+ outputDataChannel.requestOutput();
+ }
+ }
+ }
+
+ @Override
+ public int available() {
+ return buffer.position(); // number of buffered bytes (buffer is in write mode here)
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ outputDataChannel = channel; // remembered for use in consume() and streamEnd()
+ buffer.flip(); // read mode: expose the buffered bytes for writing
+ if (buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+ buffer.compact(); // back to write mode, keeping any bytes that were not written
+ if (buffer.position() == 0 && endStream) { // buffer empty and streamEnd() already called
+ channel.endStream();
+ }
+ final CapacityChannel capacityChannel = inputCapacityChannel;
+ if (capacityChannel != null && buffer.hasRemaining()) { // a channel was stored by updateCapacity(); report the space that is free now
+ capacityChannel.update(buffer.remaining());
+ }
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ if (!(cause instanceof SocketException)) { // SocketException is not printed; any other cause is dumped to stdout
+ cause.printStackTrace(System.out);
+ }
+ }
+
+ @Override
+ public void releaseResources() {
+ } // no-op
+
+ };
+ }
+
+ })
+ .create();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() { // on JVM exit, request a graceful shutdown of the server
+ @Override
+ public void run() {
+ System.out.println("HTTP server shutting down");
+ server.shutdown(ShutdownType.GRACEFUL);
+ }
+ });
+
+ server.start();
+ Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port));
+ ListenerEndpoint listenerEndpoint = future.get(); // blocks until the listen future completes
+ System.out.print("Listening on " + listenerEndpoint.getAddress()); // note: print, not println - no trailing newline is written
+ server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE)); // waits for shutdown with a wait time of Long.MAX_VALUE days
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java
index ec85ce1..8f314fb 100644
--- a/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java
+++ b/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2MultiStreamExecutionExample.java
@@ -29,6 +29,7 @@ package org.apache.hc.core5.http.examples;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.Header;
@@ -46,6 +47,7 @@ import org.apache.hc.core5.http2.frame.RawFrame;
import org.apache.hc.core5.http2.impl.nio.Http2StreamListener;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;
/**
@@ -56,11 +58,17 @@ public class Http2MultiStreamExecutionExample {
public static void main(String[] args) throws Exception {
// Create and start requester
+ IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+ .setSoTimeout(5, TimeUnit.SECONDS)
+ .build();
+
H2Config h2Config = H2Config.custom()
.setPushEnabled(false)
.setMaxConcurrentStreams(100)
.build();
+
final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap()
+ .setIOReactorConfig(ioReactorConfig)
.setH2Config(h2Config)
.setStreamListener(new Http2StreamListener() {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexClientExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexClientExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexClientExample.java
new file mode 100644
index 0000000..a25e873
--- /dev/null
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexClientExample.java
@@ -0,0 +1,197 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.examples;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpRequestInterceptor;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.HttpProcessors;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.BasicResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Example of full-duplex, streaming HTTP message exchanges with an asynchronous HTTP/1.1 requester. It executes a single POST carrying a short text body, prints the request line, status line and response status code, then initiates shutdown of the requester.
+ */
+public class AsyncFullDuplexClientExample {
+
+ public static void main(String[] args) throws Exception { // builds the requester, runs one exchange, waits for it to finish, then initiates shutdown
+
+ IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+ .setSoTimeout(5, TimeUnit.SECONDS) // socket timeout of 5 seconds
+ .build();
+
+ // Create and start requester
+ final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
+ .setIOReactorConfig(ioReactorConfig)
+ .setHttpProcessor(HttpProcessors.customClient(null).addLast(new HttpRequestInterceptor() { // custom client processor with one extra interceptor added last
+
+ // Disable the 'Expect: 100-continue' handshake, which some servers cannot handle well
+ @Override
+ public void process(
+ final HttpRequest request, final EntityDetails entity, final HttpContext context) throws HttpException, IOException {
+ request.removeHeaders(HttpHeaders.EXPECT); // strips every Expect header from the outgoing request
+ }
+
+ }).build())
+ .setStreamListener(new Http1StreamListener() { // listener prints the request line, the status line and the exchange outcome
+
+ @Override
+ public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+ System.out.println(connection + " " + new RequestLine(request));
+
+ }
+
+ @Override
+ public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+ System.out.println(connection + " " + new StatusLine(response));
+ }
+
+ @Override
+ public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+ if (keepAlive) { // the message differs depending on the keepAlive flag
+ System.out.println(connection + " exchange completed (connection kept alive)");
+ } else {
+ System.out.println(connection + " exchange completed (connection closed)");
+ }
+ }
+
+ })
+ .create();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() { // on JVM exit, request a graceful shutdown of the requester
+ @Override
+ public void run() {
+ System.out.println("HTTP requester shutting down");
+ requester.shutdown(ShutdownType.GRACEFUL);
+ }
+ });
+ requester.start();
+
+ final URI requestUri = new URI("http://httpbin.org/post");
+ final BasicRequestProducer requestProducer = new BasicRequestProducer( // POST request with the text/plain body "stuff"
+ "POST", requestUri, new BasicAsyncEntityProducer("stuff", ContentType.TEXT_PLAIN));
+ final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>( // receives the response; entity handled by a StringAsyncEntityConsumer
+ new StringAsyncEntityConsumer());
+
+ final CountDownLatch latch = new CountDownLatch(1); // counted down in releaseResources() below
+ requester.execute(new AsyncClientExchangeHandler() { // handler delegates the request side to requestProducer and the response side to responseConsumer
+
+ @Override
+ public void releaseResources() {
+ requestProducer.releaseResources();
+ responseConsumer.releaseResources();
+ latch.countDown(); // lets main() proceed past latch.await(...)
+ }
+
+ @Override
+ public void cancel() {
+ System.out.println(requestUri + " cancelled");
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ System.out.println(requestUri + "->" + cause); // only reports the failure; the latch is not touched here
+ }
+
+ @Override
+ public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
+ requestProducer.sendRequest(channel);
+ }
+
+ @Override
+ public int available() {
+ return requestProducer.available();
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ requestProducer.produce(channel);
+ }
+
+ @Override
+ public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
+ System.out.println(requestUri + "->" + response.getCode()); // prints the status code only; nothing is forwarded to responseConsumer
+ }
+
+ @Override
+ public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+ System.out.println(requestUri + "->" + response.getCode());
+ responseConsumer.consumeResponse(response, entityDetails, null); // NOTE(review): third argument is null - presumably "no result callback"; confirm against BasicResponseConsumer
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ responseConsumer.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ public int consume(final ByteBuffer src) throws IOException {
+ return responseConsumer.consume(src);
+ }
+
+ @Override
+ public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ responseConsumer.streamEnd(trailers);
+ }
+
+ }, Timeout.ofSeconds(30), HttpCoreContext.create()); // 30-second timeout and a fresh HttpCoreContext are passed to execute()
+
+ latch.await(1, TimeUnit.MINUTES); // waits at most one minute; the boolean result is ignored
+ System.out.println("Shutting down I/O reactor");
+ requester.initiateShutdown();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexServerExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexServerExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexServerExample.java
new file mode 100644
index 0000000..17e0355
--- /dev/null
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncFullDuplexServerExample.java
@@ -0,0 +1,226 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http.examples;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.impl.Http1StreamListener;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.util.TimeValue;
+
+/**
+ * Example of full-duplex, streaming HTTP message exchanges with an asynchronous embedded HTTP/1.1 server.
+ * <p>
+ * The server registers a single {@code /echo} handler. The handler sends a {@code 200} response head
+ * as soon as the request head is handed to it and then writes the request body back to the response
+ * channel while the request is still being consumed, buffering only what could not be written yet.
+ */
+public class AsyncFullDuplexServerExample {
+
+    /**
+     * Starts the echo server and blocks until it shuts down.
+     *
+     * @param args optional; {@code args[0]} is the port to listen on (8080 when absent)
+     * @throws Exception propagated from port parsing, server start-up or listener binding
+     */
+    public static void main(String[] args) throws Exception {
+        int port = 8080;
+        if (args.length >= 1) {
+            port = Integer.parseInt(args[0]);
+        }
+
+        final IOReactorConfig config = IOReactorConfig.custom()
+                .setSoTimeout(15, TimeUnit.SECONDS)
+                .setTcpNoDelay(true)
+                .build();
+
+        final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
+                .setIOReactorConfig(config)
+                // Logs request heads, response heads and exchange completion to stdout.
+                .setStreamListener(new Http1StreamListener() {
+
+                    @Override
+                    public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
+                        System.out.println(connection + " " + new RequestLine(request));
+                    }
+
+                    @Override
+                    public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
+                        System.out.println(connection + " " + new StatusLine(response));
+                    }
+
+                    @Override
+                    public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
+                        if (keepAlive) {
+                            System.out.println(connection + " exchange completed (connection kept alive)");
+                        } else {
+                            System.out.println(connection + " exchange completed (connection closed)");
+                        }
+                    }
+
+                })
+                // Each call to get() creates a fresh handler with its own buffer and channel references.
+                .register("/echo", new Supplier<AsyncServerExchangeHandler>() {
+
+                    @Override
+                    public AsyncServerExchangeHandler get() {
+                        return new AsyncServerExchangeHandler() {
+
+                            // Request bytes not yet written to the response. Between calls the buffer
+                            // stays in "fill" mode, so position() is the number of pending bytes.
+                            private ByteBuffer buffer = ByteBuffer.allocate(2048);
+                            // Remembered only while the buffer is full, so produce() can report freed space.
+                            private CapacityChannel inputCapacityChannel;
+                            // Response data channel; null until produce() has been called.
+                            private DataStreamChannel outputDataChannel;
+                            // Set once the end of the request stream has been signalled.
+                            private boolean endStream;
+
+                            /**
+                             * Makes sure at least {@code chunk} bytes can be appended to the buffer.
+                             * When there is not enough room, a new buffer sized to the pending content
+                             * plus {@code max(chunk, 2048)} is allocated and the pending bytes are copied over.
+                             */
+                            private void ensureCapacity(final int chunk) {
+                                if (buffer.remaining() < chunk) {
+                                    final ByteBuffer oldBuffer = buffer;
+                                    oldBuffer.flip();
+                                    buffer = ByteBuffer.allocate(oldBuffer.remaining() + Math.max(chunk, 2048));
+                                    buffer.put(oldBuffer);
+                                }
+                            }
+
+                            /**
+                             * Responds immediately with {@code 200}, passing the request's entity details
+                             * on as the entity details of the response.
+                             */
+                            @Override
+                            public void handleRequest(
+                                    final HttpRequest request,
+                                    final EntityDetails entityDetails,
+                                    final ResponseChannel responseChannel,
+                                    final HttpContext context) throws HttpException, IOException {
+                                final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+                                responseChannel.sendResponse(response, entityDetails);
+                            }
+
+                            /**
+                             * Echoes incoming request bytes. With nothing pending and an output channel
+                             * at hand the bytes are written straight through; whatever is left over is
+                             * buffered and output is requested.
+                             *
+                             * @return the free space remaining in the buffer
+                             */
+                            @Override
+                            public int consume(final ByteBuffer src) throws IOException {
+                                // Write directly only when nothing is queued, so buffered bytes keep their order.
+                                if (buffer.position() == 0 && outputDataChannel != null) {
+                                    outputDataChannel.write(src);
+                                }
+                                if (src.hasRemaining()) {
+                                    ensureCapacity(src.remaining());
+                                    buffer.put(src);
+                                    if (outputDataChannel != null) {
+                                        outputDataChannel.requestOutput();
+                                    }
+                                }
+                                return buffer.remaining();
+                            }
+
+                            /**
+                             * Reports the free buffer space right away when there is any; otherwise keeps
+                             * the channel so that {@code produce} can report space once it has drained data.
+                             */
+                            @Override
+                            public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                                if (buffer.hasRemaining()) {
+                                    capacityChannel.update(buffer.remaining());
+                                    inputCapacityChannel = null;
+                                } else {
+                                    inputCapacityChannel = capacityChannel;
+                                }
+                            }
+
+                            /**
+                             * Records the end of the request stream. If an output channel is known, the
+                             * response is ended at once when nothing is pending; otherwise output is
+                             * requested so that {@code produce} can flush the rest and end the stream.
+                             */
+                            @Override
+                            public void streamEnd(final List<? extends Header> trailers) throws IOException {
+                                endStream = true;
+                                if (outputDataChannel == null) {
+                                    return;
+                                }
+                                if (buffer.position() == 0) {
+                                    outputDataChannel.endStream();
+                                } else {
+                                    outputDataChannel.requestOutput();
+                                }
+                            }
+
+                            /**
+                             * @return the number of buffered bytes waiting to be written
+                             */
+                            @Override
+                            public int available() {
+                                return buffer.position();
+                            }
+
+                            /**
+                             * Writes buffered bytes to the response channel, ends the response once the
+                             * buffer is empty and the request stream has ended, and reports freed buffer
+                             * space to a capacity channel remembered by {@code updateCapacity}.
+                             */
+                            @Override
+                            public void produce(final DataStreamChannel channel) throws IOException {
+                                outputDataChannel = channel;
+                                // flip -> drain -> compact leaves unwritten bytes at the front, back in fill mode
+                                buffer.flip();
+                                if (buffer.hasRemaining()) {
+                                    channel.write(buffer);
+                                }
+                                buffer.compact();
+                                if (buffer.position() == 0 && endStream) {
+                                    channel.endStream();
+                                }
+                                final CapacityChannel capacityChannel = inputCapacityChannel;
+                                if (capacityChannel != null && buffer.hasRemaining()) {
+                                    capacityChannel.update(buffer.remaining());
+                                }
+                            }
+
+                            /**
+                             * Prints the stack trace to stdout, except for {@link SocketException}s,
+                             * which are ignored.
+                             */
+                            @Override
+                            public void failed(final Exception cause) {
+                                if (!(cause instanceof SocketException)) {
+                                    cause.printStackTrace(System.out);
+                                }
+                            }
+
+                            @Override
+                            public void releaseResources() {
+                                // nothing to release: the handler only holds a heap buffer
+                            }
+
+                        };
+                    }
+
+                })
+                .create();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("HTTP server shutting down");
+                server.shutdown(ShutdownType.GRACEFUL);
+            }
+        });
+
+        server.start();
+        final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(port));
+        final ListenerEndpoint listenerEndpoint = future.get();
+        // println, not print: terminate the banner so it does not run into the first log line
+        System.out.println("Listening on " + listenerEndpoint.getAddress());
+        server.awaitShutdown(TimeValue.ofDays(Long.MAX_VALUE));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java
index e122c4d..3ed156b 100644
--- a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncPipelinedRequestExecutionExample.java
@@ -79,9 +79,9 @@ public class AsyncPipelinedRequestExecutionExample {
@Override
public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
if (keepAlive) {
- System.out.println(connection + " can be kept alive");
+ System.out.println(connection + " exchange completed (connection kept alive)");
} else {
- System.out.println(connection + " cannot be kept alive");
+ System.out.println(connection + " exchange completed (connection closed)");
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
index 2d65fb3..5dcf555 100644
--- a/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
@@ -77,9 +77,9 @@ public class AsyncRequestExecutionExample {
@Override
public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
if (keepAlive) {
- System.out.println(connection + " can be kept alive");
+ System.out.println(connection + " exchange completed (connection kept alive)");
} else {
- System.out.println(connection + " cannot be kept alive");
+ System.out.println(connection + " exchange completed (connection closed)");
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java
index 71a8867..18da4dc 100644
--- a/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicGetExecutionExample.java
@@ -69,9 +69,9 @@ public class ClassicGetExecutionExample {
@Override
public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
if (keepAlive) {
- System.out.println(connection + " can be kept alive");
+ System.out.println(connection + " exchange completed (connection kept alive)");
} else {
- System.out.println(connection + " cannot be kept alive");
+ System.out.println(connection + " exchange completed (connection closed)");
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/54f64e29/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java b/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java
index 0d95e84..d81f616 100644
--- a/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java
+++ b/httpcore5/src/examples/org/apache/hc/core5/http/examples/ClassicPostExecutionExample.java
@@ -76,9 +76,9 @@ public class ClassicPostExecutionExample {
@Override
public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
if (keepAlive) {
- System.out.println(connection + " can be kept alive");
+ System.out.println(connection + " exchange completed (connection kept alive)");
} else {
- System.out.println(connection + " cannot be kept alive");
+ System.out.println(connection + " exchange completed (connection closed)");
}
}
[2/2] httpcomponents-core git commit: Bugfix: fixed
AbstractAsyncResponseConsumer to handle null callback parameter
Posted by ol...@apache.org.
Bugfix: fixed AbstractAsyncResponseConsumer to handle null callback parameter
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/46ac284d
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/46ac284d
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/46ac284d
Branch: refs/heads/master
Commit: 46ac284df0fc9c6eefe832c4a0db7eaba08e9e94
Parents: c838db8
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Thu Mar 15 10:29:27 2018 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Mar 15 10:36:32 2018 +0100
----------------------------------------------------------------------
.../support/AbstractAsyncResponseConsumer.java | 42 +++++++++++++++-----
1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/46ac284d/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
index c77ef51..9752c73 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncResponseConsumer.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.UnsupportedCharsetException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
@@ -45,12 +46,14 @@ import org.apache.hc.core5.util.Args;
public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncResponseConsumer<T> {
private final AsyncEntityConsumer<E> entityConsumer;
-
- private volatile T result;
+ private final AtomicReference<T> resultRef;
+ private final AtomicReference<Exception> exceptionRef;
public AbstractAsyncResponseConsumer(final AsyncEntityConsumer<E> entityConsumer) {
Args.notNull(entityConsumer, "Entity consumer");
this.entityConsumer = entityConsumer;
+ this.resultRef = new AtomicReference<>(null);
+ this.exceptionRef = new AtomicReference<>(null);
}
protected abstract T buildResult(HttpResponse response, E entity, ContentType contentType);
@@ -68,27 +71,41 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
final ContentType contentType;
try {
contentType = ContentType.parse(entityDetails.getContentType());
- result = buildResult(response, entity, contentType);
- resultCallback.completed(result);
+ final T result = buildResult(response, entity, contentType);
+ resultRef.compareAndSet(null, result);
+ if (resultCallback != null) {
+ resultCallback.completed(result);
+ }
} catch (final UnsupportedCharsetException ex) {
- resultCallback.failed(ex);
+ exceptionRef.compareAndSet(null, ex);
+ if (resultCallback != null) {
+ resultCallback.failed(ex);
+ }
}
}
@Override
public void failed(final Exception ex) {
- resultCallback.failed(ex);
+ exceptionRef.compareAndSet(null, ex);
+ if (resultCallback != null) {
+ resultCallback.failed(ex);
+ }
}
@Override
public void cancelled() {
- resultCallback.cancelled();
+ if (resultCallback != null) {
+ resultCallback.cancelled();
+ }
}
});
} else {
- result = buildResult(response, null, null);
- resultCallback.completed(result);
+ final T result = buildResult(response, null, null);
+ resultRef.compareAndSet(null, result);
+ if (resultCallback != null) {
+ resultCallback.completed(result);
+ }
entityConsumer.releaseResources();
}
@@ -111,11 +128,16 @@ public abstract class AbstractAsyncResponseConsumer<T, E> implements AsyncRespon
@Override
public T getResult() {
- return result;
+ return resultRef.get();
+ }
+
+ public Exception getException() {
+ return exceptionRef.get();
}
@Override
public final void failed(final Exception cause) {
+ exceptionRef.compareAndSet(null, cause);
releaseResources();
}