You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@hc.apache.org by ol...@apache.org on 2010/11/25 15:18:43 UTC
svn commit: r1039053 - in /httpcomponents/httpasyncclient/trunk/src:
examples/org/apache/http/examples/nio/client/
main/java/org/apache/http/impl/nio/client/
main/java/org/apache/http/nio/client/
test/java/org/apache/http/impl/nio/client/
Author: olegk
Date: Thu Nov 25 14:18:42 2010
New Revision: 1039053
URL: http://svn.apache.org/viewvc?rev=1039053&view=rev
Log:
HttpAsyncClient API redesign
Added:
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java (with props)
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java (with props)
Removed:
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/InternalRequestExecutionHandler.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpExchange.java
Modified:
httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java
httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java
httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java
Modified: httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java Thu Nov 25 14:18:42 2010
@@ -26,6 +26,10 @@
*/
package org.apache.http.examples.nio.client;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Future;
+
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.impl.nio.client.BasicAsyncHttpClient;
@@ -33,7 +37,6 @@ import org.apache.http.impl.nio.conn.Bas
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.nio.client.AsyncHttpClient;
-import org.apache.http.nio.client.HttpExchange;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
@@ -55,31 +58,29 @@ public class AsyncClientRequest {
sessmrg.setDefaultMaxPerHost(3);
AsyncHttpClient asynchttpclient = new BasicAsyncHttpClient(
- ioReactor,
- sessmrg,
+ ioReactor,
+ sessmrg,
params);
-
+
asynchttpclient.start();
-
- HttpHost target = new HttpHost("www.apache.org", 80);
- BasicHttpRequest request = new BasicHttpRequest("GET", "/");
-
- HttpExchange[] httpx = new HttpExchange[10];
- for (int i = 0; i < httpx.length; i++) {
- httpx[i] = asynchttpclient.execute(target, request);
- }
-
- for (int i = 0; i < httpx.length; i++) {
- HttpResponse response = httpx[i].awaitResponse();
- if (response != null) {
+ try {
+ HttpHost target = new HttpHost("www.apache.org", 80);
+ Queue<Future<HttpResponse>> queue = new LinkedList<Future<HttpResponse>>();
+ for (int i = 0; i < 10; i++) {
+ BasicHttpRequest request = new BasicHttpRequest("GET", "/");
+ queue.add(asynchttpclient.execute(target, request, null));
+ }
+ while (!queue.isEmpty()) {
+ Future<HttpResponse> future = queue.remove();
+ HttpResponse response = future.get();
System.out.println("Response: " + response.getStatusLine());
+
}
- }
- System.out.println("Shutting down");
-
- asynchttpclient.shutdown();
-
+ System.out.println("Shutting down");
+ } finally {
+ asynchttpclient.shutdown();
+ }
System.out.println("Done");
}
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java Thu Nov 25 14:18:42 2010
@@ -27,6 +27,7 @@
package org.apache.http.impl.nio.client;
import java.io.IOException;
+import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,12 +35,14 @@ import org.apache.http.ConnectionReuseSt
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.HttpResponse;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.conn.BasicIOSessionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.client.AsyncHttpClient;
-import org.apache.http.nio.client.HttpExchange;
+import org.apache.http.nio.client.HttpAsyncExchangeHandler;
+import org.apache.http.nio.concurrent.FutureCallback;
import org.apache.http.nio.conn.IOSessionManager;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
@@ -127,7 +130,6 @@ public class BasicAsyncHttpClient implem
private void doExecute() {
NHttpClientProtocolHandler handler = new NHttpClientProtocolHandler(
createHttpProcessor(),
- new InternalRequestExecutionHandler(),
createConnectionReuseStrategy(),
this.params);
IOEventDispatch ioEventDispatch = new InternalClientEventDispatch(handler, this.params);
@@ -166,9 +168,18 @@ public class BasicAsyncHttpClient implem
}
}
- public HttpExchange execute(final HttpHost target, final HttpRequest request) {
- HttpRoute route = new HttpRoute(target);
- return new HttpExchangeImpl(request, route, null, this.sessmrg);
+ public <T> Future<T> execute(
+ final HttpAsyncExchangeHandler<T> handler, final FutureCallback<T> callback) {
+ HttpRoute route = new HttpRoute(handler.getTarget());
+ HttpExchangeImpl<T> httpexchange = new HttpExchangeImpl<T>(
+ route, null, this.sessmrg, handler, callback);
+ return httpexchange.getResultFuture();
+ }
+
+ public Future<HttpResponse> execute(
+ final HttpHost target, final HttpRequest request, final FutureCallback<HttpResponse> callback) {
+ BasicHttpAsyncExchangeHandler handler = new BasicHttpAsyncExchangeHandler(target, request);
+ return execute(handler, callback);
}
}
Added: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java?rev=1039053&view=auto
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java (added)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java Thu Nov 25 14:18:42 2010
@@ -0,0 +1,178 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.http.impl.nio.client;
+
+import java.io.IOException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentEncoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncExchangeHandler;
+import org.apache.http.nio.entity.BufferingNHttpEntity;
+import org.apache.http.nio.entity.ConsumingNHttpEntity;
+import org.apache.http.nio.entity.NHttpEntityWrapper;
+import org.apache.http.nio.entity.ProducingNHttpEntity;
+import org.apache.http.nio.util.HeapByteBufferAllocator;
+
+public class BasicHttpAsyncExchangeHandler implements HttpAsyncExchangeHandler<HttpResponse> {
+
+ private final HttpHost target;
+ private final HttpRequest request;
+
+ private HttpResponse response;
+ private ProducingNHttpEntity contentProducingEntity;
+ private ConsumingNHttpEntity contentConsumingEntity;
+
+ public BasicHttpAsyncExchangeHandler(
+ final HttpHost target, final HttpRequest request) {
+ super();
+ if (target == null) {
+ throw new IllegalArgumentException("Target host may not be null");
+ }
+ if (request == null) {
+ throw new IllegalArgumentException("HTTP request may not be null");
+ }
+ this.target = target;
+ this.request = request;
+ }
+
+ public HttpRequest getRequest() {
+ return this.request;
+ }
+
+ public HttpHost getTarget() {
+ return this.target;
+ }
+
+ protected ConsumingNHttpEntity createConsumingHttpEntity(
+ final HttpResponse response) throws IOException {
+ if (response.getEntity() != null) {
+ return new BufferingNHttpEntity(response.getEntity(), new HeapByteBufferAllocator());
+ } else {
+ return null;
+ }
+ }
+
+ protected ProducingNHttpEntity createProducingHttpEntity(
+ final HttpRequest request) throws IOException {
+ HttpEntityEnclosingRequest entityReq;
+ HttpEntity entity = null;
+ if (request instanceof HttpEntityEnclosingRequest) {
+ entityReq = (HttpEntityEnclosingRequest) request;
+ entity = entityReq.getEntity();
+ }
+ if (entity != null) {
+ if (entity instanceof ProducingNHttpEntity) {
+ return (ProducingNHttpEntity) entity;
+ } else {
+ return new NHttpEntityWrapper(entity);
+ }
+ } else {
+ return null;
+ }
+ }
+
+ private ConsumingNHttpEntity getConsumingHttpEntity() throws IOException {
+ if (this.contentConsumingEntity == null) {
+ this.contentConsumingEntity = createConsumingHttpEntity(this.response);
+ if (this.contentConsumingEntity == null) {
+ throw new IllegalStateException("Content consumer is null");
+ }
+ }
+ return this.contentConsumingEntity;
+ }
+
+ private ProducingNHttpEntity getProducingHttpEntity() throws IOException {
+ if (this.contentProducingEntity == null) {
+ this.contentProducingEntity = createProducingHttpEntity(this.request);
+ if (this.contentProducingEntity == null) {
+ throw new IllegalStateException("Content producer is null");
+ }
+ }
+ return this.contentProducingEntity;
+ }
+
+ public void produceContent(
+ final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
+ ProducingNHttpEntity producer = getProducingHttpEntity();
+ producer.produceContent(encoder, ioctrl);
+ }
+
+ public void responseReceived(final HttpResponse response) {
+ if (this.response != null) {
+ throw new IllegalStateException("HTTP response already set");
+ }
+ this.response = response;
+ }
+
+ public void consumeContent(
+ final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
+ ConsumingNHttpEntity consumer = getConsumingHttpEntity();
+ consumer.consumeContent(decoder, ioctrl);
+ }
+
+ private void shutdown() {
+ if (this.contentProducingEntity != null) {
+ try {
+ this.contentProducingEntity.finish();
+ } catch (IOException ex) {
+ }
+ }
+ if (this.contentConsumingEntity != null) {
+ try {
+ this.contentConsumingEntity.finish();
+ } catch (IOException ex) {
+ }
+ }
+ this.response = null;
+ }
+
+
+ public void cancelled() {
+ shutdown();
+ }
+
+ public void failed(final Exception ex) {
+ shutdown();
+ }
+
+ public HttpResponse completed() {
+ if (this.response == null) {
+ throw new IllegalStateException("HTTP response is null");
+ }
+ HttpResponse response = this.response;
+ response.setEntity(this.contentConsumingEntity);
+ shutdown();
+ return response;
+ }
+
+}
Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java Thu Nov 25 14:18:42 2010
@@ -26,12 +26,8 @@
*/
package org.apache.http.impl.nio.client;
-import java.io.IOException;
-
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
-import org.apache.http.nio.entity.ConsumingNHttpEntity;
-import org.apache.http.nio.entity.ProducingNHttpEntity;
class ConnState {
@@ -39,8 +35,7 @@ class ConnState {
private MessageState responseState;
private HttpRequest request;
private HttpResponse response;
- private ConsumingNHttpEntity consumingEntity;
- private ProducingNHttpEntity producingEntity;
+ private HttpExchangeImpl<?> httpexchange;
private boolean valid;
private int timeout;
@@ -83,20 +78,12 @@ class ConnState {
this.response = response;
}
- public void setConsumingEntity(final ConsumingNHttpEntity consumingEntity) {
- this.consumingEntity = consumingEntity;
- }
-
- public void setProducingEntity(final ProducingNHttpEntity producingEntity) {
- this.producingEntity = producingEntity;
+ public void setHttpExchange(final HttpExchangeImpl<?> httpexchange) {
+ this.httpexchange = httpexchange;
}
- public ProducingNHttpEntity getProducingEntity() {
- return producingEntity;
- }
-
- public ConsumingNHttpEntity getConsumingEntity() {
- return consumingEntity;
+ public HttpExchangeImpl<?> getHttpExchange() {
+ return this.httpexchange;
}
public int getTimeout() {
@@ -107,27 +94,20 @@ class ConnState {
this.timeout = timeout;
}
- public void resetInput() throws IOException {
+ public void resetInput() {
this.response = null;
- if (this.consumingEntity != null) {
- this.consumingEntity.finish();
- this.consumingEntity = null;
- }
this.responseState = MessageState.READY;
}
- public void resetOutput() throws IOException {
+ public void resetOutput() {
this.request = null;
- if (this.producingEntity != null) {
- this.producingEntity.finish();
- this.producingEntity = null;
- }
this.requestState = MessageState.READY;
}
- public void reset() throws IOException {
+ public void reset() {
resetInput();
resetOutput();
+ this.httpexchange = null;
}
public boolean isValid() {
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java Thu Nov 25 14:18:42 2010
@@ -27,92 +27,123 @@
package org.apache.http.impl.nio.client;
import java.nio.channels.SelectionKey;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import org.apache.http.HttpRequest;
-import org.apache.http.HttpResponse;
import org.apache.http.conn.routing.HttpRoute;
-import org.apache.http.nio.client.HttpExchange;
+import org.apache.http.nio.client.HttpAsyncExchangeHandler;
import org.apache.http.nio.concurrent.BasicFuture;
import org.apache.http.nio.concurrent.FutureCallback;
import org.apache.http.nio.conn.IOSessionManager;
import org.apache.http.nio.conn.ManagedIOSession;
import org.apache.http.nio.reactor.IOSession;
-class HttpExchangeImpl implements HttpExchange {
+class HttpExchangeImpl<T> {
+
+ public static final String HTTP_EXCHANGE = "http.nio.http-exchange";
- private final HttpRequest request;
private final Future<ManagedIOSession> sessionFuture;
- private final BasicFuture<HttpResponse> responseFuture;
+ private final BasicFuture<T> resultFuture;
+ private final HttpAsyncExchangeHandler<T> handler;
private ManagedIOSession managedSession;
public HttpExchangeImpl(
- final HttpRequest request,
final HttpRoute route,
final Object state,
- final IOSessionManager<HttpRoute> sessmrg) {
+ final IOSessionManager<HttpRoute> sessmrg,
+ final HttpAsyncExchangeHandler<T> handler,
+ final FutureCallback<T> callback) {
super();
- this.request = request;
- this.responseFuture = new BasicFuture<HttpResponse>(null);
this.sessionFuture = sessmrg.leaseSession(route, state, new InternalFutureCallback());
+ this.resultFuture = new BasicFuture<T>(callback);
+ this.handler = handler;
}
- public boolean isCompleted() {
- return this.responseFuture.isDone();
+ public HttpAsyncExchangeHandler<T> getHandler() {
+ return this.handler;
}
- public HttpRequest getRequest() {
- return this.request;
+ public Future<T> getResultFuture() {
+ return this.resultFuture;
}
- public HttpResponse awaitResponse() throws ExecutionException, InterruptedException {
- return this.responseFuture.get();
+ public synchronized void completed() {
+ try {
+ if (this.managedSession != null) {
+ this.managedSession.releaseSession();
+ }
+ this.managedSession = null;
+ T result = this.handler.completed();
+ this.resultFuture.completed(result);
+ } catch (RuntimeException runex) {
+ this.resultFuture.failed(runex);
+ throw runex;
+ }
}
- public synchronized void completed(final HttpResponse response) {
- if (this.managedSession != null) {
- this.managedSession.releaseSession();
+ public synchronized void shutdown() {
+ try {
+ this.sessionFuture.cancel(true);
+ if (this.managedSession != null) {
+ this.managedSession.abortSession();
+ }
+ this.managedSession = null;
+ this.handler.cancelled();
+ } catch (RuntimeException runex) {
+ this.resultFuture.failed(runex);
+ throw runex;
}
- this.responseFuture.completed(response);
}
- public synchronized void cancel() {
- this.sessionFuture.cancel(true);
- if (this.managedSession != null) {
- this.managedSession.abortSession();
+ public synchronized void failed(final Exception ex) {
+ try {
+ this.sessionFuture.cancel(true);
+ if (this.managedSession != null) {
+ this.managedSession.abortSession();
+ }
+ this.managedSession = null;
+ this.handler.failed(ex);
+ } catch (RuntimeException runex) {
+ this.resultFuture.failed(ex);
+ throw runex;
}
- this.responseFuture.cancel(true);
}
- private synchronized void requestCompleted(final ManagedIOSession session) {
+ private synchronized void sessionRequestCompleted(final ManagedIOSession session) {
this.managedSession = session;
IOSession iosession = session.getSession();
- iosession.setAttribute(InternalRequestExecutionHandler.HTTP_EXCHANGE, this);
+ iosession.setAttribute(HTTP_EXCHANGE, this);
iosession.setEvent(SelectionKey.OP_WRITE);
}
- private synchronized void requestFailed(final Exception ex) {
- this.responseFuture.failed(ex);
+ private synchronized void sessionRequestFailed(final Exception ex) {
+ try {
+ this.handler.failed(ex);
+ } finally {
+ this.resultFuture.failed(ex);
+ }
}
- private synchronized void requestCancelled() {
- this.responseFuture.cancel(true);
+ private synchronized void sessionRequestCancelled() {
+ try {
+ this.handler.cancelled();
+ } finally {
+ this.resultFuture.cancel(true);
+ }
}
class InternalFutureCallback implements FutureCallback<ManagedIOSession> {
public void completed(final ManagedIOSession session) {
- requestCompleted(session);
+ sessionRequestCompleted(session);
}
public void failed(final Exception ex) {
- requestFailed(ex);
+ sessionRequestFailed(ex);
}
public void cancelled() {
- requestCancelled();
+ sessionRequestCancelled();
}
}
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java Thu Nov 25 14:18:42 2010
@@ -32,7 +32,6 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.ConnectionReuseStrategy;
-import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpInetConnection;
@@ -44,14 +43,7 @@ import org.apache.http.nio.ContentEncode
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpClientHandler;
import org.apache.http.nio.NHttpConnection;
-import org.apache.http.nio.entity.ConsumingNHttpEntity;
-import org.apache.http.nio.entity.ConsumingNHttpEntityTemplate;
-import org.apache.http.nio.entity.NHttpEntityWrapper;
-import org.apache.http.nio.entity.ProducingNHttpEntity;
-import org.apache.http.nio.entity.SkipContentListener;
-import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
-import org.apache.http.nio.util.ByteBufferAllocator;
-import org.apache.http.nio.util.HeapByteBufferAllocator;
+import org.apache.http.nio.client.HttpAsyncExchangeHandler;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.DefaultedHttpParams;
import org.apache.http.params.HttpParams;
@@ -75,48 +67,28 @@ class NHttpClientProtocolHandler impleme
private static final String CONN_STATE = "http.nio.conn-state";
private final HttpProcessor httpProcessor;
- private final NHttpRequestExecutionHandler execHandler;
private final ConnectionReuseStrategy connStrategy;
- private final ByteBufferAllocator allocator;
private final HttpParams params;
public NHttpClientProtocolHandler(
final HttpProcessor httpProcessor,
- final NHttpRequestExecutionHandler execHandler,
final ConnectionReuseStrategy connStrategy,
- final ByteBufferAllocator allocator,
final HttpParams params) {
if (httpProcessor == null) {
throw new IllegalArgumentException("HTTP processor may not be null.");
}
- if (execHandler == null) {
- throw new IllegalArgumentException("HTTP request execution handler may not be null.");
- }
if (connStrategy == null) {
throw new IllegalArgumentException("Connection reuse strategy may not be null");
}
- if (allocator == null) {
- throw new IllegalArgumentException("ByteBuffer allocator may not be null");
- }
if (params == null) {
throw new IllegalArgumentException("HTTP parameters may not be null");
}
this.httpProcessor = httpProcessor;
- this.execHandler = execHandler;
this.connStrategy = connStrategy;
- this.allocator = allocator;
this.params = params;
this.log = LogFactory.getLog(getClass());
}
- public NHttpClientProtocolHandler(
- final HttpProcessor httpProcessor,
- final NHttpRequestExecutionHandler execHandler,
- final ConnectionReuseStrategy connStrategy,
- final HttpParams params) {
- this(httpProcessor, execHandler, connStrategy, new HeapByteBufferAllocator(), params);
- }
-
private void closeConnection(final NHttpClientConnection conn) {
try {
conn.close();
@@ -145,148 +117,163 @@ class NHttpClientProtocolHandler impleme
}
context.setAttribute(CONN_STATE, connState);
context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
- this.execHandler.initalizeContext(context, attachment);
requestReady(conn);
}
public void closed(final NHttpClientConnection conn) {
HttpContext context = conn.getContext();
- ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+ ConnState connState = getConnState(context);
if (this.log.isDebugEnabled()) {
this.log.debug("Disconnected " + formatState(conn, connState));
}
- try {
+ if (connState != null) {
connState.reset();
- this.execHandler.finalizeContext(context);
- } catch (IOException ex) {
- this.log.debug("I/O error resetting connection state: " + ex.getMessage(), ex);
+ HttpExchangeImpl<?> httpexchange = connState.getHttpExchange();
+ if (httpexchange != null) {
+ httpexchange.shutdown();
+ }
}
}
public void exception(final NHttpClientConnection conn, final HttpException ex) {
+ HttpContext context = conn.getContext();
+ ConnState connState = getConnState(context);
this.log.error("HTTP protocol exception: " + ex.getMessage(), ex);
+ if (connState != null) {
+ HttpExchangeImpl<?> httpexchange = connState.getHttpExchange();
+ if (httpexchange != null) {
+ httpexchange.failed(ex);
+ }
+ }
closeConnection(conn);
}
public void exception(final NHttpClientConnection conn, final IOException ex) {
+ HttpContext context = conn.getContext();
+ ConnState connState = getConnState(context);
this.log.error("I/O error: " + ex.getMessage(), ex);
+ if (connState != null) {
+ HttpExchangeImpl<?> httpexchange = connState.getHttpExchange();
+ if (httpexchange != null) {
+ httpexchange.failed(ex);
+ }
+ }
shutdownConnection(conn);
}
public void requestReady(final NHttpClientConnection conn) {
HttpContext context = conn.getContext();
- ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+ ConnState connState = getConnState(context);
+ HttpExchangeImpl<?> httpexchange = getHttpExchange(context);
if (this.log.isDebugEnabled()) {
this.log.debug("Request ready " + formatState(conn, connState));
}
if (connState.getRequestState() != MessageState.READY) {
return;
}
- try {
- HttpRequest request = this.execHandler.submitRequest(context);
- if (request == null) {
- if (this.log.isDebugEnabled()) {
- this.log.debug("No request submitted " + formatState(conn, connState));
- }
- return;
+ if (httpexchange == null || httpexchange.getResultFuture().isDone()) {
+ if (this.log.isDebugEnabled()) {
+ this.log.debug("No request submitted " + formatState(conn, connState));
}
-
+ return;
+ }
+ connState.setHttpExchange(httpexchange);
+ HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
+ try {
+ HttpRequest request = handler.getRequest();
request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
+ connState.setRequest(request);
+
context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
this.httpProcessor.process(request, context);
HttpEntityEnclosingRequest entityReq = null;
- HttpEntity entity = null;
-
if (request instanceof HttpEntityEnclosingRequest) {
entityReq = (HttpEntityEnclosingRequest) request;
- entity = entityReq.getEntity();
}
- if (entity instanceof ProducingNHttpEntity) {
- connState.setProducingEntity((ProducingNHttpEntity) entity);
- } else if (entity != null) {
- connState.setProducingEntity(new NHttpEntityWrapper(entity));
- }
-
- connState.setRequest(request);
conn.submitRequest(request);
- if (entityReq != null && entityReq.expectContinue()) {
- int timeout = conn.getSocketTimeout();
- connState.setTimeout(timeout);
- timeout = this.params.getIntParameter(
- CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
- conn.setSocketTimeout(timeout);
- connState.setRequestState(MessageState.ACK);
- } else if (connState.getProducingEntity() != null) {
- connState.setRequestState(MessageState.BODY_STREAM);
+ if (entityReq != null) {
+ if (entityReq.expectContinue()) {
+ int timeout = conn.getSocketTimeout();
+ connState.setTimeout(timeout);
+ timeout = this.params.getIntParameter(
+ CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
+ conn.setSocketTimeout(timeout);
+ connState.setRequestState(MessageState.ACK);
+ } else {
+ connState.setRequestState(MessageState.BODY_STREAM);
+ }
} else {
connState.setRequestState(MessageState.COMPLETED);
}
-
} catch (IOException ex) {
- shutdownConnection(conn);
this.log.error("I/O error: " + ex.getMessage(), ex);
+ shutdownConnection(conn);
+ handler.failed(ex);
} catch (HttpException ex) {
- closeConnection(conn);
this.log.error("HTTP protocol exception: " + ex.getMessage(), ex);
+ closeConnection(conn);
+ handler.failed(ex);
}
}
public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
HttpContext context = conn.getContext();
- ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+ ConnState connState = getConnState(context);
if (this.log.isDebugEnabled()) {
this.log.debug("Input ready " + formatState(conn, connState));
}
- ConsumingNHttpEntity consumingEntity = connState.getConsumingEntity();
+ HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
try {
- consumingEntity.consumeContent(decoder, conn);
+ handler.consumeContent(decoder, conn);
if (decoder.isCompleted()) {
processResponse(conn, connState);
}
} catch (IOException ex) {
- shutdownConnection(conn);
this.log.error("I/O error: " + ex.getMessage(), ex);
+ shutdownConnection(conn);
+ handler.failed(ex);
}
}
public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
HttpContext context = conn.getContext();
- ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+ ConnState connState = getConnState(context);
if (this.log.isDebugEnabled()) {
this.log.debug("Output ready " + formatState(conn, connState));
}
+ HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
try {
if (connState.getRequestState() == MessageState.ACK) {
conn.suspendOutput();
return;
}
- ProducingNHttpEntity entity = connState.getProducingEntity();
- entity.produceContent(encoder, conn);
+ handler.produceContent(encoder, conn);
if (encoder.isCompleted()) {
connState.setRequestState(MessageState.COMPLETED);
}
} catch (IOException ex) {
- shutdownConnection(conn);
this.log.error("I/O error: " + ex.getMessage(), ex);
+ shutdownConnection(conn);
+ handler.failed(ex);
}
}
public void responseReceived(final NHttpClientConnection conn) {
HttpContext context = conn.getContext();
- ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+ ConnState connState = getConnState(context);
if (this.log.isDebugEnabled()) {
this.log.debug("Response received " + formatState(conn, connState));
}
-
- HttpResponse response = conn.getHttpResponse();
- response.setParams(new DefaultedHttpParams(response.getParams(), this.params));
-
- HttpRequest request = connState.getRequest();
+ HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
try {
+ HttpResponse response = conn.getHttpResponse();
+ response.setParams(new DefaultedHttpParams(response.getParams(), this.params));
+
+ HttpRequest request = connState.getRequest();
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode < HttpStatus.SC_OK) {
@@ -314,36 +301,30 @@ class NHttpClientProtocolHandler impleme
conn.resetInput();
response.setEntity(null);
this.httpProcessor.process(response, context);
+ handler.responseReceived(response);
processResponse(conn, connState);
} else {
- HttpEntity entity = response.getEntity();
- if (entity != null) {
- ConsumingNHttpEntity consumingEntity = this.execHandler.responseEntity(
- response, context);
- if (consumingEntity == null) {
- consumingEntity = new ConsumingNHttpEntityTemplate(
- entity, new SkipContentListener(this.allocator));
- }
- response.setEntity(consumingEntity);
- connState.setConsumingEntity(consumingEntity);
- this.httpProcessor.process(response, context);
- }
+ this.httpProcessor.process(response, context);
+ handler.responseReceived(response);
}
} catch (IOException ex) {
- shutdownConnection(conn);
this.log.error("I/O error: " + ex.getMessage(), ex);
+ shutdownConnection(conn);
+ handler.failed(ex);
} catch (HttpException ex) {
- closeConnection(conn);
this.log.error("HTTP protocol exception: " + ex.getMessage(), ex);
+ closeConnection(conn);
+ handler.failed(ex);
}
}
public void timeout(final NHttpClientConnection conn) {
HttpContext context = conn.getContext();
- ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+ ConnState connState = getConnState(context);
if (this.log.isDebugEnabled()) {
this.log.debug("Timeout " + formatState(conn, connState));
}
+ HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
try {
if (connState.getRequestState() == MessageState.ACK) {
continueRequest(conn, connState);
@@ -361,11 +342,20 @@ class NHttpClientProtocolHandler impleme
}
}
} catch (IOException ex) {
- shutdownConnection(conn);
this.log.error("I/O error: " + ex.getMessage(), ex);
+ shutdownConnection(conn);
+ handler.failed(ex);
}
}
+ private ConnState getConnState(final HttpContext context) { // shared lookup replacing the cast repeated in each event method
+ return (ConnState) context.getAttribute(CONN_STATE); // value of the CONN_STATE attribute, cast to ConnState
+ }
+
+ private HttpExchangeImpl<?> getHttpExchange(final HttpContext context) { // NOTE(review): no caller is visible in this part of the diff; the event methods use connState.getHttpExchange() instead
+ return (HttpExchangeImpl<?>) context.getAttribute(HttpExchangeImpl.HTTP_EXCHANGE); // value of the HTTP_EXCHANGE attribute, cast to HttpExchangeImpl
+ }
+
private void continueRequest(
final NHttpClientConnection conn,
final ConnState connState) {
@@ -386,22 +376,22 @@ class NHttpClientProtocolHandler impleme
private void processResponse(
final NHttpClientConnection conn,
final ConnState connState) throws IOException {
+ HttpExchangeImpl<?> httpexchange = connState.getHttpExchange(); // exchange whose completed() is invoked further down
if (!connState.isValid()) {
conn.close();
}
HttpContext context = conn.getContext();
HttpResponse response = connState.getResponse();
- this.execHandler.handleResponse(response, context);
if (!this.connStrategy.keepAlive(response, context)) {
conn.close();
}
if (this.log.isDebugEnabled()) {
this.log.debug("Response processed " + formatState(conn, connState));
}
+ httpexchange.completed(); // takes the place of the removed execHandler.handleResponse() call; runs before the state is reset
if (conn.isOpen()) {
// Ready for another request
- connState.resetInput();
- connState.resetOutput();
+ connState.reset(); // single call replacing the separate resetInput()/resetOutput() calls
conn.requestOutput();
}
}
Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java Thu Nov 25 14:18:42 2010
@@ -26,8 +26,12 @@
*/
package org.apache.http.nio.client;
+import java.util.concurrent.Future;
+
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.nio.concurrent.FutureCallback;
import org.apache.http.nio.reactor.IOReactorStatus;
public interface AsyncHttpClient {
@@ -38,6 +42,10 @@ public interface AsyncHttpClient {
IOReactorStatus getStatus();
- HttpExchange execute(HttpHost target, HttpRequest request);
+ <T> Future<T> execute( // generic variant: the caller supplies the exchange handler; the Future carries the handler's result type T
+ HttpAsyncExchangeHandler<T> handler, FutureCallback<T> callback);
+
+ Future<HttpResponse> execute( // target/request variant replacing the removed HttpExchange-returning method; TestHttpAsync calls it with a null callback
+ HttpHost target, HttpRequest request, FutureCallback<HttpResponse> callback);
}
Added: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java?rev=1039053&view=auto
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java (added)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java Thu Nov 25 14:18:42 2010
@@ -0,0 +1,56 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.http.nio.client;
+
+import java.io.IOException;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentEncoder;
+import org.apache.http.nio.IOControl;
+
+public interface HttpAsyncExchangeHandler<T> { // callback contract for a single request/response exchange; T is the type returned by completed()
+ // NOTE(review): presumably the host the request is addressed to; no call site is visible in this part of the diff - confirm.
+ HttpHost getTarget();
+ // NOTE(review): presumably the request submitted for this exchange; no call site is visible in this part of the diff - confirm.
+ HttpRequest getRequest();
+ // Called from NHttpClientProtocolHandler.outputReady with the connection's encoder to produce outgoing content.
+ void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOException;
+ // Called from NHttpClientProtocolHandler.responseReceived after the response has been passed through the HTTP processor.
+ void responseReceived(HttpResponse response);
+ // Called from NHttpClientProtocolHandler.inputReady with the connection's decoder to consume incoming content.
+ void consumeContent(ContentDecoder decoder, IOControl ioctrl) throws IOException;
+ // Called by NHttpClientProtocolHandler with the caught IOException or HttpException, after the connection was shut down or closed.
+ void failed(Exception ex);
+ // NOTE(review): no call site is visible in this part of the diff; presumably invoked when the exchange is cancelled - confirm.
+ void cancelled();
+ // Returns the result of type T. NOTE(review): presumably the value delivered through the Future from AsyncHttpClient.execute - confirm.
+ T completed();
+
+}
Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java Thu Nov 25 14:18:42 2010
@@ -3,6 +3,7 @@ package org.apache.http.impl.nio.client;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
+import java.util.concurrent.Future;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
@@ -13,7 +14,6 @@ import org.apache.http.impl.nio.conn.Bas
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.localserver.ServerTestBase;
import org.apache.http.nio.client.AsyncHttpClient;
-import org.apache.http.nio.client.HttpExchange;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.params.BasicHttpParams;
@@ -50,8 +50,8 @@ public class TestHttpAsync extends Serve
public void testSingleGet() throws Exception {
this.httpclient.start();
HttpGet httpget = new HttpGet("/random/2048");
- HttpExchange httpexg = this.httpclient.execute(this.target, httpget);
- HttpResponse response = httpexg.awaitResponse();
+ Future<HttpResponse> future = this.httpclient.execute(this.target, httpget, null); // null callback: the result is obtained only through the returned Future
+ HttpResponse response = future.get(); // Future.get() waits for the result; it replaces HttpExchange.awaitResponse()
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}
@@ -67,8 +67,8 @@ public class TestHttpAsync extends Serve
HttpPost httppost = new HttpPost("/echo/stuff");
httppost.setEntity(new NByteArrayEntity(b1));
- HttpExchange httpexg = this.httpclient.execute(this.target, httppost);
- HttpResponse response = httpexg.awaitResponse();
+ Future<HttpResponse> future = this.httpclient.execute(this.target, httppost, null);
+ HttpResponse response = future.get();
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
HttpEntity entity = response.getEntity();
@@ -89,17 +89,17 @@ public class TestHttpAsync extends Serve
this.sessionManager.setTotalMax(100);
this.httpclient.start();
- Queue<HttpExchange> queue = new LinkedList<HttpExchange>();
+ Queue<Future<HttpResponse>> queue = new LinkedList<Future<HttpResponse>>();
for (int i = 0; i < reqCount; i++) {
HttpPost httppost = new HttpPost("/echo/stuff");
httppost.setEntity(new NByteArrayEntity(b1));
- queue.add(this.httpclient.execute(this.target, httppost));
+ queue.add(this.httpclient.execute(this.target, httppost, null));
}
while (!queue.isEmpty()) {
- HttpExchange httpexg = queue.remove();
- HttpResponse response = httpexg.awaitResponse();
+ Future<HttpResponse> future = queue.remove();
+ HttpResponse response = future.get();
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
HttpEntity entity = response.getEntity();
@@ -121,17 +121,17 @@ public class TestHttpAsync extends Serve
this.sessionManager.setTotalMax(100);
this.httpclient.start();
- Queue<HttpExchange> queue = new LinkedList<HttpExchange>();
+ Queue<Future<HttpResponse>> queue = new LinkedList<Future<HttpResponse>>();
for (int i = 0; i < reqCount; i++) {
HttpPost httppost = new HttpPost("/echo/stuff");
httppost.setEntity(new NByteArrayEntity(b1));
- queue.add(this.httpclient.execute(this.target, httppost));
+ queue.add(this.httpclient.execute(this.target, httppost, null));
}
while (!queue.isEmpty()) {
- HttpExchange httpexg = queue.remove();
- HttpResponse response = httpexg.awaitResponse();
+ Future<HttpResponse> future = queue.remove();
+ HttpResponse response = future.get();
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
HttpEntity entity = response.getEntity();