You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2014/10/10 21:49:16 UTC
svn commit: r1630966 -
/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/
Author: olegk
Date: Fri Oct 10 19:49:15 2014
New Revision: 1630966
URL: http://svn.apache.org/r1630966
Log:
Refactored internal exchange handling code
Added:
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/AbstractClientExchangeHandler.java (with props)
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java (with props)
Removed:
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalConnManager.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExec.java
Modified:
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalClientExec.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalState.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MainClientExec.java
httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java
Added: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/AbstractClientExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/AbstractClientExchangeHandler.java?rev=1630966&view=auto
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/AbstractClientExchangeHandler.java (added)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/AbstractClientExchangeHandler.java Fri Oct 10 19:49:15 2014
@@ -0,0 +1,436 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.http.impl.nio.client;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.http.ConnectionClosedException;
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpRequestWrapper;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.concurrent.BasicFuture;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.routing.RouteTracker;
+import org.apache.http.nio.NHttpClientConnection;
+import org.apache.http.nio.conn.NHttpClientConnectionManager;
+import org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.util.Asserts;
+
+/**
+ * Abstract {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler} class
+ * that implements connection management aspects shared by all HTTP exchange handlers.
+ * <p>
+ * Instances of this class are expected to be accessed by one thread at a time only.
+ * The {@link #cancel()} method can be called concurrently by multiple threads.
+ */
+abstract class AbstractClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler {
+
+ private static final AtomicLong COUNTER = new AtomicLong(1);
+
+ protected final Log log;
+
+ private final long id;
+ private final HttpClientContext localContext;
+ private final BasicFuture<T> resultFuture;
+ private final NHttpClientConnectionManager connmgr;
+ private final ConnectionReuseStrategy connReuseStrategy;
+ private final ConnectionKeepAliveStrategy keepaliveStrategy;
+ private final AtomicReference<NHttpClientConnection> managedConnRef;
+ private final AtomicReference<HttpRoute> routeRef;
+ private final AtomicReference<RouteTracker> routeTrackerRef;
+ private final AtomicBoolean routeEstablished;
+ private final AtomicReference<Long> validDurationRef;
+ private final AtomicReference<HttpRequestWrapper> requestRef;
+ private final AtomicReference<HttpResponse> responseRef;
+ private final AtomicBoolean completed;
+ private final AtomicBoolean closed;
+
+ AbstractClientExchangeHandler(
+ final Log log,
+ final HttpClientContext localContext,
+ final BasicFuture<T> resultFuture,
+ final NHttpClientConnectionManager connmgr,
+ final ConnectionReuseStrategy connReuseStrategy,
+ final ConnectionKeepAliveStrategy keepaliveStrategy) {
+ super();
+ this.log = log;
+ this.id = COUNTER.getAndIncrement();
+ this.localContext = localContext;
+ this.resultFuture = resultFuture;
+ this.connmgr = connmgr;
+ this.connReuseStrategy = connReuseStrategy;
+ this.keepaliveStrategy = keepaliveStrategy;
+ this.managedConnRef = new AtomicReference<NHttpClientConnection>(null);
+ this.routeRef = new AtomicReference<HttpRoute>(null);
+ this.routeTrackerRef = new AtomicReference<RouteTracker>(null);
+ this.routeEstablished = new AtomicBoolean(false);
+ this.validDurationRef = new AtomicReference<Long>(null);
+ this.requestRef = new AtomicReference<HttpRequestWrapper>(null);
+ this.responseRef = new AtomicReference<HttpResponse>(null);
+ this.completed = new AtomicBoolean(false);
+ this.closed = new AtomicBoolean(false);
+ }
+
+ final long getId() {
+ return this.id;
+ }
+
+ final boolean isCompleted() {
+ return this.completed.get();
+ }
+
+ final void markCompleted() {
+ this.completed.set(true);
+ }
+
+ final void markConnectionNonReusable() {
+ this.validDurationRef.set(null);
+ }
+
+ final boolean isRouteEstablished() {
+ return this.routeEstablished.get();
+ }
+
+ final HttpRoute getRoute() {
+ return this.routeRef.get();
+ }
+
+ final void setRoute(final HttpRoute route) {
+ this.routeRef.set(route);
+ }
+
+ final HttpRequestWrapper getCurrentRequest() {
+ return this.requestRef.get();
+ }
+
+ final void setCurrentRequest(final HttpRequestWrapper request) {
+ this.requestRef.set(request);
+ }
+
+ final HttpResponse getCurrentResponse() {
+ return this.responseRef.get();
+ }
+
+ final void setCurrentResponse(final HttpResponse response) {
+ this.responseRef.set(response);
+ }
+
+ final HttpRoute getActualRoute() {
+ final RouteTracker routeTracker = this.routeTrackerRef.get();
+ return routeTracker != null ? routeTracker.toRoute() : null;
+ }
+
+ final void verifytRoute() {
+ if (!this.routeEstablished.get() && this.routeTrackerRef.get() == null) {
+ final NHttpClientConnection managedConn = this.managedConnRef.get();
+ Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
+ final boolean routeComplete = this.connmgr.isRouteComplete(managedConn);
+ this.routeEstablished.set(routeComplete);
+ if (!routeComplete) {
+ this.log.debug("Start connection routing");
+ final HttpRoute route = this.routeRef.get();
+ this.routeTrackerRef.set(new RouteTracker(route));
+ } else {
+ this.log.debug("Connection route already established");
+ }
+ }
+ }
+
+ final void onRouteToTarget() throws IOException {
+ final NHttpClientConnection managedConn = this.managedConnRef.get();
+ Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
+ final HttpRoute route = this.routeRef.get();
+ Asserts.check(route != null, "Inconsistent state: HTTP route is null");
+ final RouteTracker routeTracker = this.routeTrackerRef.get();
+ Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
+ this.connmgr.startRoute(managedConn, route, this.localContext);
+ routeTracker.connectTarget(route.isSecure());
+ }
+
+ final void onRouteToProxy() throws IOException {
+ final NHttpClientConnection managedConn = this.managedConnRef.get();
+ Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
+ final HttpRoute route = this.routeRef.get();
+ Asserts.check(route != null, "Inconsistent state: HTTP route is null");
+ final RouteTracker routeTracker = this.routeTrackerRef.get();
+ Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
+ this.connmgr.startRoute(managedConn, route, this.localContext);
+ final HttpHost proxy = route.getProxyHost();
+ routeTracker.connectProxy(proxy, false);
+ }
+
+ final void onRouteUpgrade() throws IOException {
+ final NHttpClientConnection managedConn = this.managedConnRef.get();
+ Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
+ final HttpRoute route = this.routeRef.get();
+ Asserts.check(route != null, "Inconsistent state: HTTP route is null");
+ final RouteTracker routeTracker = this.routeTrackerRef.get();
+ Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
+ this.connmgr.upgrade(managedConn, route, this.localContext);
+ routeTracker.layerProtocol(route.isSecure());
+ }
+
+ final void onRouteTunnelToTarget() {
+ final RouteTracker routeTracker = this.routeTrackerRef.get();
+ Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
+ routeTracker.tunnelTarget(false);
+ }
+
+ final void onRouteComplete() {
+ final NHttpClientConnection managedConn = this.managedConnRef.get();
+ Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
+ final HttpRoute route = this.routeRef.get();
+ Asserts.check(route != null, "Inconsistent state: HTTP route is null");
+ this.connmgr.routeComplete(managedConn, route, this.localContext);
+ this.routeEstablished.set(true);
+ this.routeTrackerRef.set(null);
+ }
+
+ final NHttpClientConnection getConnection() {
+ return this.managedConnRef.get();
+ }
+
+ final void releaseConnection() {
+ final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
+ if (localConn != null) {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + this.id + "] releasing connection");
+ }
+ localConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
+ final Long validDuration = this.validDurationRef.get();
+ if (validDuration != null) {
+ final Object userToken = this.localContext.getUserToken();
+ this.connmgr.releaseConnection(localConn, userToken, validDuration, TimeUnit.MILLISECONDS);
+ } else {
+ try {
+ localConn.close();
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + this.id + "] connection discarded");
+ }
+ } catch (final IOException ex) {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug(ex.getMessage(), ex);
+ }
+ } finally {
+ this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+
+ final void discardConnection() {
+ final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
+ if (localConn != null) {
+ try {
+ localConn.shutdown();
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + this.id + "] connection aborted");
+ }
+ } catch (final IOException ex) {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug(ex.getMessage(), ex);
+ }
+ } finally {
+ this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ final boolean manageConnectionPersistence() {
+ final HttpResponse response = this.responseRef.get();
+ Asserts.check(response != null, "Inconsistent state: HTTP response");
+ final NHttpClientConnection managedConn = this.managedConnRef.get();
+ Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
+ final boolean keepAlive = managedConn.isOpen() &&
+ this.connReuseStrategy.keepAlive(response, this.localContext);
+ if (keepAlive) {
+ final long validDuration = this.keepaliveStrategy.getKeepAliveDuration(
+ response, this.localContext);
+ if (this.log.isDebugEnabled()) {
+ final String s;
+ if (validDuration > 0) {
+ s = "for " + validDuration + " " + TimeUnit.MILLISECONDS;
+ } else {
+ s = "indefinitely";
+ }
+ this.log.debug("[exchange: " + this.id + "] Connection can be kept alive " + s);
+ }
+ this.validDurationRef.set(validDuration);
+ } else {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + this.id + "] Connection cannot be kept alive");
+ }
+ this.validDurationRef.set(null);
+ }
+ return keepAlive;
+ }
+
+ private void connectionAllocated(final NHttpClientConnection managedConn) {
+ try {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + this.id + "] Connection allocated: " + managedConn);
+ }
+ this.managedConnRef.set(managedConn);
+
+ if (this.closed.get()) {
+ discardConnection();
+ return;
+ }
+
+ managedConn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, this);
+ managedConn.requestOutput();
+ if (!managedConn.isOpen()) {
+ failed(new ConnectionClosedException("Connection closed"));
+ }
+ } catch (final RuntimeException runex) {
+ failed(runex);
+ throw runex;
+ }
+ }
+
+ private void connectionRequestFailed(final Exception ex) {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + this.id + "] connection request failed");
+ }
+ failed(ex);
+ }
+
+ private void connectionRequestCancelled() {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + this.id + "] Connection request cancelled");
+ }
+ try {
+ this.resultFuture.cancel();
+ } finally {
+ close();
+ }
+ }
+
+ final void requestConnection() {
+ final HttpRoute route = this.routeRef.get();
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + this.id + "] Request connection for " + route);
+ }
+
+ discardConnection();
+
+ this.validDurationRef.set(null);
+ this.routeTrackerRef.set(null);
+ this.routeEstablished.set(false);
+
+ final Object userToken = this.localContext.getUserToken();
+ final RequestConfig config = this.localContext.getRequestConfig();
+ this.connmgr.requestConnection(
+ route,
+ userToken,
+ config.getConnectTimeout(),
+ config.getConnectionRequestTimeout(),
+ TimeUnit.MILLISECONDS,
+ new FutureCallback<NHttpClientConnection>() {
+
+ @Override
+ public void completed(final NHttpClientConnection managedConn) {
+ connectionAllocated(managedConn);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ connectionRequestFailed(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ connectionRequestCancelled();
+ }
+
+ });
+ }
+
+ abstract void releaseResources();
+
+ abstract void executionFailed(final Exception ex);
+
+ abstract boolean executionCancelled();
+
+ @Override
+ public final void close() {
+ if (this.closed.compareAndSet(false, true)) {
+ discardConnection();
+ releaseResources();
+ }
+ }
+
+ @Override
+ public final boolean isDone() {
+ return this.completed.get();
+ }
+
+ @Override
+ public final void failed(final Exception ex) {
+ try {
+ executionFailed(ex);
+ } finally {
+ try {
+ this.resultFuture.failed(ex);
+ } finally {
+ close();
+ }
+ }
+ }
+
+ @Override
+ public final boolean cancel() {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + this.id + "] Cancelled");
+ }
+ if (this.closed.compareAndSet(false, true)) {
+ try {
+ try {
+ return executionCancelled();
+ } finally {
+ this.resultFuture.cancel();
+ }
+ } finally {
+ discardConnection();
+ releaseResources();
+ }
+ }
+ return false;
+ }
+
+}
Propchange: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/AbstractClientExchangeHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/AbstractClientExchangeHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/AbstractClientExchangeHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java?rev=1630966&r1=1630965&r2=1630966&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/CloseableHttpAsyncClientBase.java Fri Oct 10 19:49:15 2014
@@ -76,9 +76,6 @@ abstract class CloseableHttpAsyncClientB
this.status = new AtomicReference<Status>(Status.INACTIVE);
}
- void startConnManager(final NHttpClientEventHandler handler) {
- }
-
@Override
public void start() {
if (this.status.compareAndSet(Status.INACTIVE, Status.ACTIVE)) {
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java?rev=1630966&r1=1630965&r2=1630966&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/DefaultClientExchangeHandlerImpl.java Fri Oct 10 19:49:15 2014
@@ -27,54 +27,38 @@
package org.apache.http.impl.nio.client;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
-import org.apache.http.ConnectionClosedException;
+import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
-import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpExecutionAware;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.concurrent.BasicFuture;
-import org.apache.http.concurrent.Cancellable;
-import org.apache.http.concurrent.FutureCallback;
-import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
-import org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler;
-import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
/**
- * Default implementation of {@link HttpAsyncClientExchangeHandler}.
+ * Default implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler}.
* <p>
* Instances of this class are expected to be accessed by one thread at a time only.
* The {@link #cancel()} method can be called concurrently by multiple threads.
*/
-class DefaultClientExchangeHandlerImpl<T>
- implements HttpAsyncClientExchangeHandler, InternalConnManager, Cancellable {
-
- private final Log log;
+class DefaultClientExchangeHandlerImpl<T> extends AbstractClientExchangeHandler {
private final HttpAsyncRequestProducer requestProducer;
private final HttpAsyncResponseConsumer<T> responseConsumer;
- private final HttpClientContext localContext;
private final BasicFuture<T> resultFuture;
- private final NHttpClientConnectionManager connmgr;
private final InternalClientExec exec;
private final InternalState state;
- private final AtomicReference<NHttpClientConnection> managedConn;
- private final AtomicBoolean closed;
- private final AtomicBoolean completed;
public DefaultClientExchangeHandlerImpl(
final Log log,
@@ -83,27 +67,19 @@ class DefaultClientExchangeHandlerImpl<T
final HttpClientContext localContext,
final BasicFuture<T> resultFuture,
final NHttpClientConnectionManager connmgr,
+ final ConnectionReuseStrategy connReuseStrategy,
+ final ConnectionKeepAliveStrategy keepaliveStrategy,
final InternalClientExec exec) {
- super();
- this.log = log;
+ super(log, localContext, resultFuture, connmgr, connReuseStrategy, keepaliveStrategy);
this.requestProducer = requestProducer;
this.responseConsumer = responseConsumer;
- this.localContext = localContext;
this.resultFuture = resultFuture;
- this.connmgr = connmgr;
this.exec = exec;
- this.state = new InternalState(requestProducer, responseConsumer, localContext);
- this.closed = new AtomicBoolean(false);
- this.completed = new AtomicBoolean(false);
- this.managedConn = new AtomicReference<NHttpClientConnection>(null);
+ this.state = new InternalState(getId(), requestProducer, responseConsumer, localContext);
}
@Override
- public void close() {
- if (this.closed.getAndSet(true)) {
- return;
- }
- abortConnection();
+ void releaseResources() {
try {
this.requestProducer.close();
} catch (final IOException ex) {
@@ -116,6 +92,28 @@ class DefaultClientExchangeHandlerImpl<T
}
}
+ @Override
+ void executionFailed(final Exception ex) {
+ this.requestProducer.failed(ex);
+ this.responseConsumer.failed(ex);
+ }
+
+ @Override
+ boolean executionCancelled() {
+ final boolean cancelled = this.responseConsumer.cancel();
+
+ final T result = this.responseConsumer.getResult();
+ final Exception ex = this.responseConsumer.getException();
+ if (ex != null) {
+ this.resultFuture.failed(ex);
+ } else if (result != null) {
+ this.resultFuture.completed(result);
+ } else {
+ this.resultFuture.cancel();
+ }
+ return cancelled;
+ }
+
public void start() throws HttpException, IOException {
final HttpHost target = this.requestProducer.getTarget();
final HttpRequest original = this.requestProducer.generateRequest();
@@ -123,16 +121,11 @@ class DefaultClientExchangeHandlerImpl<T
if (original instanceof HttpExecutionAware) {
((HttpExecutionAware) original).setCancellable(this);
}
- this.exec.prepare(this.state, target, original);
+ this.exec.prepare(target, original, this.state, this);
requestConnection();
}
@Override
- public boolean isDone() {
- return this.completed.get();
- }
-
- @Override
public HttpRequest generateRequest() throws IOException, HttpException {
return this.exec.generateRequest(this.state, this);
}
@@ -145,13 +138,13 @@ class DefaultClientExchangeHandlerImpl<T
@Override
public void requestCompleted() {
- this.exec.requestCompleted(this.state);
+ this.exec.requestCompleted(this.state, this);
}
@Override
public void responseReceived(
final HttpResponse response) throws IOException, HttpException {
- this.exec.responseReceived(this.state, response);
+ this.exec.responseReceived(response, this.state, this);
}
@Override
@@ -159,9 +152,9 @@ class DefaultClientExchangeHandlerImpl<T
final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
this.exec.consumeContent(this.state, decoder, ioctrl);
if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
- this.state.setNonReusable();
+ markConnectionNonReusable();
try {
- this.completed.set(true);
+ markCompleted();
releaseConnection();
this.resultFuture.cancel();
} finally {
@@ -176,7 +169,7 @@ class DefaultClientExchangeHandlerImpl<T
if (this.state.getFinalResponse() != null || this.resultFuture.isDone()) {
try {
- this.completed.set(true);
+ markCompleted();
releaseConnection();
final T result = this.responseConsumer.getResult();
final Exception ex = this.responseConsumer.getException();
@@ -189,7 +182,7 @@ class DefaultClientExchangeHandlerImpl<T
close();
}
} else {
- NHttpClientConnection localConn = this.managedConn.get();
+ NHttpClientConnection localConn = getConnection();
if (localConn != null && !localConn.isOpen()) {
releaseConnection();
localConn = null;
@@ -204,191 +197,15 @@ class DefaultClientExchangeHandlerImpl<T
@Override
public void inputTerminated() {
- if (!this.completed.get()) {
+ if (!isCompleted()) {
requestConnection();
} else {
close();
}
}
- @Override
- public void releaseConnection() {
- final NHttpClientConnection localConn = this.managedConn.getAndSet(null);
- if (localConn != null) {
- if (this.log.isDebugEnabled()) {
- this.log.debug("[exchange: " + this.state.getId() + "] releasing connection");
- }
- localConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
- if (this.state.isReusable()) {
- this.connmgr.releaseConnection(localConn,
- this.localContext.getUserToken(),
- this.state.getValidDuration(), TimeUnit.MILLISECONDS);
- } else {
- try {
- localConn.close();
- if (this.log.isDebugEnabled()) {
- this.log.debug("[exchange: " + this.state.getId() + "] connection discarded");
- }
- } catch (final IOException ex) {
- if (this.log.isDebugEnabled()) {
- this.log.debug(ex.getMessage(), ex);
- }
- } finally {
- this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
- }
- }
- }
- }
-
- @Override
public void abortConnection() {
discardConnection();
}
- private void discardConnection() {
- final NHttpClientConnection localConn = this.managedConn.getAndSet(null);
- if (localConn != null) {
- try {
- localConn.shutdown();
- if (this.log.isDebugEnabled()) {
- this.log.debug("[exchange: " + this.state.getId() + "] connection aborted");
- }
- } catch (final IOException ex) {
- if (this.log.isDebugEnabled()) {
- this.log.debug(ex.getMessage(), ex);
- }
- } finally {
- this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
- }
- }
- }
-
- @Override
- public void failed(final Exception ex) {
- try {
- this.requestProducer.failed(ex);
- this.responseConsumer.failed(ex);
- } finally {
- try {
- this.resultFuture.failed(ex);
- } finally {
- close();
- }
- }
- }
-
- @Override
- public boolean cancel() {
- if (this.log.isDebugEnabled()) {
- this.log.debug("[exchange: " + this.state.getId() + "] Cancelled");
- }
- try {
- final boolean cancelled = this.responseConsumer.cancel();
-
- final T result = this.responseConsumer.getResult();
- final Exception ex = this.responseConsumer.getException();
- if (ex != null) {
- this.resultFuture.failed(ex);
- } else if (result != null) {
- this.resultFuture.completed(result);
- } else {
- this.resultFuture.cancel();
- }
- return cancelled;
- } catch (final RuntimeException runex) {
- this.resultFuture.failed(runex);
- throw runex;
- } finally {
- close();
- }
- }
-
- private void connectionAllocated(final NHttpClientConnection managedConn) {
- try {
- if (this.log.isDebugEnabled()) {
- this.log.debug("[exchange: " + this.state.getId() + "] Connection allocated: " + managedConn);
- }
- this.managedConn.set(managedConn);
-
- if (this.closed.get()) {
- releaseConnection();
- return;
- }
-
- managedConn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, this);
- managedConn.requestOutput();
- if (!managedConn.isOpen()) {
- failed(new ConnectionClosedException("Connection closed"));
- }
- } catch (final RuntimeException runex) {
- failed(runex);
- throw runex;
- }
- }
-
- private void connectionRequestFailed(final Exception ex) {
- if (this.log.isDebugEnabled()) {
- this.log.debug("[exchange: " + this.state.getId() + "] connection request failed");
- }
- failed(ex);
- }
-
- private void connectionRequestCancelled() {
- if (this.log.isDebugEnabled()) {
- this.log.debug("[exchange: " + this.state.getId() + "] Connection request cancelled");
- }
- try {
- this.resultFuture.cancel();
- } finally {
- close();
- }
- }
-
- private void requestConnection() {
- if (this.log.isDebugEnabled()) {
- this.log.debug("[exchange: " + this.state.getId() + "] Request connection for " +
- this.state.getRoute());
- }
-
- discardConnection();
-
- this.state.setValidDuration(0);
- this.state.setNonReusable();
- this.state.setRouteEstablished(false);
- this.state.setRouteTracker(null);
-
- final HttpRoute route = this.state.getRoute();
- final Object userToken = this.localContext.getUserToken();
- final RequestConfig config = this.localContext.getRequestConfig();
- this.connmgr.requestConnection(
- route,
- userToken,
- config.getConnectTimeout(),
- config.getConnectionRequestTimeout(),
- TimeUnit.MILLISECONDS,
- new FutureCallback<NHttpClientConnection>() {
-
- @Override
- public void completed(final NHttpClientConnection managedConn) {
- connectionAllocated(managedConn);
- }
-
- @Override
- public void failed(final Exception ex) {
- connectionRequestFailed(ex);
- }
-
- @Override
- public void cancelled() {
- connectionRequestCancelled();
- }
-
- });
- }
-
- @Override
- public NHttpClientConnection getConnection() {
- return this.managedConn.get();
- }
-
}
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java?rev=1630966&r1=1630965&r2=1630966&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/HttpAsyncClientBuilder.java Fri Oct 10 19:49:15 2014
@@ -838,11 +838,8 @@ public class HttpAsyncClientBuilder {
}
final MainClientExec exec = new MainClientExec(
- connManager,
httpprocessor,
routePlanner,
- reuseStrategy,
- keepAliveStrategy,
redirectStrategy,
targetAuthStrategy,
proxyAuthStrategy,
@@ -862,6 +859,8 @@ public class HttpAsyncClientBuilder {
}
return new InternalHttpAsyncClient(
connManager,
+ reuseStrategy,
+ keepAliveStrategy,
threadFactory,
eventHandler,
exec,
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalClientExec.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalClientExec.java?rev=1630966&r1=1630965&r2=1630966&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalClientExec.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalClientExec.java Fri Oct 10 19:49:15 2014
@@ -39,13 +39,14 @@ import org.apache.http.nio.IOControl;
interface InternalClientExec {
void prepare(
- InternalState state,
HttpHost target,
- HttpRequest original) throws IOException, HttpException;
+ HttpRequest original,
+ InternalState state,
+ AbstractClientExchangeHandler<?> handler) throws IOException, HttpException;
HttpRequest generateRequest(
InternalState state,
- InternalConnManager connManager) throws IOException, HttpException;
+ AbstractClientExchangeHandler<?> handler) throws IOException, HttpException;
void produceContent(
InternalState state,
@@ -53,11 +54,13 @@ interface InternalClientExec {
IOControl ioctrl) throws IOException;
void requestCompleted(
- InternalState state);
+ InternalState state,
+ AbstractClientExchangeHandler<?> handler);
void responseReceived(
+ HttpResponse response,
InternalState state,
- HttpResponse response) throws IOException, HttpException;
+ AbstractClientExchangeHandler<?> handler) throws IOException, HttpException;
void consumeContent(
InternalState state,
@@ -66,6 +69,6 @@ interface InternalClientExec {
void responseCompleted(
InternalState state,
- InternalConnManager connManager) throws IOException, HttpException;
+ AbstractClientExchangeHandler<?> handler) throws IOException, HttpException;
}
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java?rev=1630966&r1=1630965&r2=1630966&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalHttpAsyncClient.java Fri Oct 10 19:49:15 2014
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactor
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthState;
import org.apache.http.client.CookieStore;
@@ -40,6 +41,7 @@ import org.apache.http.client.protocol.H
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.config.Lookup;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.cookie.CookieSpecProvider;
import org.apache.http.nio.NHttpClientEventHandler;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
@@ -53,6 +55,8 @@ class InternalHttpAsyncClient extends Cl
private final Log log = LogFactory.getLog(getClass());
private final NHttpClientConnectionManager connmgr;
+ private final ConnectionReuseStrategy connReuseStrategy;
+ private final ConnectionKeepAliveStrategy keepaliveStrategy;
private final InternalClientExec exec;
private final Lookup<CookieSpecProvider> cookieSpecRegistry;
private final Lookup<AuthSchemeProvider> authSchemeRegistry;
@@ -62,6 +66,8 @@ class InternalHttpAsyncClient extends Cl
public InternalHttpAsyncClient(
final NHttpClientConnectionManager connmgr,
+ final ConnectionReuseStrategy connReuseStrategy,
+ final ConnectionKeepAliveStrategy keepaliveStrategy,
final ThreadFactory threadFactory,
final NHttpClientEventHandler handler,
final InternalClientExec exec,
@@ -72,6 +78,8 @@ class InternalHttpAsyncClient extends Cl
final RequestConfig defaultConfig) {
super(connmgr, threadFactory, handler);
this.connmgr = connmgr;
+ this.connReuseStrategy = connReuseStrategy;
+ this.keepaliveStrategy = keepaliveStrategy;
this.exec = exec;
this.cookieSpecRegistry = cookieSpecRegistry;
this.authSchemeRegistry = authSchemeRegistry;
@@ -124,6 +132,8 @@ class InternalHttpAsyncClient extends Cl
localcontext,
future,
this.connmgr,
+ this.connReuseStrategy,
+ this.keepaliveStrategy,
this.exec);
try {
handler.start();
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalState.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalState.java?rev=1630966&r1=1630965&r2=1630966&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalState.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/InternalState.java Fri Oct 10 19:49:15 2014
@@ -27,36 +27,23 @@
package org.apache.http.impl.nio.client;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpRequestWrapper;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.conn.routing.HttpRoute;
-import org.apache.http.conn.routing.RouteTracker;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
class InternalState {
- private static final AtomicLong COUNTER = new AtomicLong(1);
-
private final long id;
private final HttpAsyncRequestProducer requestProducer;
private final HttpAsyncResponseConsumer<?> responseConsumer;
private final HttpClientContext localContext;
- private boolean routeEstablished;
- private RouteTracker routeTracker;
- private boolean reusable;
- private long validDuration;
-
- private HttpRoute route;
private HttpRequestWrapper mainRequest;
private HttpResponse finalResponse;
- private HttpRequestWrapper currentRequest;
- private HttpResponse currentResponse;
private ByteBuffer tmpbuf;
private boolean requestContentProduced;
private int execCount;
@@ -65,11 +52,12 @@ class InternalState {
private HttpUriRequest redirect;
public InternalState(
+ final long id,
final HttpAsyncRequestProducer requestProducer,
final HttpAsyncResponseConsumer<?> responseConsumer,
final HttpClientContext localContext) {
super();
- this.id = COUNTER.getAndIncrement();
+ this.id = id;
this.requestProducer = requestProducer;
this.responseConsumer = responseConsumer;
this.localContext = localContext;
@@ -91,50 +79,6 @@ class InternalState {
return localContext;
}
- public boolean isRouteEstablished() {
- return routeEstablished;
- }
-
- public void setRouteEstablished(final boolean b) {
- this.routeEstablished = b;
- }
-
- public RouteTracker getRouteTracker() {
- return routeTracker;
- }
-
- public void setRouteTracker(final RouteTracker routeTracker) {
- this.routeTracker = routeTracker;
- }
-
- public boolean isReusable() {
- return reusable;
- }
-
- public void setReusable() {
- this.reusable = true;
- }
-
- public void setNonReusable() {
- this.reusable = false;
- }
-
- public long getValidDuration() {
- return validDuration;
- }
-
- public void setValidDuration(final long validDuration) {
- this.validDuration = validDuration;
- }
-
- public HttpRoute getRoute() {
- return route;
- }
-
- public void setRoute(final HttpRoute route) {
- this.route = route;
- }
-
public HttpRequestWrapper getMainRequest() {
return mainRequest;
}
@@ -151,22 +95,6 @@ class InternalState {
this.finalResponse = finalResponse;
}
- public HttpRequestWrapper getCurrentRequest() {
- return currentRequest;
- }
-
- public void setCurrentRequest(final HttpRequestWrapper currentRequest) {
- this.currentRequest = currentRequest;
- }
-
- public HttpResponse getCurrentResponse() {
- return currentResponse;
- }
-
- public void setCurrentResponse(final HttpResponse currentResponse) {
- this.currentResponse = currentResponse;
- }
-
public ByteBuffer getTmpbuf() {
if (tmpbuf == null) {
tmpbuf = ByteBuffer.allocate(4 * 1024);
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MainClientExec.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MainClientExec.java?rev=1630966&r1=1630965&r2=1630966&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MainClientExec.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MainClientExec.java Fri Oct 10 19:49:15 2014
@@ -31,11 +31,9 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
@@ -62,19 +60,16 @@ import org.apache.http.client.methods.Ht
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.protocol.RequestClientConnControl;
import org.apache.http.client.utils.URIUtils;
-import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.routing.BasicRouteDirector;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.routing.HttpRouteDirector;
import org.apache.http.conn.routing.HttpRoutePlanner;
-import org.apache.http.conn.routing.RouteTracker;
import org.apache.http.impl.auth.HttpAuthenticator;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
-import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpCoreContext;
@@ -86,12 +81,9 @@ class MainClientExec implements Internal
private final Log log = LogFactory.getLog(getClass());
- private final NHttpClientConnectionManager connmgr;
private final HttpProcessor httpProcessor;
private final HttpProcessor proxyHttpProcessor;
private final HttpRoutePlanner routePlanner;
- private final ConnectionReuseStrategy connReuseStrategy;
- private final ConnectionKeepAliveStrategy keepaliveStrategy;
private final AuthenticationStrategy targetAuthStrategy;
private final AuthenticationStrategy proxyAuthStrategy;
private final UserTokenHandler userTokenHandler;
@@ -100,23 +92,17 @@ class MainClientExec implements Internal
private final HttpAuthenticator authenticator;
public MainClientExec(
- final NHttpClientConnectionManager connmgr,
final HttpProcessor httpProcessor,
final HttpRoutePlanner routePlanner,
- final ConnectionReuseStrategy connReuseStrategy,
- final ConnectionKeepAliveStrategy keepaliveStrategy,
final RedirectStrategy redirectStrategy,
final AuthenticationStrategy targetAuthStrategy,
final AuthenticationStrategy proxyAuthStrategy,
final UserTokenHandler userTokenHandler) {
super();
- this.connmgr = connmgr;
this.httpProcessor = httpProcessor;
this.proxyHttpProcessor = new ImmutableHttpProcessor(
new RequestTargetHost(), new RequestClientConnControl());
this.routePlanner = routePlanner;
- this.connReuseStrategy = connReuseStrategy;
- this.keepaliveStrategy = keepaliveStrategy;
this.redirectStrategy = redirectStrategy;
this.targetAuthStrategy = targetAuthStrategy;
this.proxyAuthStrategy = proxyAuthStrategy;
@@ -127,9 +113,10 @@ class MainClientExec implements Internal
@Override
public void prepare(
- final InternalState state,
final HttpHost target,
- final HttpRequest original) throws HttpException, IOException {
+ final HttpRequest original,
+ final InternalState state,
+ final AbstractClientExchangeHandler<?> handler) throws HttpException, IOException {
if (this.log.isDebugEnabled()) {
this.log.debug("[exchange: " + state.getId() + "] start execution");
}
@@ -150,67 +137,54 @@ class MainClientExec implements Internal
final HttpRequestWrapper request = HttpRequestWrapper.wrap(original);
final HttpRoute route = this.routePlanner.determineRoute(target, request, localContext);
- state.setRoute(route);
+
+ handler.setRoute(route);
+
state.setMainRequest(request);
- state.setCurrentRequest(request);
+ handler.setCurrentRequest(request);
- prepareRequest(state);
+ prepareRequest(state, handler);
}
@Override
public HttpRequest generateRequest(
final InternalState state,
- final InternalConnManager connManager) throws IOException, HttpException {
- final HttpClientContext localContext = state.getLocalContext();
- final HttpRoute route = state.getRoute();
- final NHttpClientConnection managedConn = connManager.getConnection();
- if (!state.isRouteEstablished() && state.getRouteTracker() == null) {
- state.setRouteEstablished(this.connmgr.isRouteComplete(managedConn));
- if (!state.isRouteEstablished()) {
- this.log.debug("Start connection routing");
- state.setRouteTracker(new RouteTracker(route));
- } else {
- this.log.debug("Connection route already established");
- }
- }
+ final AbstractClientExchangeHandler<?> handler) throws IOException, HttpException {
+
+ final HttpRoute route = handler.getRoute();
+
+ handler.verifytRoute();
- if (!state.isRouteEstablished()) {
- final RouteTracker routeTracker = state.getRouteTracker();
+ if (!handler.isRouteEstablished()) {
int step;
loop:
do {
- final HttpRoute fact = routeTracker.toRoute();
+ final HttpRoute fact = handler.getActualRoute();
step = this.routeDirector.nextStep(route, fact);
switch (step) {
case HttpRouteDirector.CONNECT_TARGET:
- this.connmgr.startRoute(managedConn, route, localContext);
- routeTracker.connectTarget(route.isSecure());
+ handler.onRouteToTarget();
break;
case HttpRouteDirector.CONNECT_PROXY:
- this.connmgr.startRoute(managedConn, route, localContext);
- final HttpHost proxy = route.getProxyHost();
- routeTracker.connectProxy(proxy, false);
+ handler.onRouteToProxy();
break;
case HttpRouteDirector.TUNNEL_TARGET:
if (this.log.isDebugEnabled()) {
this.log.debug("[exchange: " + state.getId() + "] Tunnel required");
}
final HttpRequest connect = createConnectRequest(route, state);
- state.setCurrentRequest(HttpRequestWrapper.wrap(connect));
+ handler.setCurrentRequest(HttpRequestWrapper.wrap(connect));
break loop;
case HttpRouteDirector.TUNNEL_PROXY:
throw new HttpException("Proxy chains are not supported");
case HttpRouteDirector.LAYER_PROTOCOL:
- this.connmgr.upgrade(managedConn, route, localContext);
- routeTracker.layerProtocol(route.isSecure());
+ handler.onRouteUpgrade();
break;
case HttpRouteDirector.UNREACHABLE:
throw new HttpException("Unable to establish route: " +
"planned = " + route + "; current = " + fact);
case HttpRouteDirector.COMPLETE:
- this.connmgr.routeComplete(managedConn, route, localContext);
- state.setRouteEstablished(true);
- state.setRouteTracker(null);
+ handler.onRouteComplete();
this.log.debug("Connection route established");
break;
default:
@@ -220,13 +194,14 @@ class MainClientExec implements Internal
} while (step > HttpRouteDirector.COMPLETE);
}
- HttpRequestWrapper currentRequest = state.getCurrentRequest();
+ final HttpClientContext localContext = state.getLocalContext();
+ HttpRequestWrapper currentRequest = handler.getCurrentRequest();
if (currentRequest == null) {
currentRequest = state.getMainRequest();
- state.setCurrentRequest(currentRequest);
+ handler.setCurrentRequest(currentRequest);
}
- if (state.isRouteEstablished()) {
+ if (handler.isRouteEstablished()) {
state.incrementExecCount();
if (state.getExecCount() > 1) {
final HttpAsyncRequestProducer requestProducer = state.getRequestProducer();
@@ -265,6 +240,7 @@ class MainClientExec implements Internal
}
}
+ final NHttpClientConnection managedConn = handler.getConnection();
localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
final RequestConfig config = localContext.getRequestConfig();
if (config.getSocketTimeout() > 0) {
@@ -290,7 +266,9 @@ class MainClientExec implements Internal
}
@Override
- public void requestCompleted(final InternalState state) {
+ public void requestCompleted(
+ final InternalState state,
+ final AbstractClientExchangeHandler<?> handler) {
if (this.log.isDebugEnabled()) {
this.log.debug("[exchange: " + state.getId() + "] Request completed");
}
@@ -301,8 +279,9 @@ class MainClientExec implements Internal
@Override
public void responseReceived(
+ final HttpResponse response,
final InternalState state,
- final HttpResponse response) throws IOException, HttpException {
+ final AbstractClientExchangeHandler<?> handler) throws IOException, HttpException {
if (this.log.isDebugEnabled()) {
this.log.debug("[exchange: " + state.getId() + "] Response received " + response.getStatusLine());
}
@@ -310,25 +289,24 @@ class MainClientExec implements Internal
context.setAttribute(HttpClientContext.HTTP_RESPONSE, response);
this.httpProcessor.process(response, context);
- state.setCurrentResponse(response);
+ handler.setCurrentResponse(response);
- if (!state.isRouteEstablished()) {
+ if (!handler.isRouteEstablished()) {
final int status = response.getStatusLine().getStatusCode();
if (status < 200) {
throw new HttpException("Unexpected response to CONNECT request: " +
response.getStatusLine());
}
if (status == HttpStatus.SC_OK) {
- final RouteTracker routeTracker = state.getRouteTracker();
- routeTracker.tunnelTarget(false);
- state.setCurrentRequest(null);
+ handler.onRouteTunnelToTarget();
+ handler.setCurrentRequest(null);
} else {
- if (!handleConnectResponse(state)) {
+ if (!handleConnectResponse(state, handler)) {
state.setFinalResponse(response);
}
}
} else {
- if (!handleResponse(state)) {
+ if (!handleResponse(state, handler)) {
state.setFinalResponse(response);
}
}
@@ -359,41 +337,21 @@ class MainClientExec implements Internal
@Override
public void responseCompleted(
final InternalState state,
- final InternalConnManager connManager) throws IOException, HttpException {
+ final AbstractClientExchangeHandler<?> handler) throws IOException, HttpException {
final HttpClientContext localContext = state.getLocalContext();
- final HttpResponse currentResponse = state.getCurrentResponse();
+ final HttpResponse currentResponse = handler.getCurrentResponse();
- if (!state.isRouteEstablished()) {
+ if (!handler.isRouteEstablished()) {
final int status = currentResponse.getStatusLine().getStatusCode();
if (status == HttpStatus.SC_OK) {
- state.setCurrentResponse(null);
+ handler.setCurrentResponse(null);
return;
}
}
- final NHttpClientConnection managedConn = connManager.getConnection();
- if (managedConn.isOpen() && this.connReuseStrategy.keepAlive(currentResponse, localContext)) {
- final long validDuration = this.keepaliveStrategy.getKeepAliveDuration(
- currentResponse, localContext);
- if (this.log.isDebugEnabled()) {
- final String s;
- if (validDuration > 0) {
- s = "for " + validDuration + " " + TimeUnit.MILLISECONDS;
- } else {
- s = "indefinitely";
- }
- this.log.debug("[exchange: " + state.getId() + "] Connection can be kept alive " + s);
- }
- state.setValidDuration(validDuration);
- state.setReusable();
- } else {
- if (this.log.isDebugEnabled()) {
- if (managedConn.isOpen()) {
- this.log.debug("[exchange: " + state.getId() + "] Connection cannot be kept alive");
- }
- }
- state.setNonReusable();
- connManager.releaseConnection();
+ final boolean keepAlive = handler.manageConnectionPersistence();
+ if (!keepAlive) {
+ handler.releaseConnection();
final AuthState proxyAuthState = localContext.getProxyAuthState();
if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
&& proxyAuthState.getAuthScheme() != null
@@ -426,7 +384,7 @@ class MainClientExec implements Internal
if (this.log.isDebugEnabled()) {
this.log.debug("[exchange: " + state.getId() + "] Response processed");
}
- connManager.releaseConnection();
+ handler.releaseConnection();
} else {
if (state.getRedirect() != null) {
final HttpUriRequest redirect = state.getRedirect();
@@ -442,7 +400,7 @@ class MainClientExec implements Internal
}
// Reset auth states if redirecting to another host
- final HttpRoute route = state.getRoute();
+ final HttpRoute route = handler.getRoute();
if (!route.getTargetHost().equals(newTarget)) {
final AuthState targetAuthState = localContext.getTargetAuthState();
if (this.log.isDebugEnabled()) {
@@ -467,21 +425,21 @@ class MainClientExec implements Internal
final HttpRequestWrapper newRequest = HttpRequestWrapper.wrap(redirect);
final HttpRoute newRoute = this.routePlanner.determineRoute(
newTarget, newRequest, localContext);
- state.setRoute(newRoute);
- state.setMainRequest(newRequest);
- state.setCurrentRequest(newRequest);
if (!route.equals(newRoute)) {
- connManager.releaseConnection();
+ handler.releaseConnection();
}
- prepareRequest(state);
+ handler.setRoute(newRoute);
+ handler.setCurrentRequest(newRequest);
+ state.setMainRequest(newRequest);
+ prepareRequest(state, handler);
}
}
- state.setCurrentResponse(null);
+ handler.setCurrentResponse(null);
}
- private void rewriteRequestURI(final InternalState state) throws ProtocolException {
- final HttpRequestWrapper request = state.getCurrentRequest();
- final HttpRoute route = state.getRoute();
+ private void rewriteRequestURI(
+ final HttpRequestWrapper request,
+ final HttpRoute route) throws ProtocolException {
try {
URI uri = request.getURI();
if (uri != null) {
@@ -509,10 +467,12 @@ class MainClientExec implements Internal
}
}
- private void prepareRequest(final InternalState state) throws IOException, HttpException {
+ private void prepareRequest(
+ final InternalState state,
+ final AbstractClientExchangeHandler<?> handler) throws IOException, HttpException {
final HttpClientContext localContext = state.getLocalContext();
- final HttpRequestWrapper currentRequest = state.getCurrentRequest();
- final HttpRoute route = state.getRoute();
+ final HttpRequestWrapper currentRequest = handler.getCurrentRequest();
+ final HttpRoute route = handler.getRoute();
final HttpRequest original = currentRequest.getOriginal();
URI uri = null;
@@ -533,7 +493,7 @@ class MainClientExec implements Internal
currentRequest.setURI(uri);
// Re-write request URI if needed
- rewriteRequestURI(state);
+ rewriteRequestURI(currentRequest, route);
HttpHost target = null;
if (uri != null && uri.isAbsolute() && uri.getHost() != null) {
@@ -578,15 +538,17 @@ class MainClientExec implements Internal
return request;
}
- private boolean handleConnectResponse(final InternalState state) throws HttpException {
+ private boolean handleConnectResponse(
+ final InternalState state,
+ final AbstractClientExchangeHandler<?> handler) throws HttpException {
final HttpClientContext localContext = state.getLocalContext();
final RequestConfig config = localContext.getRequestConfig();
if (config.isAuthenticationEnabled()) {
final CredentialsProvider credsProvider = localContext.getCredentialsProvider();
if (credsProvider != null) {
- final HttpRoute route = state.getRoute();
+ final HttpRoute route = handler.getRoute();
final HttpHost proxy = route.getProxyHost();
- final HttpResponse currentResponse = state.getCurrentResponse();
+ final HttpResponse currentResponse = handler.getCurrentResponse();
final AuthState proxyAuthState = localContext.getProxyAuthState();
if (this.authenticator.isAuthenticationRequested(proxy, currentResponse,
this.proxyAuthStrategy, proxyAuthState, localContext)) {
@@ -598,13 +560,15 @@ class MainClientExec implements Internal
return false;
}
- private boolean handleResponse(final InternalState state) throws HttpException {
+ private boolean handleResponse(
+ final InternalState state,
+ final AbstractClientExchangeHandler<?> handler) throws HttpException {
final HttpClientContext localContext = state.getLocalContext();
final RequestConfig config = localContext.getRequestConfig();
if (config.isAuthenticationEnabled()) {
- if (needAuthentication(state)) {
+ if (needAuthentication(state, handler)) {
// discard previous auth headers
- final HttpRequestWrapper currentRequest = state.getCurrentRequest();
+ final HttpRequestWrapper currentRequest = handler.getCurrentRequest();
final HttpRequest original = currentRequest.getOriginal();
if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
currentRequest.removeHeaders(AUTH.WWW_AUTH_RESP);
@@ -616,8 +580,8 @@ class MainClientExec implements Internal
}
}
if (config.isRedirectsEnabled()) {
- final HttpRequest currentRequest = state.getCurrentRequest();
- final HttpResponse currentResponse = state.getCurrentResponse();
+ final HttpRequest currentRequest = handler.getCurrentRequest();
+ final HttpResponse currentResponse = handler.getCurrentResponse();
if (this.redirectStrategy.isRedirected(currentRequest, currentResponse, localContext)) {
final int maxRedirects = config.getMaxRedirects() >= 0 ? config.getMaxRedirects() : 100;
if (state.getRedirectCount() >= maxRedirects) {
@@ -633,12 +597,14 @@ class MainClientExec implements Internal
return false;
}
- private boolean needAuthentication(final InternalState state) throws HttpException {
+ private boolean needAuthentication(
+ final InternalState state,
+ final AbstractClientExchangeHandler<?> handler) throws HttpException {
final HttpClientContext localContext = state.getLocalContext();
final CredentialsProvider credsProvider = localContext.getCredentialsProvider();
if (credsProvider != null) {
- final HttpRoute route = state.getRoute();
- final HttpResponse currentResponse = state.getCurrentResponse();
+ final HttpRoute route = handler.getRoute();
+ final HttpResponse currentResponse = handler.getCurrentResponse();
HttpHost target = localContext.getTargetHost();
if (target == null) {
target = route.getTargetHost();
Added: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java?rev=1630966&view=auto
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java (added)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java Fri Oct 10 19:49:15 2014
@@ -0,0 +1,256 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.http.impl.nio.client;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpException;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.Configurable;
+import org.apache.http.client.methods.HttpExecutionAware;
+import org.apache.http.client.methods.HttpRequestWrapper;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.concurrent.BasicFuture;
+import org.apache.http.conn.ConnectionKeepAliveStrategy;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentEncoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.NHttpClientConnection;
+import org.apache.http.nio.conn.NHttpClientConnectionManager;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.protocol.HttpProcessor;
+
+/**
+ * Default implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler}.
+ * <p>
+ * Instances of this class are expected to be accessed by one thread at a time only.
+ * The {@link #cancel()} method can be called concurrently by multiple threads.
+ */
+class MinimalClientExchangeHandlerImpl<T> extends AbstractClientExchangeHandler {
+
+ private final HttpAsyncRequestProducer requestProducer;
+ private final HttpAsyncResponseConsumer<T> responseConsumer;
+ private final HttpClientContext localContext;
+ private final BasicFuture<T> resultFuture;
+ private final HttpProcessor httpProcessor;
+
+ public MinimalClientExchangeHandlerImpl(
+ final Log log,
+ final HttpAsyncRequestProducer requestProducer,
+ final HttpAsyncResponseConsumer<T> responseConsumer,
+ final HttpClientContext localContext,
+ final BasicFuture<T> resultFuture,
+ final NHttpClientConnectionManager connmgr,
+ final HttpProcessor httpProcessor,
+ final ConnectionReuseStrategy connReuseStrategy,
+ final ConnectionKeepAliveStrategy keepaliveStrategy) {
+ super(log, localContext, resultFuture, connmgr, connReuseStrategy, keepaliveStrategy);
+ this.requestProducer = requestProducer;
+ this.responseConsumer = responseConsumer;
+ this.localContext = localContext;
+ this.resultFuture = resultFuture;
+ this.httpProcessor = httpProcessor;
+ }
+
+ @Override
+ void releaseResources() {
+ try {
+ this.requestProducer.close();
+ } catch (final IOException ex) {
+ this.log.debug("I/O error closing request producer", ex);
+ }
+ try {
+ this.responseConsumer.close();
+ } catch (final IOException ex) {
+ this.log.debug("I/O error closing response consumer", ex);
+ }
+ }
+
+ @Override
+ void executionFailed(final Exception ex) {
+ this.requestProducer.failed(ex);
+ this.responseConsumer.failed(ex);
+ }
+
+ @Override
+ boolean executionCancelled() {
+ final boolean cancelled = this.responseConsumer.cancel();
+
+ final T result = this.responseConsumer.getResult();
+ final Exception ex = this.responseConsumer.getException();
+ if (ex != null) {
+ this.resultFuture.failed(ex);
+ } else if (result != null) {
+ this.resultFuture.completed(result);
+ } else {
+ this.resultFuture.cancel();
+ }
+ return cancelled;
+ }
+
+ public void start() throws HttpException, IOException {
+ final HttpHost target = this.requestProducer.getTarget();
+ final HttpRequest original = this.requestProducer.generateRequest();
+
+ if (original instanceof HttpExecutionAware) {
+ ((HttpExecutionAware) original).setCancellable(this);
+ }
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + getId() + "] start execution");
+ }
+
+ if (original instanceof Configurable) {
+ final RequestConfig config = ((Configurable) original).getConfig();
+ if (config != null) {
+ this.localContext.setRequestConfig(config);
+ }
+ }
+
+ final HttpRequestWrapper request = HttpRequestWrapper.wrap(original);
+ final HttpRoute route = new HttpRoute(target);
+ setCurrentRequest(request);
+ setRoute(route);
+
+ this.localContext.setAttribute(HttpClientContext.HTTP_REQUEST, request);
+ this.localContext.setAttribute(HttpClientContext.HTTP_TARGET_HOST, target);
+ this.localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
+
+ this.httpProcessor.process(request, this.localContext);
+
+ requestConnection();
+ }
+
+ @Override
+ public HttpRequest generateRequest() throws IOException, HttpException {
+ verifytRoute();
+ if (!isRouteEstablished()) {
+ onRouteToTarget();
+ onRouteComplete();
+ }
+
+ final NHttpClientConnection localConn = getConnection();
+ this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, localConn);
+ final RequestConfig config = this.localContext.getRequestConfig();
+ if (config.getSocketTimeout() > 0) {
+ localConn.setSocketTimeout(config.getSocketTimeout());
+ }
+ return getCurrentRequest();
+ }
+
+ @Override
+ public void produceContent(
+ final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + getId() + "] produce content");
+ }
+ this.requestProducer.produceContent(encoder, ioctrl);
+ if (encoder.isCompleted()) {
+ this.requestProducer.resetRequest();
+ }
+ }
+
+ @Override
+ public void requestCompleted() {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + getId() + "] Request completed");
+ }
+ this.requestProducer.requestCompleted(this.localContext);
+ }
+
+ @Override
+ public void responseReceived(
+ final HttpResponse response) throws IOException, HttpException {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + getId() + "] Response received " + response.getStatusLine());
+ }
+ this.localContext.setAttribute(HttpClientContext.HTTP_RESPONSE, response);
+ this.httpProcessor.process(response, this.localContext);
+
+ setCurrentResponse(response);
+
+ this.responseConsumer.responseReceived(response);
+ }
+
+ @Override
+ public void consumeContent(
+ final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + getId() + "] Consume content");
+ }
+ this.responseConsumer.consumeContent(decoder, ioctrl);
+ if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
+ markConnectionNonReusable();
+ try {
+ markCompleted();
+ releaseConnection();
+ this.resultFuture.cancel();
+ } finally {
+ close();
+ }
+ }
+ }
+
+ @Override
+ public void responseCompleted() throws IOException, HttpException {
+ manageConnectionPersistence();
+ this.responseConsumer.responseCompleted(this.localContext);
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("[exchange: " + getId() + "] Response processed");
+ }
+ try {
+ markCompleted();
+ releaseConnection();
+ final T result = this.responseConsumer.getResult();
+ final Exception ex = this.responseConsumer.getException();
+ if (ex == null) {
+ this.resultFuture.completed(result);
+ } else {
+ this.resultFuture.failed(ex);
+ }
+ } finally {
+ close();
+ }
+ }
+
+ @Override
+ public void inputTerminated() {
+ close();
+ }
+
+ public void abortConnection() {
+ discardConnection();
+ }
+
+}
Propchange: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalClientExchangeHandlerImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java?rev=1630966&r1=1630965&r2=1630966&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/httpasyncclient/src/main/java/org/apache/http/impl/nio/client/MinimalHttpAsyncClient.java Fri Oct 10 19:49:15 2014
@@ -56,7 +56,7 @@ class MinimalHttpAsyncClient extends Clo
private final Log log = LogFactory.getLog(getClass());
private final NHttpClientConnectionManager connmgr;
- private final InternalClientExec execChain;
+ private final HttpProcessor httpProcessor;
public MinimalHttpAsyncClient(
final NHttpClientConnectionManager connmgr,
@@ -64,16 +64,11 @@ class MinimalHttpAsyncClient extends Clo
final NHttpClientEventHandler eventHandler) {
super(connmgr, threadFactory, eventHandler);
this.connmgr = connmgr;
- final HttpProcessor httpProcessor = new ImmutableHttpProcessor(new RequestContent(),
+ this.httpProcessor = new ImmutableHttpProcessor(new RequestContent(),
new RequestTargetHost(),
new RequestClientConnControl(),
new RequestUserAgent(VersionInfo.getUserAgent(
"Apache-HttpAsyncClient", "org.apache.http.nio.client", getClass())));
- this.execChain = new MinimalClientExec(
- connmgr,
- httpProcessor,
- DefaultConnectionReuseStrategy.INSTANCE,
- DefaultConnectionKeepAliveStrategy.INSTANCE);
}
public MinimalHttpAsyncClient(
@@ -93,14 +88,16 @@ class MinimalHttpAsyncClient extends Clo
context != null ? context : new BasicHttpContext());
@SuppressWarnings("resource")
- final DefaultClientExchangeHandlerImpl<T> handler = new DefaultClientExchangeHandlerImpl<T>(
+ final MinimalClientExchangeHandlerImpl<T> handler = new MinimalClientExchangeHandlerImpl<T>(
this.log,
requestProducer,
responseConsumer,
localcontext,
future,
this.connmgr,
- this.execChain);
+ this.httpProcessor,
+ DefaultConnectionReuseStrategy.INSTANCE,
+ DefaultConnectionKeepAliveStrategy.INSTANCE);
try {
handler.start();
} catch (final Exception ex) {