You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/05/01 12:39:18 UTC
svn commit: r1793320 [4/5] - in /httpcomponents/httpclient/trunk:
httpclient5-testing/src/main/java/org/apache/hc/client5/testing/async/
httpclient5-testing/src/main/java/org/apache/hc/client5/testing/classic/
httpclient5-testing/src/test/java/org/apac...
Added: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java?rev=1793320&view=auto
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java (added)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java Mon May 1 12:39:16 2017
@@ -0,0 +1,126 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecCallback;
+import org.apache.hc.client5.http.async.AsyncExecChain;
+import org.apache.hc.client5.http.async.AsyncExecChainHandler;
+import org.apache.hc.client5.http.impl.ExecSupport;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.client5.http.sync.HttpRequestRetryHandler;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.util.Args;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Element of the asynchronous request execution chain that re-executes a
+ * request after an {@link IOException}, provided the configured
+ * {@link HttpRequestRetryHandler} approves the retry.
+ */
+class AsyncRetryExec implements AsyncExecChainHandler {
+
+    private final Logger log = LogManager.getLogger(getClass());
+
+    private final HttpRequestRetryHandler retryHandler;
+
+    public AsyncRetryExec(final HttpRequestRetryHandler retryHandler) {
+        Args.notNull(retryHandler, "HTTP request retry handler");
+        this.retryHandler = retryHandler;
+    }
+
+    /**
+     * Starts request execution; the initial attempt is passed on as
+     * execution number 1.
+     */
+    @Override
+    public void execute(
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecChain chain,
+            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+        internalExecute(1, request, entityProducer, scope, chain, asyncExecCallback);
+    }
+
+    /**
+     * Hands a copy of the request to the next chain element and wraps the
+     * caller's callback so that a failure may trigger another attempt.
+     *
+     * @param execCount number of this attempt; it is passed to the retry
+     *                  handler and incremented for every re-execution.
+     */
+    private void internalExecute(
+            final int execCount,
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecChain chain,
+            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+
+        final HttpRequest requestCopy = ExecSupport.copy(request);
+        final AsyncExecCallback retryingCallback = new AsyncExecCallback() {
+
+            @Override
+            public AsyncDataConsumer handleResponse(
+                    final HttpResponse response,
+                    final EntityDetails entityDetails) throws HttpException, IOException {
+                // Responses are passed through untouched.
+                return asyncExecCallback.handleResponse(response, entityDetails);
+            }
+
+            @Override
+            public void completed() {
+                asyncExecCallback.completed();
+            }
+
+            @Override
+            public void failed(final Exception cause) {
+                final boolean retry = cause instanceof IOException
+                        && retryHandler.retryRequest(request, (IOException) cause, execCount, scope.clientContext);
+                if (!retry) {
+                    // Not an I/O problem, or the handler declined: report the failure as is.
+                    asyncExecCallback.failed(cause);
+                    return;
+                }
+                final HttpRoute route = scope.route;
+                if (log.isInfoEnabled()) {
+                    log.info("I/O exception (" + cause.getClass().getName()
+                            + ") caught when processing request to " + route
+                            + ": " + cause.getMessage());
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug(cause.getMessage(), cause);
+                }
+                if (log.isInfoEnabled()) {
+                    log.info("Retrying request to " + route);
+                }
+                try {
+                    scope.execRuntime.discardConnection();
+                    internalExecute(execCount + 1, request, entityProducer, scope, chain, asyncExecCallback);
+                } catch (IOException | HttpException ex) {
+                    // The retry could not be started: log why, then report the original failure.
+                    log.error(ex.getMessage(), ex);
+                    asyncExecCallback.failed(cause);
+                }
+            }
+
+        };
+        chain.proceed(requestCopy, entityProducer, scope, retryingCallback);
+
+    }
+
+}
Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncRetryExec.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java?rev=1793320&view=auto
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java (added)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java Mon May 1 12:39:16 2017
@@ -0,0 +1,94 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.nio.AsyncDataProducer;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@link AsyncEntityProducer} assembled from two collaborators: an
+ * {@link EntityDetails} instance answering all entity metadata queries and an
+ * {@link AsyncDataProducer} generating the content.
+ */
+public class BasicAsyncEntityProducer implements AsyncEntityProducer {
+
+    private final AsyncDataProducer dataProducer;
+    private final EntityDetails entityDetails;
+
+    BasicAsyncEntityProducer(final AsyncDataProducer dataProducer, final EntityDetails entityDetails) {
+        Args.notNull(dataProducer, "Data producer");
+        Args.notNull(entityDetails, "Entity details");
+        this.dataProducer = dataProducer;
+        this.entityDetails = entityDetails;
+    }
+
+    // ---- entity metadata, answered by the entity details
+
+    @Override
+    public String getContentType() {
+        return entityDetails.getContentType();
+    }
+
+    @Override
+    public String getContentEncoding() {
+        return entityDetails.getContentEncoding();
+    }
+
+    @Override
+    public long getContentLength() {
+        return entityDetails.getContentLength();
+    }
+
+    @Override
+    public boolean isChunked() {
+        return entityDetails.isChunked();
+    }
+
+    @Override
+    public Set<String> getTrailerNames() {
+        return entityDetails.getTrailerNames();
+    }
+
+    // ---- content generation, answered by the data producer
+
+    @Override
+    public int available() {
+        return dataProducer.available();
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        dataProducer.produce(channel);
+    }
+
+    // ---- life cycle
+
+    /**
+     * Releases the resources of the data producer; the cause itself is not inspected.
+     */
+    @Override
+    public void failed(final Exception cause) {
+        dataProducer.releaseResources();
+    }
+
+    @Override
+    public void releaseResources() {
+        dataProducer.releaseResources();
+    }
+
+}
\ No newline at end of file
Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/BasicAsyncEntityProducer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/CloseableHttpAsyncClient.java Mon May 1 12:39:16 2017
@@ -31,16 +31,20 @@ import java.util.List;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.async.HttpAsyncClient;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Supplier;
-import org.apache.hc.core5.http.HttpHost;
-import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.reactor.ExceptionEvent;
import org.apache.hc.core5.reactor.IOReactorStatus;
@@ -66,12 +70,6 @@ public abstract class CloseableHttpAsync
public abstract void shutdown(ShutdownType shutdownType);
- public final Future<AsyncClientEndpoint> lease(
- final HttpHost host,
- final FutureCallback<AsyncClientEndpoint> callback) {
- return lease(host, HttpClientContext.create(), callback);
- }
-
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
@@ -83,4 +81,36 @@ public abstract class CloseableHttpAsync
register(null, uriPattern, supplier);
}
+    /**
+     * Executes a {@link SimpleHttpRequest} by wrapping it in a
+     * {@link SimpleRequestProducer} and collecting the result with a
+     * {@link SimpleResponseConsumer}. The outcome of the exchange is relayed
+     * to the returned future, which in turn was created with the given callback.
+     */
+    public final Future<SimpleHttpResponse> execute(
+            final SimpleHttpRequest request,
+            final HttpContext context,
+            final FutureCallback<SimpleHttpResponse> callback) {
+        final BasicFuture<SimpleHttpResponse> resultFuture = new BasicFuture<>(callback);
+        final SimpleRequestProducer requestProducer = new SimpleRequestProducer(request);
+        final SimpleResponseConsumer responseConsumer = new SimpleResponseConsumer();
+        // Bridges the outcome of the underlying exchange to the future handed back to the caller.
+        final FutureCallback<SimpleHttpResponse> relay = new FutureCallback<SimpleHttpResponse>() {
+
+            @Override
+            public void completed(final SimpleHttpResponse response) {
+                resultFuture.completed(response);
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                resultFuture.failed(ex);
+            }
+
+            @Override
+            public void cancelled() {
+                resultFuture.cancel(true);
+            }
+
+        };
+        execute(requestProducer, responseConsumer, context, relay);
+        return resultFuture;
+    }
+
+    /**
+     * Executes a {@link SimpleHttpRequest} with a context freshly obtained
+     * from {@link HttpClientContext#create()}.
+     */
+    public final Future<SimpleHttpResponse> execute(
+            final SimpleHttpRequest request,
+            final FutureCallback<SimpleHttpResponse> callback) {
+        final HttpClientContext freshContext = HttpClientContext.create();
+        return execute(request, freshContext, callback);
+    }
+
}
Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java Mon May 1 12:39:16 2017
@@ -37,22 +37,31 @@ import java.util.List;
import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.SchemePortResolver;
+import org.apache.hc.client5.http.async.AsyncExecChainHandler;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.DefaultConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.impl.DefaultUserTokenHandler;
import org.apache.hc.client5.http.impl.IdleConnectionEvictor;
+import org.apache.hc.client5.http.impl.NamedElementChain;
import org.apache.hc.client5.http.impl.NoopUserTokenHandler;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.client5.http.impl.protocol.DefaultRedirectStrategy;
import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner;
import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner;
import org.apache.hc.client5.http.impl.routing.SystemDefaultRoutePlanner;
+import org.apache.hc.client5.http.impl.sync.ChainElements;
+import org.apache.hc.client5.http.impl.sync.DefaultHttpRequestRetryHandler;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
+import org.apache.hc.client5.http.protocol.RedirectStrategy;
import org.apache.hc.client5.http.protocol.RequestDefaultHeaders;
import org.apache.hc.client5.http.protocol.RequestExpectContinue;
import org.apache.hc.client5.http.protocol.UserTokenHandler;
import org.apache.hc.client5.http.routing.HttpRoutePlanner;
+import org.apache.hc.client5.http.sync.HttpRequestRetryHandler;
+import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
+import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ConnectionReuseStrategy;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
@@ -66,8 +75,8 @@ import org.apache.hc.core5.http.config.H
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.protocol.HttpContext;
-import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
import org.apache.hc.core5.http.protocol.RequestUserAgent;
import org.apache.hc.core5.http2.HttpVersionPolicy;
@@ -75,10 +84,14 @@ import org.apache.hc.core5.http2.config.
import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
import org.apache.hc.core5.http2.protocol.H2RequestContent;
import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
+import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.pool.ConnPoolControl;
+import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorException;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.VersionInfo;
@@ -108,6 +121,54 @@ import org.apache.hc.core5.util.VersionI
*/
public class HttpAsyncClientBuilder {
+    /**
+     * Registration record for a request interceptor: the interceptor plus the
+     * end of the protocol processing list (FIRST or LAST) it was registered for.
+     */
+    private static class RequestInterceptorEntry {
+
+        enum Postion { FIRST, LAST }
+
+        final Postion postion;
+        final HttpRequestInterceptor interceptor;
+
+        private RequestInterceptorEntry(final Postion where, final HttpRequestInterceptor requestInterceptor) {
+            this.interceptor = requestInterceptor;
+            this.postion = where;
+        }
+    }
+
+    /**
+     * Registration record for a response interceptor: the interceptor plus the
+     * end of the protocol processing list (FIRST or LAST) it was registered for.
+     */
+    private static class ResponseInterceptorEntry {
+
+        enum Postion { FIRST, LAST }
+
+        final Postion postion;
+        final HttpResponseInterceptor interceptor;
+
+        private ResponseInterceptorEntry(final Postion where, final HttpResponseInterceptor responseInterceptor) {
+            this.interceptor = responseInterceptor;
+            this.postion = where;
+        }
+    }
+
+    /**
+     * Registration record for an execution chain interceptor: the handler, the
+     * name it is registered under, the name of the existing chain element it
+     * refers to, and how it relates to that element (BEFORE, AFTER or REPLACE).
+     */
+    private static class ExecInterceptorEntry {
+
+        enum Postion { BEFORE, AFTER, REPLACE }
+
+        final Postion postion;
+        final String name;
+        final AsyncExecChainHandler interceptor;
+        final String existing;
+
+        private ExecInterceptorEntry(
+                final Postion where,
+                final String handlerName,
+                final AsyncExecChainHandler handler,
+                final String existingName) {
+            this.existing = existingName;
+            this.interceptor = handler;
+            this.name = handlerName;
+            this.postion = where;
+        }
+
+    }
+
private HttpVersionPolicy versionPolicy;
private AsyncClientConnectionManager connManager;
private boolean connManagerShared;
@@ -116,16 +177,19 @@ public class HttpAsyncClientBuilder {
private H2Config h2Config;
private CharCodingConfig charCodingConfig;
private SchemePortResolver schemePortResolver;
- private ConnectionReuseStrategy reuseStrategy;
private ConnectionKeepAliveStrategy keepAliveStrategy;
private UserTokenHandler userTokenHandler;
- private LinkedList<HttpRequestInterceptor> requestFirst;
- private LinkedList<HttpRequestInterceptor> requestLast;
- private LinkedList<HttpResponseInterceptor> responseFirst;
- private LinkedList<HttpResponseInterceptor> responseLast;
+ private LinkedList<RequestInterceptorEntry> requestInterceptors;
+ private LinkedList<ResponseInterceptorEntry> responseInterceptors;
+ private LinkedList<ExecInterceptorEntry> execInterceptors;
private HttpRoutePlanner routePlanner;
+ private RedirectStrategy redirectStrategy;
+ private HttpRequestRetryHandler retryHandler;
+
+ private ConnectionReuseStrategy reuseStrategy;
+
private String userAgent;
private HttpHost proxy;
private Collection<? extends Header> defaultHeaders;
@@ -135,6 +199,8 @@ public class HttpAsyncClientBuilder {
private TimeValue maxIdleTime;
private boolean systemProperties;
+ private boolean automaticRetriesDisabled;
+ private boolean redirectHandlingDisabled;
private boolean connectionStateDisabled;
private List<Closeable> closeables;
@@ -213,6 +279,8 @@ public class HttpAsyncClientBuilder {
/**
* Assigns {@link ConnectionReuseStrategy} instance.
+ * <p>
+ * Please note this strategy applies to HTTP/1.0 and HTTP/1.1 connections only
*/
public final HttpAsyncClientBuilder setConnectionReuseStrategy(final ConnectionReuseStrategy reuseStrategy) {
this.reuseStrategy = reuseStrategy;
@@ -240,90 +308,138 @@ public class HttpAsyncClientBuilder {
}
/**
- * Disables connection state tracking.
+ * Adds this response interceptor to the head of the protocol processing list.
+ *
+ * @param interceptor the response interceptor to register; must not be {@code null}.
+ * @return this builder instance.
*/
- public final HttpAsyncClientBuilder disableConnectionState() {
- connectionStateDisabled = true;
+ public final HttpAsyncClientBuilder addResponseInterceptorFirst(final HttpResponseInterceptor interceptor) {
+ Args.notNull(interceptor, "Interceptor");
+ if (responseInterceptors == null) {
+ responseInterceptors = new LinkedList<>();
+ }
+ responseInterceptors.add(new ResponseInterceptorEntry(ResponseInterceptorEntry.Postion.FIRST, interceptor));
return this;
}
+
+ /**
+ * Misnamed alias of {@link #addResponseInterceptorFirst(HttpResponseInterceptor)}:
+ * despite its name this method registers a <em>response</em> interceptor.
+ * It is retained only so that existing callers keep compiling.
+ *
+ * @param interceptor the response interceptor to register; must not be {@code null}.
+ * @return this builder instance.
+ * @deprecated use {@link #addResponseInterceptorFirst(HttpResponseInterceptor)}.
+ */
+ @Deprecated
+ public final HttpAsyncClientBuilder addRequestInterceptorFirst(final HttpResponseInterceptor interceptor) {
+ return addResponseInterceptorFirst(interceptor);
+ }
/**
- * Assigns {@link SchemePortResolver} instance.
+ * Adds this protocol interceptor to the tail of the protocol processing list.
*/
- public final HttpAsyncClientBuilder setSchemePortResolver(final SchemePortResolver schemePortResolver) {
- this.schemePortResolver = schemePortResolver;
+ public final HttpAsyncClientBuilder addResponseInterceptorLast(final HttpResponseInterceptor interceptor) {
+ Args.notNull(interceptor, "Interceptor");
+ if (responseInterceptors == null) {
+ responseInterceptors = new LinkedList<>();
+ }
+ responseInterceptors.add(new ResponseInterceptorEntry(ResponseInterceptorEntry.Postion.LAST, interceptor));
return this;
}
/**
- * Assigns {@code User-Agent} value.
+ * Adds this execution interceptor before an existing interceptor.
*/
- public final HttpAsyncClientBuilder setUserAgent(final String userAgent) {
- this.userAgent = userAgent;
+ public final HttpAsyncClientBuilder addExecInterceptorBefore(final String existing, final String name, final AsyncExecChainHandler interceptor) {
+ Args.notBlank(existing, "Existing");
+ Args.notBlank(name, "Name");
+ Args.notNull(interceptor, "Interceptor");
+ if (execInterceptors == null) {
+ execInterceptors = new LinkedList<>();
+ }
+ execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.BEFORE, name, interceptor, existing));
return this;
}
/**
- * Assigns default request header values.
+ * Adds this execution interceptor after interceptor with the given name.
*/
- public final HttpAsyncClientBuilder setDefaultHeaders(final Collection<? extends Header> defaultHeaders) {
- this.defaultHeaders = defaultHeaders;
+ public final HttpAsyncClientBuilder addExecInterceptorAfter(final String existing, final String name, final AsyncExecChainHandler interceptor) {
+ Args.notBlank(existing, "Existing");
+ Args.notBlank(name, "Name");
+ Args.notNull(interceptor, "Interceptor");
+ if (execInterceptors == null) {
+ execInterceptors = new LinkedList<>();
+ }
+ execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.AFTER, name, interceptor, existing));
return this;
}
/**
- * Adds this protocol interceptor to the head of the protocol processing list.
+ * Replace an existing interceptor with the given name with new interceptor.
*/
- public final HttpAsyncClientBuilder addInterceptorFirst(final HttpResponseInterceptor itcp) {
- if (itcp == null) {
- return this;
+ public final HttpAsyncClientBuilder replaceExecInterceptor(final String existing, final AsyncExecChainHandler interceptor) {
+ Args.notBlank(existing, "Existing");
+ Args.notNull(interceptor, "Interceptor");
+ if (execInterceptors == null) {
+ execInterceptors = new LinkedList<>();
}
- if (responseFirst == null) {
- responseFirst = new LinkedList<>();
+ execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.REPLACE, existing, interceptor, existing));
+ return this;
+ }
+
+ /**
+ * Adds this protocol interceptor to the head of the protocol processing list.
+ */
+ public final HttpAsyncClientBuilder addRequestInterceptorFirst(final HttpRequestInterceptor interceptor) {
+ Args.notNull(interceptor, "Interceptor");
+ if (requestInterceptors == null) {
+ requestInterceptors = new LinkedList<>();
}
- responseFirst.addFirst(itcp);
+ requestInterceptors.add(new RequestInterceptorEntry(RequestInterceptorEntry.Postion.FIRST, interceptor));
return this;
}
/**
* Adds this protocol interceptor to the tail of the protocol processing list.
+ *
+ * @param interceptor the request interceptor to register; must not be {@code null}.
+ * @return this builder instance.
*/
- public final HttpAsyncClientBuilder addInterceptorLast(final HttpResponseInterceptor itcp) {
- if (itcp == null) {
- return this;
- }
- if (responseLast == null) {
- responseLast = new LinkedList<>();
+ public final HttpAsyncClientBuilder addRequestInterceptorLast(final HttpRequestInterceptor interceptor) {
+ Args.notNull(interceptor, "Interceptor");
+ if (requestInterceptors == null) {
+ requestInterceptors = new LinkedList<>();
}
- responseLast.addLast(itcp);
+ requestInterceptors.add(new RequestInterceptorEntry(RequestInterceptorEntry.Postion.LAST, interceptor));
return this;
}
+
+ /**
+ * Misnamed alias of {@link #addRequestInterceptorLast(HttpRequestInterceptor)}:
+ * despite its name this overload registers a <em>request</em> interceptor.
+ * It is retained only so that existing callers keep compiling.
+ *
+ * @param interceptor the request interceptor to register; must not be {@code null}.
+ * @return this builder instance.
+ * @deprecated use {@link #addRequestInterceptorLast(HttpRequestInterceptor)}.
+ */
+ @Deprecated
+ public final HttpAsyncClientBuilder addResponseInterceptorLast(final HttpRequestInterceptor interceptor) {
+ return addRequestInterceptorLast(interceptor);
+ }
/**
- * Adds this protocol interceptor to the head of the protocol processing list.
+ * Assigns {@link HttpRequestRetryHandler} instance.
+ * <p>
+ * Please note this value can be overridden by the {@link #disableAutomaticRetries()}
+ * method.
*/
- public final HttpAsyncClientBuilder addInterceptorFirst(final HttpRequestInterceptor itcp) {
- if (itcp == null) {
- return this;
- }
- if (requestFirst == null) {
- requestFirst = new LinkedList<>();
- }
- requestFirst.addFirst(itcp);
+ public final HttpAsyncClientBuilder setRetryHandler(final HttpRequestRetryHandler retryHandler) {
+ this.retryHandler = retryHandler;
return this;
}
/**
- * Adds this protocol interceptor to the tail of the protocol processing list.
+ * Assigns {@link RedirectStrategy} instance.
+ * <p>
+ * Please note this value can be overridden by the {@link #disableRedirectHandling()}
+ * method.
+ * </p>
*/
- public final HttpAsyncClientBuilder addInterceptorLast(final HttpRequestInterceptor itcp) {
- if (itcp == null) {
- return this;
- }
- if (requestLast == null) {
- requestLast = new LinkedList<>();
- }
- requestLast.addLast(itcp);
+ public HttpAsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy) {
+ this.redirectStrategy = redirectStrategy;
+ return this;
+ }
+
+ /**
+ * Assigns {@link SchemePortResolver} instance.
+ *
+ * @param schemePortResolver the resolver to store on this builder.
+ * @return this builder instance, so that calls can be chained.
+ */
+ public final HttpAsyncClientBuilder setSchemePortResolver(final SchemePortResolver schemePortResolver) {
+ this.schemePortResolver = schemePortResolver;
+ return this;
+ }
+
+ /**
+ * Assigns {@code User-Agent} value.
+ *
+ * @param userAgent the value to store on this builder.
+ * @return this builder instance, so that calls can be chained.
+ */
+ public final HttpAsyncClientBuilder setUserAgent(final String userAgent) {
+ this.userAgent = userAgent;
+ return this;
+ }
+
+ /**
+ * Assigns default request header values.
+ *
+ * @param defaultHeaders the headers to store on this builder; the collection
+ * reference is kept as is, no copy is made here.
+ * @return this builder instance, so that calls can be chained.
+ */
+ public final HttpAsyncClientBuilder setDefaultHeaders(final Collection<? extends Header> defaultHeaders) {
+ this.defaultHeaders = defaultHeaders;
return this;
}
@@ -366,6 +482,30 @@ public class HttpAsyncClientBuilder {
}
/**
+ * Disables connection state tracking.
+ */
+ public final HttpAsyncClientBuilder disableConnectionState() {
+ connectionStateDisabled = true;
+ return this;
+ }
+
+ /**
+ * Disables automatic redirect handling.
+ * <p>
+ * Sets a flag that makes the build step skip adding the redirect executor
+ * to the execution chain.
+ *
+ * @return this builder instance, so that calls can be chained.
+ */
+ public final HttpAsyncClientBuilder disableRedirectHandling() {
+ redirectHandlingDisabled = true;
+ return this;
+ }
+
+ /**
+ * Disables automatic request recovery and re-execution.
+ * <p>
+ * Sets a flag that makes the build step skip adding the request retry
+ * executor to the execution chain.
+ *
+ * @return this builder instance, so that calls can be chained.
+ */
+ public final HttpAsyncClientBuilder disableAutomaticRetries() {
+ automaticRetriesDisabled = true;
+ return this;
+ }
+
+ /**
* Makes this instance of HttpClient proactively evict expired connections from the
* connection pool using a background thread.
* <p>
@@ -407,8 +547,20 @@ public class HttpAsyncClientBuilder {
}
/**
+ * Request exec chain customization and extension.
+ * <p>
* For internal use.
*/
+ @Internal
+ protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler> execChainDefinition) {
+ }
+
+ /**
+ * Adds to the list of {@link Closeable} resources to be managed by the client.
+ * <p>
+ * For internal use.
+ */
+ @Internal
protected void addCloseable(final Closeable closeable) {
if (closeable == null) {
return;
@@ -439,6 +591,11 @@ public class HttpAsyncClientBuilder {
}
}
+ final NamedElementChain<AsyncExecChainHandler> execChainDefinition = new NamedElementChain<>();
+ execChainDefinition.addLast(
+ new AsyncMainClientExec(keepAliveStrategyCopy, userTokenHandlerCopy),
+ ChainElements.MAIN_TRANSPORT.name());
+
String userAgentCopy = this.userAgent;
if (userAgentCopy == null) {
if (systemProperties) {
@@ -451,14 +608,18 @@ public class HttpAsyncClientBuilder {
}
final HttpProcessorBuilder b = HttpProcessorBuilder.create();
- if (requestFirst != null) {
- for (final HttpRequestInterceptor i: requestFirst) {
- b.addFirst(i);
+ if (requestInterceptors != null) {
+ for (final RequestInterceptorEntry entry: requestInterceptors) {
+ if (entry.postion == RequestInterceptorEntry.Postion.FIRST) {
+ b.addFirst(entry.interceptor);
+ }
}
}
- if (responseFirst != null) {
- for (final HttpResponseInterceptor i: responseFirst) {
- b.addFirst(i);
+ if (responseInterceptors != null) {
+ for (final ResponseInterceptorEntry entry: responseInterceptors) {
+ if (entry.postion == ResponseInterceptorEntry.Postion.FIRST) {
+ b.addFirst(entry.interceptor);
+ }
}
}
b.addAll(
@@ -468,17 +629,35 @@ public class HttpAsyncClientBuilder {
new H2RequestConnControl(),
new RequestUserAgent(userAgentCopy),
new RequestExpectContinue());
- if (requestLast != null) {
- for (final HttpRequestInterceptor i: requestLast) {
- b.addLast(i);
+ if (requestInterceptors != null) {
+ for (final RequestInterceptorEntry entry: requestInterceptors) {
+ if (entry.postion == RequestInterceptorEntry.Postion.LAST) {
+ // Entries registered as LAST belong at the tail of the processor,
+ // after the standard interceptors added above; addFirst would
+ // place them at the head instead.
+ b.addLast(entry.interceptor);
+ }
}
}
- if (responseLast != null) {
- for (final HttpResponseInterceptor i: responseLast) {
- b.addLast(i);
+ if (responseInterceptors != null) {
+ for (final ResponseInterceptorEntry entry: responseInterceptors) {
+ if (entry.postion == ResponseInterceptorEntry.Postion.LAST) {
+ // Entries registered as LAST belong at the tail of the processor;
+ // addFirst would place them at the head instead.
+ b.addLast(entry.interceptor);
+ }
}
}
- final HttpProcessor httpProcessor = b.build();
+
+ execChainDefinition.addFirst(
+ new AsyncProtocolExec(b.build()),
+ ChainElements.PROTOCOL.name());
+
+ // Add request retry executor, if not disabled
+ if (!automaticRetriesDisabled) {
+ HttpRequestRetryHandler retryHandlerCopy = this.retryHandler;
+ if (retryHandlerCopy == null) {
+ retryHandlerCopy = DefaultHttpRequestRetryHandler.INSTANCE;
+ }
+ execChainDefinition.addFirst(
+ new AsyncRetryExec(retryHandlerCopy),
+ ChainElements.RETRY_IO_ERROR.name());
+ }
HttpRoutePlanner routePlannerCopy = this.routePlanner;
if (routePlannerCopy == null) {
@@ -495,6 +674,18 @@ public class HttpAsyncClientBuilder {
routePlannerCopy = new DefaultRoutePlanner(schemePortResolverCopy);
}
}
+
+ // Add redirect executor, if not disabled
+ if (!redirectHandlingDisabled) {
+ RedirectStrategy redirectStrategyCopy = this.redirectStrategy;
+ if (redirectStrategyCopy == null) {
+ redirectStrategyCopy = DefaultRedirectStrategy.INSTANCE;
+ }
+ execChainDefinition.addFirst(
+ new AsyncRedirectExec(routePlannerCopy, redirectStrategyCopy),
+ ChainElements.REDIRECT.name());
+ }
+
List<Closeable> closeablesCopy = closeables != null ? new ArrayList<>(closeables) : null;
if (!this.connManagerShared) {
if (closeablesCopy == null) {
@@ -538,7 +729,7 @@ public class HttpAsyncClientBuilder {
}
final AsyncPushConsumerRegistry pushConsumerRegistry = new AsyncPushConsumerRegistry();
final IOEventHandlerFactory ioEventHandlerFactory = new HttpAsyncClientEventHandlerFactory(
- httpProcessor,
+ NoopHttpProcessor.INSTANCE,
new HandlerFactory<AsyncPushConsumer>() {
@Override
@@ -552,22 +743,56 @@ public class HttpAsyncClientBuilder {
h1Config != null ? h1Config : H1Config.DEFAULT,
charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT,
reuseStrategyCopy);
+ final DefaultConnectingIOReactor ioReactor;
try {
- return new InternalHttpAsyncClient(
+ ioReactor = new DefaultConnectingIOReactor(
ioEventHandlerFactory,
- pushConsumerRegistry,
ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
- new DefaultThreadFactory("httpclient-main", true),
new DefaultThreadFactory("httpclient-dispatch", true),
- connManagerCopy,
- routePlannerCopy,
- keepAliveStrategyCopy,
- userTokenHandlerCopy,
- defaultRequestConfig,
- closeablesCopy);
+ new Callback<IOSession>() {
+
+ @Override
+ public void execute(final IOSession ioSession) {
+ ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+ }
+
+ });
} catch (final IOReactorException ex) {
throw new IllegalStateException(ex.getMessage(), ex);
}
+
+ if (execInterceptors != null) {
+ for (final ExecInterceptorEntry entry: execInterceptors) {
+ switch (entry.postion) {
+ case AFTER:
+ execChainDefinition.addAfter(entry.existing, entry.interceptor, entry.name);
+ break;
+ case BEFORE:
+ execChainDefinition.addBefore(entry.existing, entry.interceptor, entry.name);
+ break;
+ }
+ }
+ }
+
+ customizeExecChain(execChainDefinition);
+
+ NamedElementChain<AsyncExecChainHandler>.Node current = execChainDefinition.getLast();
+ AsyncExecChainElement execChain = null;
+ while (current != null) {
+ execChain = new AsyncExecChainElement(current.getValue(), execChain);
+ current = current.getPrevious();
+ }
+
+ return new InternalHttpAsyncClient(
+ ioReactor,
+ execChain,
+ pushConsumerRegistry,
+ new DefaultThreadFactory("httpclient-main", true),
+ connManagerCopy,
+ routePlannerCopy,
+ versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE,
+ defaultRequestConfig,
+ closeablesCopy);
}
}
Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientEventHandlerFactory.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientEventHandlerFactory.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientEventHandlerFactory.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientEventHandlerFactory.java Mon May 1 12:39:16 2017
@@ -33,7 +33,6 @@ import java.util.List;
import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.client5.http.impl.logging.LogAppendable;
-import org.apache.hc.client5.http.impl.logging.LoggingIOEventHandler;
import org.apache.hc.client5.http.impl.logging.LoggingIOSession;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
@@ -78,7 +77,7 @@ import org.apache.logging.log4j.Logger;
@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class HttpAsyncClientEventHandlerFactory implements IOEventHandlerFactory {
- private final Logger streamLog = LogManager.getLogger(ClientHttpProtocolNegotiator.class);
+ private final Logger streamLog = LogManager.getLogger(InternalHttpAsyncClient.class);
private final Logger wireLog = LogManager.getLogger("org.apache.hc.client5.http.wire");
private final Logger headerLog = LogManager.getLogger("org.apache.hc.client5.http.headers");
private final Logger frameLog = LogManager.getLogger("org.apache.hc.client5.http2.frame");
@@ -282,15 +281,12 @@ public class HttpAsyncClientEventHandler
});
final LoggingIOSession loggingIOSession = new LoggingIOSession(ioSession, id, sessionLog, wireLog);
- return new LoggingIOEventHandler(
- new ClientHttpProtocolNegotiator(
+ return new ClientHttpProtocolNegotiator(
loggingIOSession,
http1StreamHandlerFactory,
http2StreamHandlerFactory,
attachment instanceof HttpVersionPolicy ? (HttpVersionPolicy) attachment : versionPolicy,
- connectionListener),
- id,
- streamLog);
+ connectionListener);
} else {
final ClientHttp1StreamDuplexerFactory http1StreamHandlerFactory = new ClientHttp1StreamDuplexerFactory(
httpProcessor,
Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java Mon May 1 12:39:16 2017
@@ -66,6 +66,13 @@ public class HttpAsyncClients {
}
/**
+ * Creates {@link CloseableHttpAsyncClient} instance with default configuration.
+ * <p>
+ * The client is obtained from {@code HttpAsyncClientBuilder.create().build()}
+ * without any further customization of the builder.
+ *
+ * @return client produced by an unmodified {@link HttpAsyncClientBuilder}.
+ */
+ public static CloseableHttpAsyncClient createDefault() {
+ return HttpAsyncClientBuilder.create().build();
+ }
+
+ /**
* Creates {@link CloseableHttpAsyncClient} instance with default
* configuration and system properties.
*/
@@ -85,12 +92,15 @@ public class HttpAsyncClients {
private static MinimalHttpAsyncClient createMinimalImpl(
final IOEventHandlerFactory eventHandlerFactory,
final AsyncPushConsumerRegistry pushConsumerRegistry,
+ final HttpVersionPolicy versionPolicy,
+ final IOReactorConfig ioReactorConfig,
final AsyncClientConnectionManager connmgr) {
try {
return new MinimalHttpAsyncClient(
eventHandlerFactory,
pushConsumerRegistry,
- IOReactorConfig.DEFAULT,
+ versionPolicy,
+ ioReactorConfig,
new DefaultThreadFactory("httpclient-main", true),
new DefaultThreadFactory("httpclient-dispatch", true),
connmgr);
@@ -99,10 +109,15 @@ public class HttpAsyncClients {
}
}
+ /**
+ * Creates {@link MinimalHttpAsyncClient} instance that provides
+ * essential HTTP/1.1 and HTTP/2 message transport only.
+ */
public static MinimalHttpAsyncClient createMinimal(
final HttpVersionPolicy versionPolicy,
final H2Config h2Config,
final H1Config h1Config,
+ final IOReactorConfig ioReactorConfig,
final AsyncClientConnectionManager connmgr) {
return createMinimalImpl(
new HttpAsyncClientEventHandlerFactory(
@@ -114,30 +129,80 @@ public class HttpAsyncClients {
CharCodingConfig.DEFAULT,
DefaultConnectionReuseStrategy.INSTANCE),
new AsyncPushConsumerRegistry(),
+ versionPolicy,
+ ioReactorConfig,
connmgr);
}
/**
- * Creates {@link CloseableHttpAsyncClient} instance that provides
+ * Creates {@link MinimalHttpAsyncClient} instance that provides
+ * essential HTTP/1.1 and HTTP/2 message transport only.
+ * <p>
+ * Delegates to the overload that additionally takes a connection manager,
+ * passing the result of
+ * {@code PoolingAsyncClientConnectionManagerBuilder.create().build()}.
+ *
+ * @param versionPolicy HTTP version policy forwarded to the delegate overload.
+ * @param h2Config HTTP/2 configuration forwarded to the delegate overload.
+ * @param h1Config HTTP/1.1 configuration forwarded to the delegate overload.
+ * @param ioReactorConfig I/O reactor configuration forwarded to the delegate overload.
+ * @return the client returned by the delegate overload.
+ */
+ public static MinimalHttpAsyncClient createMinimal(
+ final HttpVersionPolicy versionPolicy,
+ final H2Config h2Config,
+ final H1Config h1Config,
+ final IOReactorConfig ioReactorConfig) {
+ return createMinimal(versionPolicy, h2Config, h1Config, ioReactorConfig,
+ PoolingAsyncClientConnectionManagerBuilder.create().build());
+ }
+
+ /**
+ * Creates {@link MinimalHttpAsyncClient} instance that provides
+ * essential HTTP/1.1 and HTTP/2 message transport only.
+ * <p>
+ * Delegates to the overload that additionally takes an I/O reactor
+ * configuration, passing {@link IOReactorConfig#DEFAULT}.
+ *
+ * @param versionPolicy HTTP version policy forwarded to the delegate overload.
+ * @param h2Config HTTP/2 configuration forwarded to the delegate overload.
+ * @param h1Config HTTP/1.1 configuration forwarded to the delegate overload.
+ * @return the client returned by the delegate overload.
+ */
+ public static MinimalHttpAsyncClient createMinimal(
+ final HttpVersionPolicy versionPolicy,
+ final H2Config h2Config,
+ final H1Config h1Config) {
+ return createMinimal(versionPolicy, h2Config, h1Config, IOReactorConfig.DEFAULT);
+ }
+
+ /**
+ * Creates {@link MinimalHttpAsyncClient} instance that provides
* essential HTTP/1.1 and HTTP/2 message transport only.
+ * <p>
+ * Uses {@link HttpVersionPolicy#NEGOTIATE} together with
+ * {@link H2Config#DEFAULT} and {@link H1Config#DEFAULT}.
+ *
+ * @return the client returned by the three-argument overload.
*/
- public static CloseableHttpAsyncClient createMinimal() {
+ public static MinimalHttpAsyncClient createMinimal() {
return createMinimal(
HttpVersionPolicy.NEGOTIATE,
H2Config.DEFAULT,
+ H1Config.DEFAULT);
+ }
+
+ /**
+ * Creates {@link MinimalHttpAsyncClient} instance that provides
+ * essential HTTP/1.1 transport only.
+ * <p>
+ * Delegates to the four-argument overload with
+ * {@link HttpVersionPolicy#FORCE_HTTP_1} and {@link H2Config#DEFAULT}.
+ *
+ * @param h1Config HTTP/1.1 configuration forwarded to the delegate overload.
+ * @param ioReactorConfig I/O reactor configuration forwarded to the delegate overload.
+ * @return the client returned by the delegate overload.
+ */
+ public static MinimalHttpAsyncClient createMinimal(final H1Config h1Config, final IOReactorConfig ioReactorConfig) {
+ return createMinimal(
+ HttpVersionPolicy.FORCE_HTTP_1,
+ H2Config.DEFAULT,
+ h1Config,
+ ioReactorConfig);
+ }
+
+ /**
+ * Creates {@link MinimalHttpAsyncClient} instance that provides
+ * essential HTTP/2 transport only.
+ * <p>
+ * Delegates to the four-argument overload with
+ * {@link HttpVersionPolicy#FORCE_HTTP_2} and {@link H1Config#DEFAULT}.
+ *
+ * @param h2Config HTTP/2 configuration forwarded to the delegate overload.
+ * @param ioReactorConfig I/O reactor configuration forwarded to the delegate overload.
+ * @return the client returned by the delegate overload.
+ */
+ public static MinimalHttpAsyncClient createMinimal(final H2Config h2Config, final IOReactorConfig ioReactorConfig) {
+ return createMinimal(
+ HttpVersionPolicy.FORCE_HTTP_2,
+ h2Config,
H1Config.DEFAULT,
- PoolingAsyncClientConnectionManagerBuilder.create().build());
+ ioReactorConfig);
}
/**
- * Creates {@link CloseableHttpAsyncClient} instance that provides
+ * Creates {@link MinimalHttpAsyncClient} instance that provides
* essential HTTP/1.1 and HTTP/2 message transport only.
+ * <p>
+ * Uses {@link HttpVersionPolicy#NEGOTIATE}, {@link H2Config#DEFAULT},
+ * {@link H1Config#DEFAULT} and {@link IOReactorConfig#DEFAULT} together
+ * with the given connection manager.
+ *
+ * @param connManager connection manager forwarded to the five-argument overload.
+ * @return the client returned by the five-argument overload.
*/
- public static CloseableHttpAsyncClient createMinimal(final AsyncClientConnectionManager connManager) {
+ public static MinimalHttpAsyncClient createMinimal(final AsyncClientConnectionManager connManager) {
return createMinimal(
HttpVersionPolicy.NEGOTIATE,
H2Config.DEFAULT,
H1Config.DEFAULT,
+ IOReactorConfig.DEFAULT,
connManager);
}
Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java Mon May 1 12:39:16 2017
@@ -28,77 +28,61 @@ package org.apache.hc.client5.http.impl.
import java.io.Closeable;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.async.AsyncExecCallback;
+import org.apache.hc.client5.http.async.AsyncExecChain;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
import org.apache.hc.client5.http.config.Configurable;
import org.apache.hc.client5.http.config.RequestConfig;
-import org.apache.hc.client5.http.impl.ConnPoolSupport;
+import org.apache.hc.client5.http.impl.ExecSupport;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
-import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
import org.apache.hc.client5.http.protocol.HttpClientContext;
-import org.apache.hc.client5.http.protocol.UserTokenHandler;
import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.message.RequestLine;
-import org.apache.hc.core5.http.message.StatusLine;
-import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
-import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
-import org.apache.hc.core5.reactor.IOEventHandlerFactory;
-import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.reactor.IOReactorException;
-import org.apache.hc.core5.util.Args;
-import org.apache.hc.core5.util.Asserts;
-import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
- private final static AtomicLong COUNT = new AtomicLong(0);
-
private final AsyncClientConnectionManager connmgr;
+ private final AsyncExecChainElement execChain;
private final HttpRoutePlanner routePlanner;
- private final ConnectionKeepAliveStrategy keepAliveStrategy;
- private final UserTokenHandler userTokenHandler;
+ private final HttpVersionPolicy versionPolicy;
private final RequestConfig defaultConfig;
private final List<Closeable> closeables;
InternalHttpAsyncClient(
- final IOEventHandlerFactory eventHandlerFactory,
+ final DefaultConnectingIOReactor ioReactor,
+ final AsyncExecChainElement execChain,
final AsyncPushConsumerRegistry pushConsumerRegistry,
- final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory,
- final ThreadFactory workerThreadFactory,
final AsyncClientConnectionManager connmgr,
final HttpRoutePlanner routePlanner,
- final ConnectionKeepAliveStrategy keepAliveStrategy,
- final UserTokenHandler userTokenHandler,
+ final HttpVersionPolicy versionPolicy,
final RequestConfig defaultConfig,
- final List<Closeable> closeables) throws IOReactorException {
- super(eventHandlerFactory, pushConsumerRegistry, reactorConfig, threadFactory, workerThreadFactory);
+ final List<Closeable> closeables) {
+ super(ioReactor, pushConsumerRegistry, threadFactory);
this.connmgr = connmgr;
+ this.execChain = execChain;
this.routePlanner = routePlanner;
- this.keepAliveStrategy = keepAliveStrategy;
- this.userTokenHandler = userTokenHandler;
+ this.versionPolicy = versionPolicy;
this.defaultConfig = defaultConfig;
this.closeables = closeables;
}
@@ -117,97 +101,72 @@ class InternalHttpAsyncClient extends Ab
}
}
- private void leaseEndpoint(
+ private void setupContext(final HttpClientContext context) {
+ if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
+ context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
+ }
+ }
+
+ private void executeChain(
+ final String exchangeId,
+ final AsyncExecChainElement execChain,
final HttpRoute route,
- final Object userToken,
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final AsyncClientExchangeHandler exchangeHandler,
final HttpClientContext clientContext,
- final FutureCallback<AsyncConnectionEndpoint> callback) {
- final RequestConfig requestConfig = clientContext.getRequestConfig();
- connmgr.lease(route, userToken, requestConfig.getConnectTimeout(),
- new FutureCallback<AsyncConnectionEndpoint>() {
+ final AsyncExecRuntime execRuntime) throws IOException, HttpException {
- @Override
- public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
- if (connectionEndpoint.isConnected()) {
- callback.completed(connectionEndpoint);
- } else {
- connmgr.connect(
- connectionEndpoint,
- getConnectionInitiator(),
- requestConfig.getConnectTimeout(),
- clientContext,
- new FutureCallback<AsyncConnectionEndpoint>() {
-
- @Override
- public void completed(final AsyncConnectionEndpoint result) {
- callback.completed(result);
- }
-
- @Override
- public void failed(final Exception ex) {
- callback.failed(ex);
- }
-
- @Override
- public void cancelled() {
- callback.cancelled();
- }
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": preparing request execution");
+ }
- });
- }
+ setupContext(clientContext);
+
+ final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime);
+ execChain.execute(
+ ExecSupport.copy(request),
+ entityDetails != null ? new BasicAsyncEntityProducer(exchangeHandler, entityDetails) : null,
+ scope,
+ new AsyncExecCallback() {
+
+ @Override
+ public AsyncDataConsumer handleResponse(
+ final HttpResponse response,
+ final EntityDetails entityDetails) throws HttpException, IOException {
+ exchangeHandler.consumeResponse(response, entityDetails);
+ return exchangeHandler;
}
@Override
- public void failed(final Exception ex) {
- callback.failed(ex);
+ public void completed() {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": message exchange successfully completed");
+ }
+ try {
+ exchangeHandler.releaseResources();
+ } finally {
+ execRuntime.releaseConnection();
+ }
}
@Override
- public void cancelled() {
- callback.cancelled();
+ public void failed(final Exception cause) {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": request failed: " + cause.getMessage());
+ }
+ try {
+ exchangeHandler.failed(cause);
+ exchangeHandler.releaseResources();
+ } finally {
+ execRuntime.discardConnection();
+ }
}
});
}
@Override
- public Future<AsyncClientEndpoint> lease(
- final HttpHost host,
- final HttpContext context,
- final FutureCallback<AsyncClientEndpoint> callback) {
- Args.notNull(host, "Host");
- Args.notNull(context, "HTTP context");
- ensureRunning();
- final HttpClientContext clientContext = HttpClientContext.adapt(context);
- final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
- try {
- final HttpRoute route = routePlanner.determineRoute(host, clientContext);
- final Object userToken = clientContext.getUserToken();
- leaseEndpoint(route, userToken, clientContext, new FutureCallback<AsyncConnectionEndpoint>() {
-
- @Override
- public void completed(final AsyncConnectionEndpoint result) {
- future.completed(new InternalAsyncClientEndpoint(route, result));
- }
-
- @Override
- public void failed(final Exception ex) {
- future.failed(ex);
- }
-
- @Override
- public void cancelled() {
- future.cancel(true);
- }
-
- });
- } catch (final HttpException ex) {
- future.failed(ex);
- }
- return future;
- }
-
- @Override
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
@@ -217,7 +176,7 @@ class InternalHttpAsyncClient extends Ab
final BasicFuture<T> future = new BasicFuture<>(callback);
try {
final HttpClientContext clientContext = HttpClientContext.adapt(context);
- final HttpRequest request = requestProducer.produceRequest();
+
RequestConfig requestConfig = null;
if (requestProducer instanceof Configurable) {
requestConfig = ((Configurable) requestProducer).getConfig();
@@ -225,36 +184,12 @@ class InternalHttpAsyncClient extends Ab
if (requestConfig != null) {
clientContext.setRequestConfig(requestConfig);
}
- final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
- final HttpRoute route = routePlanner.determineRoute(target, clientContext);
- final Object userToken = clientContext.getUserToken();
- leaseEndpoint(route, userToken, clientContext, new FutureCallback<AsyncConnectionEndpoint>() {
-
- @Override
- public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
- final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(route, connectionEndpoint);
- endpoint.execute(requestProducer, responseConsumer, clientContext, new FutureCallback<T>() {
-
- @Override
- public void completed(final T result) {
- endpoint.releaseAndReuse();
- future.completed(result);
- }
- @Override
- public void failed(final Exception ex) {
- endpoint.releaseAndDiscard();
- future.failed(ex);
- }
-
- @Override
- public void cancelled() {
- endpoint.releaseAndDiscard();
- future.cancel();
- }
-
- });
+ final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
+ @Override
+ public void completed(final T result) {
+ future.completed(result);
}
@Override
@@ -268,185 +203,26 @@ class InternalHttpAsyncClient extends Ab
}
});
- } catch (final HttpException ex) {
- future.failed(ex);
- }
- return future;
- }
-
- private void setupContext(final HttpClientContext context) {
- if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
- context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
- }
- }
-
- private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
-
- private final HttpRoute route;
- private final AsyncConnectionEndpoint connectionEndpoint;
- private final AtomicBoolean reusable;
- private final AtomicReference<Object> userTokenRef;
- private final AtomicReference<TimeValue> keepAliveRef;
- private final AtomicBoolean released;
-
- InternalAsyncClientEndpoint(final HttpRoute route, final AsyncConnectionEndpoint connectionEndpoint) {
- this.route = route;
- this.connectionEndpoint = connectionEndpoint;
- this.reusable = new AtomicBoolean(true);
- this.keepAliveRef = new AtomicReference<>(TimeValue.NEG_ONE_MILLISECONDS);
- this.userTokenRef = new AtomicReference<>(null);
- this.released = new AtomicBoolean(false);
- }
-
- @Override
- public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
- Asserts.check(!released.get(), ConnPoolSupport.getId(connectionEndpoint) + " endpoint has already been released");
-
- final HttpClientContext clientContext = HttpClientContext.adapt(context);
- setupContext(clientContext);
-
- connectionEndpoint.execute(new AsyncClientExchangeHandler() {
-
- private final String id = Long.toString(COUNT.incrementAndGet());
-
- void updateState() {
- reusable.set(true);
- Object userToken = clientContext.getUserToken();
- if (userToken == null) {
- userToken = userTokenHandler.getUserToken(route, context);
- context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
- }
- userTokenRef.set(userToken);
- }
-
- @Override
- public void produceRequest(
- final RequestChannel channel) throws HttpException, IOException {
- exchangeHandler.produceRequest(log.isDebugEnabled() ? new RequestChannel() {
-
- @Override
- public void sendRequest(
- final HttpRequest request,
- final EntityDetails entityDetails) throws HttpException, IOException {
- if (log.isDebugEnabled()) {
- log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": request " + new RequestLine(request));
- }
- channel.sendRequest(request, entityDetails);
- }
-
- } : channel);
- }
-
- @Override
- public int available() {
- return exchangeHandler.available();
- }
-
- @Override
- public void produce(final DataStreamChannel channel) throws IOException {
- exchangeHandler.produce(channel);
- }
+ exchangeHandler.produceRequest(new RequestChannel() {
@Override
- public void consumeResponse(
- final HttpResponse response,
+ public void sendRequest(
+ final HttpRequest request,
final EntityDetails entityDetails) throws HttpException, IOException {
- if (log.isDebugEnabled()) {
- log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": response " + new StatusLine(response));
- }
- exchangeHandler.consumeResponse(response, entityDetails);
-
- keepAliveRef.set(keepAliveStrategy.getKeepAliveDuration(response, context));
-
- if (entityDetails == null) {
- updateState();
- if (log.isDebugEnabled()) {
- log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": completed");
- }
- }
- }
-
- @Override
- public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
- if (log.isDebugEnabled()) {
- log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": intermediate response " + new StatusLine(response));
- }
- exchangeHandler.consumeInformation(response);
- }
- @Override
- public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
- exchangeHandler.updateCapacity(capacityChannel);
+ final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
+ final HttpRoute route = routePlanner.determineRoute(target, clientContext);
+ final String exchangeId = "ex-" + Long.toHexString(ExecSupport.getNextExecNumber());
+ final AsyncExecRuntime execRuntime = new AsyncExecRuntimeImpl(log, connmgr, getConnectionInitiator(), versionPolicy);
+ executeChain(exchangeId, execChain, route, request, entityDetails, exchangeHandler, clientContext, execRuntime);
}
- @Override
- public int consume(final ByteBuffer src) throws IOException {
- return exchangeHandler.consume(src);
- }
-
- @Override
- public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
- if (log.isDebugEnabled()) {
- log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": completed");
- }
- exchangeHandler.streamEnd(trailers);
- updateState();
- }
-
- @Override
- public void failed(final Exception cause) {
- if (log.isDebugEnabled()) {
- log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": failed", cause);
- }
- reusable.set(false);
- exchangeHandler.failed(cause);
- }
-
- @Override
- public void cancel() {
- if (log.isDebugEnabled()) {
- log.debug(ConnPoolSupport.getId(connectionEndpoint) + " exchange " + id + ": cancelled");
- }
- reusable.set(false);
- exchangeHandler.cancel();
- }
-
- @Override
- public void releaseResources() {
- exchangeHandler.releaseResources();
- }
-
- }, clientContext);
- }
-
- private void closeEndpoint() {
- try {
- connectionEndpoint.close();
- } catch (final IOException ex) {
- log.debug("I/O error closing connection endpoint: " + ex.getMessage(), ex);
- }
- }
-
- @Override
- public void releaseAndReuse() {
- if (released.compareAndSet(false, true)) {
- if (!reusable.get()) {
- closeEndpoint();
- connmgr.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECONDS);
- } else {
- connmgr.release(connectionEndpoint, userTokenRef.get(), keepAliveRef.get());
- }
- }
- }
+ });
- @Override
- public void releaseAndDiscard() {
- if (released.compareAndSet(false, true)) {
- closeEndpoint();
- connmgr.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECONDS);
- }
+ } catch (HttpException | IOException ex) {
+ future.failed(ex);
}
-
+ return future;
}
}
Added: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java?rev=1793320&view=auto
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java (added)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java Mon May 1 12:39:16 2017
@@ -0,0 +1,198 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.message.RequestLine;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.util.Identifiable;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * {@link AsyncClientExchangeHandler} decorator that forwards every call to the
+ * wrapped handler and, for most callbacks, first writes a debug message prefixed
+ * with the exchange id. Messages are only built when debug logging is enabled.
+ */
+final class LoggingAsyncClientExchangeHandler implements AsyncClientExchangeHandler, Identifiable {
+
+ private final Logger log;
+ private final String exchangeId;
+ private final AsyncClientExchangeHandler handler;
+
+ /**
+ * @param log logger that receives the debug messages.
+ * @param exchangeId identifier prepended to every message and returned by {@link #getId()}.
+ * @param handler exchange handler all calls are delegated to.
+ */
+ LoggingAsyncClientExchangeHandler(final Logger log, final String exchangeId, final AsyncClientExchangeHandler handler) {
+ this.log = log;
+ this.exchangeId = exchangeId;
+ this.handler = handler;
+ }
+
+ /**
+ * @return the exchange id given at construction time.
+ */
+ @Override
+ public String getId() {
+ return exchangeId;
+ }
+
+ /**
+ * Delegates to the wrapped handler without logging.
+ */
+ @Override
+ public void releaseResources() {
+ handler.releaseResources();
+ }
+
+ /**
+ * Lets the wrapped handler produce the request through a channel that logs
+ * the request line and entity length before passing both on to {@code channel}.
+ */
+ @Override
+ public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
+ handler.produceRequest(new RequestChannel() {
+
+ @Override
+ public void sendRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails) throws HttpException, IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": send request " + new RequestLine(request) + ", " +
+ (entityDetails != null ? "entity len " + entityDetails.getContentLength() : "null entity"));
+ }
+ channel.sendRequest(request, entityDetails);
+ }
+
+ });
+ }
+
+ /**
+ * Delegates to the wrapped handler without logging.
+ */
+ @Override
+ public int available() {
+ return handler.available();
+ }
+
+ /**
+ * Lets the wrapped handler produce request data through a channel that logs
+ * writes and end-of-stream events before passing them on to {@code channel}.
+ */
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": produce request data");
+ }
+ handler.produce(new DataStreamChannel() {
+
+ @Override
+ public void requestOutput() {
+ channel.requestOutput();
+ }
+
+ @Override
+ public int write(final ByteBuffer src) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": produce request data, len " + src.remaining() + " bytes");
+ }
+ return channel.write(src);
+ }
+
+ @Override
+ public void endStream() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": end of request data");
+ }
+ channel.endStream();
+ }
+
+ @Override
+ public void endStream(final List<? extends Header> trailers) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": end of request data");
+ }
+ channel.endStream(trailers);
+ }
+
+ });
+ }
+
+ /**
+ * Logs the status line of the informational response, then delegates.
+ */
+ @Override
+ public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": information response " + new StatusLine(response));
+ }
+ handler.consumeInformation(response);
+ }
+
+ /**
+ * Logs the status line and entity length of the response, then delegates.
+ */
+ @Override
+ public void consumeResponse(
+ final HttpResponse response,
+ final EntityDetails entityDetails) throws HttpException, IOException {
+ if (log.isDebugEnabled()) {
+ // Same "null entity" wording as the request message; no extra leading space.
+ log.debug(exchangeId + ": consume response " + new StatusLine(response) + ", " +
+ (entityDetails != null ? "entity len " + entityDetails.getContentLength() : "null entity"));
+ }
+ handler.consumeResponse(response, entityDetails);
+ }
+
+
+ /**
+ * Hands the wrapped handler a capacity channel that logs each increment
+ * before passing it on to {@code capacityChannel}.
+ */
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ handler.updateCapacity(new CapacityChannel() {
+
+ @Override
+ public void update(final int increment) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": capacity update " + increment);
+ }
+ capacityChannel.update(increment);
+ }
+
+ });
+ }
+
+ /**
+ * Logs the number of remaining bytes in {@code src}, then delegates.
+ */
+ @Override
+ public int consume(final ByteBuffer src) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": consume response data, len " + src.remaining() + " bytes");
+ }
+ return handler.consume(src);
+ }
+
+ /**
+ * Logs the end of the response data, then delegates.
+ */
+ @Override
+ public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": end of response data");
+ }
+ handler.streamEnd(trailers);
+ }
+
+ /**
+ * Logs the message of {@code cause}, then delegates.
+ */
+ @Override
+ public void failed(final Exception cause) {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": execution failed: " + cause.getMessage());
+ }
+ handler.failed(cause);
+ }
+
+ /**
+ * Logs the cancellation, then delegates.
+ */
+ @Override
+ public void cancel() {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": execution cancelled");
+ }
+ handler.cancel();
+ }
+
+}
Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/LoggingAsyncClientExchangeHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java?rev=1793320&r1=1793319&r2=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java Mon May 1 12:39:16 2017
@@ -34,6 +34,8 @@ import java.util.concurrent.atomic.Atomi
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.config.Configurable;
import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.ConnPoolSupport;
+import org.apache.hc.client5.http.impl.ExecSupport;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -41,32 +43,54 @@ import org.apache.hc.core5.concurrent.Ba
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOReactorException;
+import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.TimeValue;
-class MinimalHttpAsyncClient extends AbstractHttpAsyncClientBase {
+public class MinimalHttpAsyncClient extends AbstractHttpAsyncClientBase {
private final AsyncClientConnectionManager connmgr;
+ private final HttpVersionPolicy versionPolicy;
- public MinimalHttpAsyncClient(
+ /**
+  * Creates the client. The constructor is no longer public, and it now
+  * builds the connecting I/O reactor itself and hands it to the superclass.
+  *
+  * @param versionPolicy HTTP version policy; {@code null} falls back to
+  *                      {@link HttpVersionPolicy#NEGOTIATE}.
+  * @throws IOReactorException declared by this constructor; NOTE(review):
+  *                      presumably raised while setting up the reactor - confirm.
+  */
+ MinimalHttpAsyncClient(
final IOEventHandlerFactory eventHandlerFactory,
final AsyncPushConsumerRegistry pushConsumerRegistry,
+ final HttpVersionPolicy versionPolicy,
final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory,
final ThreadFactory workerThreadFactory,
final AsyncClientConnectionManager connmgr) throws IOReactorException {
- super(eventHandlerFactory, pushConsumerRegistry, reactorConfig, threadFactory, workerThreadFactory);
+ // The reactor is constructed here instead of passing its ingredients to the superclass.
+ super(new DefaultConnectingIOReactor(
+ eventHandlerFactory,
+ reactorConfig,
+ workerThreadFactory,
+ new Callback<IOSession>() {
+
+ // Puts a graceful shutdown command at the front of the session.
+ // NOTE(review): presumably invoked per session when the reactor shuts down - confirm
+ // against DefaultConnectingIOReactor.
+ @Override
+ public void execute(final IOSession ioSession) {
+ ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
+ }
+
+ }),
+ pushConsumerRegistry,
+ threadFactory);
+ // Default to protocol negotiation when the caller does not supply a policy.
+ this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
this.connmgr = connmgr;
}
@@ -77,8 +101,7 @@ class MinimalHttpAsyncClient extends Abs
final FutureCallback<AsyncConnectionEndpoint> callback) {
final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
final Future<AsyncConnectionEndpoint> leaseFuture = connmgr.lease(
- new HttpRoute(host), null,
- connectTimeout,
+ new HttpRoute(host), null, connectTimeout,
new FutureCallback<AsyncConnectionEndpoint>() {
@Override
@@ -90,6 +113,7 @@ class MinimalHttpAsyncClient extends Abs
connectionEndpoint,
getConnectionInitiator(),
connectTimeout,
+ versionPolicy,
clientContext,
new FutureCallback<AsyncConnectionEndpoint>() {
@@ -128,7 +152,12 @@ class MinimalHttpAsyncClient extends Abs
return resultFuture;
}
- @Override
+ /**
+  * Leases an endpoint for the given host using a newly created
+  * {@link HttpClientContext}.
+  *
+  * @param host the target host.
+  * @param callback the callback passed through to the context-aware overload.
+  * @return the future returned by the context-aware {@code lease} overload.
+  */
+ public final Future<AsyncClientEndpoint> lease(
+         final HttpHost host,
+         final FutureCallback<AsyncClientEndpoint> callback) {
+     final HttpClientContext freshContext = HttpClientContext.create();
+     return lease(host, freshContext, callback);
+ }
+
public Future<AsyncClientEndpoint> lease(
final HttpHost host,
final HttpContext context,
@@ -253,7 +282,14 @@ class MinimalHttpAsyncClient extends Abs
@Override
public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
Asserts.check(!released.get(), "Endpoint has already been released");
- connectionEndpoint.execute(exchangeHandler, context);
+
+ // Give the exchange a hexadecimal id taken from ExecSupport's counter; it is used in the log output below.
+ final String exchangeId = Long.toHexString(ExecSupport.getNextExecNumber());
+ if (log.isDebugEnabled()) {
+ log.debug(ConnPoolSupport.getId(connectionEndpoint) + ": executing message exchange " + exchangeId);
+ }
+ // Wrap the handler in the logging decorator only when debug output is enabled;
+ // otherwise the caller's handler is passed through unchanged.
+ connectionEndpoint.execute(
+ log.isDebugEnabled() ? new LoggingAsyncClientExchangeHandler(log, exchangeId, exchangeHandler) : exchangeHandler,
+ context);
}
@Override
Copied: httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/NoopHttpProcessor.java (from r1793319, httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/SimpleResponseConsumer.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/NoopHttpProcessor.java?p2=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/NoopHttpProcessor.java&p1=httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/SimpleResponseConsumer.java&r1=1793319&r2=1793320&rev=1793320&view=diff
==============================================================================
--- httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/SimpleResponseConsumer.java (original)
+++ httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/NoopHttpProcessor.java Mon May 1 12:39:16 2017
@@ -24,21 +24,30 @@
* <http://www.apache.org/>.
*
*/
-package org.apache.hc.client5.http.async.methods;
-import org.apache.hc.core5.http.ContentType;
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
-public final class SimpleResponseConsumer extends AbstractAsyncResponseConsumer<SimpleHttpResponse, String> {
+/**
+ * {@link HttpProcessor} implementation whose request and response
+ * processing methods have empty bodies.
+ */
+final class NoopHttpProcessor implements HttpProcessor {
- public SimpleResponseConsumer() {
- super(new StringAsyncEntityConsumer());
+ // Shared instance of this processor.
+ static final NoopHttpProcessor INSTANCE = new NoopHttpProcessor();
+
+ @Override
+ public void process(
+ final HttpRequest request, final EntityDetails entity, final HttpContext context) throws HttpException, IOException {
+ // Intentionally empty: the request is left as is.
}
@Override
- protected SimpleHttpResponse buildResult(final HttpResponse response, final String entity, final ContentType contentType) {
- return new SimpleHttpResponse(response, entity, contentType);
+ public void process(
+ final HttpResponse response, final EntityDetails entity, final HttpContext context) throws HttpException, IOException {
+ // Intentionally empty: the response is left as is.
}
-}
\ No newline at end of file
+}