You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2010/11/25 15:18:43 UTC

svn commit: r1039053 - in /httpcomponents/httpasyncclient/trunk/src: examples/org/apache/http/examples/nio/client/ main/java/org/apache/http/impl/nio/client/ main/java/org/apache/http/nio/client/ test/java/org/apache/http/impl/nio/client/

Author: olegk
Date: Thu Nov 25 14:18:42 2010
New Revision: 1039053

URL: http://svn.apache.org/viewvc?rev=1039053&view=rev
Log:
HttpAsyncClient API redesign

Added:
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java   (with props)
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java   (with props)
Removed:
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/InternalRequestExecutionHandler.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpExchange.java
Modified:
    httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java
    httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java
    httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java

Modified: httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java Thu Nov 25 14:18:42 2010
@@ -26,6 +26,10 @@
  */
 package org.apache.http.examples.nio.client;
 
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Future;
+
 import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
 import org.apache.http.impl.nio.client.BasicAsyncHttpClient;
@@ -33,7 +37,6 @@ import org.apache.http.impl.nio.conn.Bas
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
 import org.apache.http.message.BasicHttpRequest;
 import org.apache.http.nio.client.AsyncHttpClient;
-import org.apache.http.nio.client.HttpExchange;
 import org.apache.http.params.CoreConnectionPNames;
 import org.apache.http.params.CoreProtocolPNames;
 import org.apache.http.params.HttpParams;
@@ -55,31 +58,29 @@ public class AsyncClientRequest {
         sessmrg.setDefaultMaxPerHost(3);
 
         AsyncHttpClient asynchttpclient = new BasicAsyncHttpClient(
-                ioReactor, 
-                sessmrg, 
+                ioReactor,
+                sessmrg,
                 params);
-        
+
         asynchttpclient.start();
-        
-        HttpHost target = new HttpHost("www.apache.org", 80);
-        BasicHttpRequest request = new BasicHttpRequest("GET", "/"); 
-
-        HttpExchange[] httpx = new HttpExchange[10]; 
-        for (int i = 0; i < httpx.length; i++) {
-            httpx[i] = asynchttpclient.execute(target, request);
-        }
-        
-        for (int i = 0; i < httpx.length; i++) {
-            HttpResponse response = httpx[i].awaitResponse();
-            if (response != null) {
+        try {
+            HttpHost target = new HttpHost("www.apache.org", 80);
+            Queue<Future<HttpResponse>> queue = new LinkedList<Future<HttpResponse>>();
+            for (int i = 0; i < 10; i++) {
+                BasicHttpRequest request = new BasicHttpRequest("GET", "/");
+                queue.add(asynchttpclient.execute(target, request, null));
+            }
+            while (!queue.isEmpty()) {
+                Future<HttpResponse> future = queue.remove();
+                HttpResponse response = future.get();
                 System.out.println("Response: " + response.getStatusLine());
+
             }
-        }
 
-        System.out.println("Shutting down");
-        
-        asynchttpclient.shutdown();
-        
+            System.out.println("Shutting down");
+        } finally {
+            asynchttpclient.shutdown();
+        }
         System.out.println("Done");
     }
 

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicAsyncHttpClient.java Thu Nov 25 14:18:42 2010
@@ -27,6 +27,7 @@
 package org.apache.http.impl.nio.client;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,12 +35,14 @@ import org.apache.http.ConnectionReuseSt
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
 import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.HttpResponse;
 import org.apache.http.conn.routing.HttpRoute;
 import org.apache.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.http.impl.nio.conn.BasicIOSessionManager;
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
 import org.apache.http.nio.client.AsyncHttpClient;
-import org.apache.http.nio.client.HttpExchange;
+import org.apache.http.nio.client.HttpAsyncExchangeHandler;
+import org.apache.http.nio.concurrent.FutureCallback;
 import org.apache.http.nio.conn.IOSessionManager;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOEventDispatch;
@@ -127,7 +130,6 @@ public class BasicAsyncHttpClient implem
     private void doExecute() {
         NHttpClientProtocolHandler handler = new NHttpClientProtocolHandler(
                 createHttpProcessor(),
-                new InternalRequestExecutionHandler(),
                 createConnectionReuseStrategy(),
                 this.params);
         IOEventDispatch ioEventDispatch = new InternalClientEventDispatch(handler, this.params);
@@ -166,9 +168,18 @@ public class BasicAsyncHttpClient implem
         }
     }
 
-    public HttpExchange execute(final HttpHost target, final HttpRequest request) {
-        HttpRoute route = new HttpRoute(target);
-        return new HttpExchangeImpl(request, route, null, this.sessmrg);
+    public <T> Future<T> execute(
+            final HttpAsyncExchangeHandler<T> handler, final FutureCallback<T> callback) {
+        HttpRoute route = new HttpRoute(handler.getTarget());
+        HttpExchangeImpl<T> httpexchange = new HttpExchangeImpl<T>(
+                route, null, this.sessmrg, handler, callback);
+        return httpexchange.getResultFuture();
+    }
+
+    public Future<HttpResponse> execute(
+            final HttpHost target, final HttpRequest request, final FutureCallback<HttpResponse> callback) {
+        BasicHttpAsyncExchangeHandler handler = new BasicHttpAsyncExchangeHandler(target, request);
+        return execute(handler, callback);
     }
 
 }

Added: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java?rev=1039053&view=auto
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java (added)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java Thu Nov 25 14:18:42 2010
@@ -0,0 +1,178 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.http.impl.nio.client;
+
+import java.io.IOException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentEncoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncExchangeHandler;
+import org.apache.http.nio.entity.BufferingNHttpEntity;
+import org.apache.http.nio.entity.ConsumingNHttpEntity;
+import org.apache.http.nio.entity.NHttpEntityWrapper;
+import org.apache.http.nio.entity.ProducingNHttpEntity;
+import org.apache.http.nio.util.HeapByteBufferAllocator;
+
+public class BasicHttpAsyncExchangeHandler implements HttpAsyncExchangeHandler<HttpResponse> {
+
+    private final HttpHost target;
+    private final HttpRequest request;
+
+    private HttpResponse response;
+    private ProducingNHttpEntity contentProducingEntity;
+    private ConsumingNHttpEntity contentConsumingEntity;
+
+    public BasicHttpAsyncExchangeHandler(
+            final HttpHost target, final HttpRequest request) {
+        super();
+        if (target == null) {
+            throw new IllegalArgumentException("Target host may not be null");
+        }
+        if (request == null) {
+            throw new IllegalArgumentException("HTTP request may not be null");
+        }
+        this.target = target;
+        this.request = request;
+    }
+
+    public HttpRequest getRequest() {
+        return this.request;
+    }
+
+    public HttpHost getTarget() {
+        return this.target;
+    }
+
+    protected ConsumingNHttpEntity createConsumingHttpEntity(
+            final HttpResponse response) throws IOException {
+        if (response.getEntity() != null) {
+            return new BufferingNHttpEntity(response.getEntity(), new HeapByteBufferAllocator());
+        } else {
+            return null;
+        }
+    }
+
+    protected ProducingNHttpEntity createProducingHttpEntity(
+            final HttpRequest request) throws IOException {
+        HttpEntityEnclosingRequest entityReq;
+        HttpEntity entity = null;
+        if (request instanceof HttpEntityEnclosingRequest) {
+            entityReq = (HttpEntityEnclosingRequest) request;
+            entity = entityReq.getEntity();
+        }
+        if (entity != null) {
+            if (entity instanceof ProducingNHttpEntity) {
+                return (ProducingNHttpEntity) entity;
+            } else {
+                return new NHttpEntityWrapper(entity);
+            }
+        } else {
+            return null;
+        }
+    }
+
+    private ConsumingNHttpEntity getConsumingHttpEntity() throws IOException {
+        if (this.contentConsumingEntity == null) {
+            this.contentConsumingEntity = createConsumingHttpEntity(this.response);
+            if (this.contentConsumingEntity == null) {
+                throw new IllegalStateException("Content consumer is null");
+            }
+        }
+        return this.contentConsumingEntity;
+    }
+
+    private ProducingNHttpEntity getProducingHttpEntity() throws IOException {
+        if (this.contentProducingEntity == null) {
+            this.contentProducingEntity = createProducingHttpEntity(this.request);
+            if (this.contentProducingEntity == null) {
+                throw new IllegalStateException("Content producer is null");
+            }
+        }
+        return this.contentProducingEntity;
+    }
+
+    public void produceContent(
+            final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
+        ProducingNHttpEntity producer = getProducingHttpEntity();
+        producer.produceContent(encoder, ioctrl);
+    }
+
+    public void responseReceived(final HttpResponse response) {
+        if (this.response != null) {
+            throw new IllegalStateException("HTTP response already set");
+        }
+        this.response = response;
+    }
+
+    public void consumeContent(
+            final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
+        ConsumingNHttpEntity consumer = getConsumingHttpEntity();
+        consumer.consumeContent(decoder, ioctrl);
+    }
+
+    private void shutdown() {
+        if (this.contentProducingEntity != null) {
+            try {
+                this.contentProducingEntity.finish();
+            } catch (IOException ex) {
+            }
+        }
+        if (this.contentConsumingEntity != null) {
+            try {
+                this.contentConsumingEntity.finish();
+            } catch (IOException ex) {
+            }
+        }
+        this.response = null;
+    }
+
+
+    public void cancelled() {
+        shutdown();
+    }
+
+    public void failed(final Exception ex) {
+        shutdown();
+    }
+
+    public HttpResponse completed() {
+        if (this.response == null) {
+            throw new IllegalStateException("HTTP response is null");
+        }
+        HttpResponse response = this.response;
+        response.setEntity(this.contentConsumingEntity);
+        shutdown();
+        return response;
+    }
+
+}

Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/ConnState.java Thu Nov 25 14:18:42 2010
@@ -26,12 +26,8 @@
  */
 package org.apache.http.impl.nio.client;
 
-import java.io.IOException;
-
 import org.apache.http.HttpRequest;
 import org.apache.http.HttpResponse;
-import org.apache.http.nio.entity.ConsumingNHttpEntity;
-import org.apache.http.nio.entity.ProducingNHttpEntity;
 
 class ConnState {
 
@@ -39,8 +35,7 @@ class ConnState {
     private MessageState responseState;
     private HttpRequest request;
     private HttpResponse response;
-    private ConsumingNHttpEntity consumingEntity;
-    private ProducingNHttpEntity producingEntity;
+    private HttpExchangeImpl<?> httpexchange;
     private boolean valid;
     private int timeout;
 
@@ -83,20 +78,12 @@ class ConnState {
         this.response = response;
     }
 
-    public void setConsumingEntity(final ConsumingNHttpEntity consumingEntity) {
-        this.consumingEntity = consumingEntity;
-    }
-
-    public void setProducingEntity(final ProducingNHttpEntity producingEntity) {
-        this.producingEntity = producingEntity;
+    public void setHttpExchange(final HttpExchangeImpl<?> httpexchange) {
+        this.httpexchange = httpexchange;
     }
 
-    public ProducingNHttpEntity getProducingEntity() {
-        return producingEntity;
-    }
-
-    public ConsumingNHttpEntity getConsumingEntity() {
-        return consumingEntity;
+    public HttpExchangeImpl<?> getHttpExchange() {
+        return this.httpexchange;
     }
 
     public int getTimeout() {
@@ -107,27 +94,20 @@ class ConnState {
         this.timeout = timeout;
     }
 
-    public void resetInput() throws IOException {
+    public void resetInput() {
         this.response = null;
-        if (this.consumingEntity != null) {
-            this.consumingEntity.finish();
-            this.consumingEntity = null;
-        }
         this.responseState = MessageState.READY;
     }
 
-    public void resetOutput() throws IOException {
+    public void resetOutput() {
         this.request = null;
-        if (this.producingEntity != null) {
-            this.producingEntity.finish();
-            this.producingEntity = null;
-        }
         this.requestState = MessageState.READY;
     }
 
-    public void reset() throws IOException {
+    public void reset() {
         resetInput();
         resetOutput();
+        this.httpexchange = null;
     }
 
     public boolean isValid() {

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/HttpExchangeImpl.java Thu Nov 25 14:18:42 2010
@@ -27,92 +27,123 @@
 package org.apache.http.impl.nio.client;
 
 import java.nio.channels.SelectionKey;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import org.apache.http.HttpRequest;
-import org.apache.http.HttpResponse;
 import org.apache.http.conn.routing.HttpRoute;
-import org.apache.http.nio.client.HttpExchange;
+import org.apache.http.nio.client.HttpAsyncExchangeHandler;
 import org.apache.http.nio.concurrent.BasicFuture;
 import org.apache.http.nio.concurrent.FutureCallback;
 import org.apache.http.nio.conn.IOSessionManager;
 import org.apache.http.nio.conn.ManagedIOSession;
 import org.apache.http.nio.reactor.IOSession;
 
-class HttpExchangeImpl implements HttpExchange {
+class HttpExchangeImpl<T> {
+
+    public static final String HTTP_EXCHANGE = "http.nio.http-exchange";
 
-    private final HttpRequest request;
     private final Future<ManagedIOSession> sessionFuture;
-    private final BasicFuture<HttpResponse> responseFuture;
+    private final BasicFuture<T> resultFuture;
+    private final HttpAsyncExchangeHandler<T> handler;
 
     private ManagedIOSession managedSession;
 
     public HttpExchangeImpl(
-            final HttpRequest request,
             final HttpRoute route,
             final Object state,
-            final IOSessionManager<HttpRoute> sessmrg) {
+            final IOSessionManager<HttpRoute> sessmrg,
+            final HttpAsyncExchangeHandler<T> handler,
+            final FutureCallback<T> callback) {
         super();
-        this.request = request;
-        this.responseFuture = new BasicFuture<HttpResponse>(null);
         this.sessionFuture = sessmrg.leaseSession(route, state, new InternalFutureCallback());
+        this.resultFuture = new BasicFuture<T>(callback);
+        this.handler = handler;
     }
 
-    public boolean isCompleted() {
-        return this.responseFuture.isDone();
+    public HttpAsyncExchangeHandler<T> getHandler() {
+        return this.handler;
     }
 
-    public HttpRequest getRequest() {
-        return this.request;
+    public Future<T> getResultFuture() {
+        return this.resultFuture;
     }
 
-    public HttpResponse awaitResponse() throws ExecutionException, InterruptedException {
-        return this.responseFuture.get();
+    public synchronized void completed() {
+        try {
+            if (this.managedSession != null) {
+                this.managedSession.releaseSession();
+            }
+            this.managedSession = null;
+            T result = this.handler.completed();
+            this.resultFuture.completed(result);
+        } catch (RuntimeException runex) {
+            this.resultFuture.failed(runex);
+            throw runex;
+        }
     }
 
-    public synchronized void completed(final HttpResponse response) {
-        if (this.managedSession != null) {
-            this.managedSession.releaseSession();
+    public synchronized void shutdown() {
+        try {
+            this.sessionFuture.cancel(true);
+            if (this.managedSession != null) {
+                this.managedSession.abortSession();
+            }
+            this.managedSession = null;
+            this.handler.cancelled();
+        } catch (RuntimeException runex) {
+            this.resultFuture.failed(runex);
+            throw runex;
         }
-        this.responseFuture.completed(response);
     }
 
-    public synchronized void cancel() {
-        this.sessionFuture.cancel(true);
-        if (this.managedSession != null) {
-            this.managedSession.abortSession();
+    public synchronized void failed(final Exception ex) {
+        try {
+            this.sessionFuture.cancel(true);
+            if (this.managedSession != null) {
+                this.managedSession.abortSession();
+            }
+            this.managedSession = null;
+            this.handler.failed(ex);
+        } catch (RuntimeException runex) {
+            this.resultFuture.failed(ex);
+            throw runex;
         }
-        this.responseFuture.cancel(true);
     }
 
-    private synchronized void requestCompleted(final ManagedIOSession session) {
+    private synchronized void sessionRequestCompleted(final ManagedIOSession session) {
         this.managedSession = session;
         IOSession iosession = session.getSession();
-        iosession.setAttribute(InternalRequestExecutionHandler.HTTP_EXCHANGE, this);
+        iosession.setAttribute(HTTP_EXCHANGE, this);
         iosession.setEvent(SelectionKey.OP_WRITE);
     }
 
-    private synchronized void requestFailed(final Exception ex) {
-        this.responseFuture.failed(ex);
+    private synchronized void sessionRequestFailed(final Exception ex) {
+        try {
+            this.handler.failed(ex);
+        } finally {
+            this.resultFuture.failed(ex);
+        }
     }
 
-    private synchronized void requestCancelled() {
-        this.responseFuture.cancel(true);
+    private synchronized void sessionRequestCancelled() {
+        try {
+            this.handler.cancelled();
+        } finally {
+            this.resultFuture.cancel(true);
+        }
     }
 
     class InternalFutureCallback implements FutureCallback<ManagedIOSession> {
 
         public void completed(final ManagedIOSession session) {
-            requestCompleted(session);
+            sessionRequestCompleted(session);
         }
 
         public void failed(final Exception ex) {
-            requestFailed(ex);
+            sessionRequestFailed(ex);
         }
 
         public void cancelled() {
-            requestCancelled();
+            sessionRequestCancelled();
         }
 
     }

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java Thu Nov 25 14:18:42 2010
@@ -32,7 +32,6 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.http.ConnectionReuseStrategy;
-import org.apache.http.HttpEntity;
 import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpException;
 import org.apache.http.HttpInetConnection;
@@ -44,14 +43,7 @@ import org.apache.http.nio.ContentEncode
 import org.apache.http.nio.NHttpClientConnection;
 import org.apache.http.nio.NHttpClientHandler;
 import org.apache.http.nio.NHttpConnection;
-import org.apache.http.nio.entity.ConsumingNHttpEntity;
-import org.apache.http.nio.entity.ConsumingNHttpEntityTemplate;
-import org.apache.http.nio.entity.NHttpEntityWrapper;
-import org.apache.http.nio.entity.ProducingNHttpEntity;
-import org.apache.http.nio.entity.SkipContentListener;
-import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
-import org.apache.http.nio.util.ByteBufferAllocator;
-import org.apache.http.nio.util.HeapByteBufferAllocator;
+import org.apache.http.nio.client.HttpAsyncExchangeHandler;
 import org.apache.http.params.CoreProtocolPNames;
 import org.apache.http.params.DefaultedHttpParams;
 import org.apache.http.params.HttpParams;
@@ -75,48 +67,28 @@ class NHttpClientProtocolHandler impleme
     private static final String CONN_STATE = "http.nio.conn-state";
 
     private final HttpProcessor httpProcessor;
-    private final NHttpRequestExecutionHandler execHandler;
     private final ConnectionReuseStrategy connStrategy;
-    private final ByteBufferAllocator allocator;
     private final HttpParams params;
 
     public NHttpClientProtocolHandler(
             final HttpProcessor httpProcessor,
-            final NHttpRequestExecutionHandler execHandler,
             final ConnectionReuseStrategy connStrategy,
-            final ByteBufferAllocator allocator,
             final HttpParams params) {
         if (httpProcessor == null) {
             throw new IllegalArgumentException("HTTP processor may not be null.");
         }
-        if (execHandler == null) {
-            throw new IllegalArgumentException("HTTP request execution handler may not be null.");
-        }
         if (connStrategy == null) {
             throw new IllegalArgumentException("Connection reuse strategy may not be null");
         }
-        if (allocator == null) {
-            throw new IllegalArgumentException("ByteBuffer allocator may not be null");
-        }
         if (params == null) {
             throw new IllegalArgumentException("HTTP parameters may not be null");
         }
         this.httpProcessor = httpProcessor;
-        this.execHandler = execHandler;
         this.connStrategy = connStrategy;
-        this.allocator = allocator;
         this.params = params;
         this.log = LogFactory.getLog(getClass());
     }
 
-    public NHttpClientProtocolHandler(
-            final HttpProcessor httpProcessor,
-            final NHttpRequestExecutionHandler execHandler,
-            final ConnectionReuseStrategy connStrategy,
-            final HttpParams params) {
-        this(httpProcessor, execHandler, connStrategy, new HeapByteBufferAllocator(), params);
-    }
-
     private void closeConnection(final NHttpClientConnection conn) {
         try {
             conn.close();
@@ -145,148 +117,163 @@ class NHttpClientProtocolHandler impleme
         }
         context.setAttribute(CONN_STATE, connState);
         context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
-        this.execHandler.initalizeContext(context, attachment);
         requestReady(conn);
     }
 
     public void closed(final NHttpClientConnection conn) {
         HttpContext context = conn.getContext();
-        ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+        ConnState connState = getConnState(context);
         if (this.log.isDebugEnabled()) {
             this.log.debug("Disconnected " + formatState(conn, connState));
         }
-        try {
+        if (connState != null) {
             connState.reset();
-            this.execHandler.finalizeContext(context);
-        } catch (IOException ex) {
-            this.log.debug("I/O error resetting connection state: " + ex.getMessage(), ex);
+            HttpExchangeImpl<?> httpexchange = connState.getHttpExchange();
+            if (httpexchange != null) {
+                httpexchange.shutdown();
+            }
         }
     }
 
     public void exception(final NHttpClientConnection conn, final HttpException ex) {
+        HttpContext context = conn.getContext();
+        ConnState connState = getConnState(context);
         this.log.error("HTTP protocol exception: " + ex.getMessage(), ex);
+        if (connState != null) {
+            HttpExchangeImpl<?> httpexchange = connState.getHttpExchange();
+            if (httpexchange != null) {
+                httpexchange.failed(ex);
+            }
+        }
         closeConnection(conn);
     }
 
     public void exception(final NHttpClientConnection conn, final IOException ex) {
+        HttpContext context = conn.getContext();
+        ConnState connState = getConnState(context);
         this.log.error("I/O error: " + ex.getMessage(), ex);
+        if (connState != null) {
+            HttpExchangeImpl<?> httpexchange = connState.getHttpExchange();
+            if (httpexchange != null) {
+                httpexchange.failed(ex);
+            }
+        }
         shutdownConnection(conn);
     }
 
     public void requestReady(final NHttpClientConnection conn) {
         HttpContext context = conn.getContext();
-        ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+        ConnState connState = getConnState(context);
+        HttpExchangeImpl<?> httpexchange = getHttpExchange(context);
         if (this.log.isDebugEnabled()) {
             this.log.debug("Request ready " + formatState(conn, connState));
         }
         if (connState.getRequestState() != MessageState.READY) {
             return;
         }
-        try {
-            HttpRequest request = this.execHandler.submitRequest(context);
-            if (request == null) {
-                if (this.log.isDebugEnabled()) {
-                    this.log.debug("No request submitted " + formatState(conn, connState));
-                }
-                return;
+        if (httpexchange == null || httpexchange.getResultFuture().isDone()) {
+            if (this.log.isDebugEnabled()) {
+                this.log.debug("No request submitted " + formatState(conn, connState));
             }
-
+            return;
+        }
+        connState.setHttpExchange(httpexchange);
+        HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
+        try {
+            HttpRequest request = handler.getRequest();
             request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
 
+            connState.setRequest(request);
+
             context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
             this.httpProcessor.process(request, context);
 
             HttpEntityEnclosingRequest entityReq = null;
-            HttpEntity entity = null;
-
             if (request instanceof HttpEntityEnclosingRequest) {
                 entityReq = (HttpEntityEnclosingRequest) request;
-                entity = entityReq.getEntity();
             }
 
-            if (entity instanceof ProducingNHttpEntity) {
-                connState.setProducingEntity((ProducingNHttpEntity) entity);
-            } else if (entity != null) {
-                connState.setProducingEntity(new NHttpEntityWrapper(entity));
-            }
-
-            connState.setRequest(request);
             conn.submitRequest(request);
 
-            if (entityReq != null && entityReq.expectContinue()) {
-                int timeout = conn.getSocketTimeout();
-                connState.setTimeout(timeout);
-                timeout = this.params.getIntParameter(
-                        CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
-                conn.setSocketTimeout(timeout);
-                connState.setRequestState(MessageState.ACK);
-            } else if (connState.getProducingEntity() != null) {
-                connState.setRequestState(MessageState.BODY_STREAM);
+            if (entityReq != null) {
+                if (entityReq.expectContinue()) {
+                    int timeout = conn.getSocketTimeout();
+                    connState.setTimeout(timeout);
+                    timeout = this.params.getIntParameter(
+                            CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
+                    conn.setSocketTimeout(timeout);
+                    connState.setRequestState(MessageState.ACK);
+                } else {
+                    connState.setRequestState(MessageState.BODY_STREAM);
+                }
             } else {
                 connState.setRequestState(MessageState.COMPLETED);
             }
-
         } catch (IOException ex) {
-            shutdownConnection(conn);
             this.log.error("I/O error: " + ex.getMessage(), ex);
+            shutdownConnection(conn);
+            handler.failed(ex);
         } catch (HttpException ex) {
-            closeConnection(conn);
             this.log.error("HTTP protocol exception: " + ex.getMessage(), ex);
+            closeConnection(conn);
+            handler.failed(ex);
         }
     }
 
     public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
         HttpContext context = conn.getContext();
-        ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+        ConnState connState = getConnState(context);
         if (this.log.isDebugEnabled()) {
             this.log.debug("Input ready " + formatState(conn, connState));
         }
-        ConsumingNHttpEntity consumingEntity = connState.getConsumingEntity();
+        HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
         try {
-            consumingEntity.consumeContent(decoder, conn);
+            handler.consumeContent(decoder, conn);
             if (decoder.isCompleted()) {
                 processResponse(conn, connState);
             }
         } catch (IOException ex) {
-            shutdownConnection(conn);
             this.log.error("I/O error: " + ex.getMessage(), ex);
+            shutdownConnection(conn);
+            handler.failed(ex);
         }
     }
 
     public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
         HttpContext context = conn.getContext();
-        ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+        ConnState connState = getConnState(context);
         if (this.log.isDebugEnabled()) {
             this.log.debug("Output ready " + formatState(conn, connState));
         }
+        HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
         try {
             if (connState.getRequestState() == MessageState.ACK) {
                 conn.suspendOutput();
                 return;
             }
-            ProducingNHttpEntity entity = connState.getProducingEntity();
-            entity.produceContent(encoder, conn);
+            handler.produceContent(encoder, conn);
             if (encoder.isCompleted()) {
                 connState.setRequestState(MessageState.COMPLETED);
             }
         } catch (IOException ex) {
-            shutdownConnection(conn);
             this.log.error("I/O error: " + ex.getMessage(), ex);
+            shutdownConnection(conn);
+            handler.failed(ex);
         }
     }
 
     public void responseReceived(final NHttpClientConnection conn) {
         HttpContext context = conn.getContext();
-        ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+        ConnState connState = getConnState(context);
         if (this.log.isDebugEnabled()) {
             this.log.debug("Response received " + formatState(conn, connState));
         }
-
-        HttpResponse response = conn.getHttpResponse();
-        response.setParams(new DefaultedHttpParams(response.getParams(), this.params));
-
-        HttpRequest request = connState.getRequest();
+        HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
         try {
+            HttpResponse response = conn.getHttpResponse();
+            response.setParams(new DefaultedHttpParams(response.getParams(), this.params));
+
+            HttpRequest request = connState.getRequest();
 
             int statusCode = response.getStatusLine().getStatusCode();
             if (statusCode < HttpStatus.SC_OK) {
@@ -314,36 +301,30 @@ class NHttpClientProtocolHandler impleme
                 conn.resetInput();
                 response.setEntity(null);
                 this.httpProcessor.process(response, context);
+                handler.responseReceived(response);
                 processResponse(conn, connState);
             } else {
-                HttpEntity entity = response.getEntity();
-                if (entity != null) {
-                    ConsumingNHttpEntity consumingEntity = this.execHandler.responseEntity(
-                            response, context);
-                    if (consumingEntity == null) {
-                        consumingEntity = new ConsumingNHttpEntityTemplate(
-                                entity, new SkipContentListener(this.allocator));
-                    }
-                    response.setEntity(consumingEntity);
-                    connState.setConsumingEntity(consumingEntity);
-                    this.httpProcessor.process(response, context);
-                }
+                this.httpProcessor.process(response, context);
+                handler.responseReceived(response);
             }
         } catch (IOException ex) {
-            shutdownConnection(conn);
             this.log.error("I/O error: " + ex.getMessage(), ex);
+            shutdownConnection(conn);
+            handler.failed(ex);
         } catch (HttpException ex) {
-            closeConnection(conn);
             this.log.error("HTTP protocol exception: " + ex.getMessage(), ex);
+            closeConnection(conn);
+            handler.failed(ex);
         }
     }
 
     public void timeout(final NHttpClientConnection conn) {
         HttpContext context = conn.getContext();
-        ConnState connState = (ConnState) context.getAttribute(CONN_STATE);
+        ConnState connState = getConnState(context);
         if (this.log.isDebugEnabled()) {
             this.log.debug("Timeout " + formatState(conn, connState));
         }
+        HttpAsyncExchangeHandler<?> handler = connState.getHttpExchange().getHandler();
         try {
             if (connState.getRequestState() == MessageState.ACK) {
                 continueRequest(conn, connState);
@@ -361,11 +342,20 @@ class NHttpClientProtocolHandler impleme
                 }
             }
         } catch (IOException ex) {
-            shutdownConnection(conn);
             this.log.error("I/O error: " + ex.getMessage(), ex);
+            shutdownConnection(conn);
+            handler.failed(ex);
         }
     }
 
+    private ConnState getConnState(final HttpContext context) {
+        return (ConnState) context.getAttribute(CONN_STATE);
+    }
+
+    private HttpExchangeImpl<?> getHttpExchange(final HttpContext context) {
+        return (HttpExchangeImpl<?>) context.getAttribute(HttpExchangeImpl.HTTP_EXCHANGE);
+    }
+
     private void continueRequest(
             final NHttpClientConnection conn,
             final ConnState connState) {
@@ -386,22 +376,22 @@ class NHttpClientProtocolHandler impleme
     private void processResponse(
             final NHttpClientConnection conn,
             final ConnState connState) throws IOException {
+        HttpExchangeImpl<?> httpexchange = connState.getHttpExchange();
         if (!connState.isValid()) {
             conn.close();
         }
         HttpContext context = conn.getContext();
         HttpResponse response = connState.getResponse();
-        this.execHandler.handleResponse(response, context);
         if (!this.connStrategy.keepAlive(response, context)) {
             conn.close();
         }
         if (this.log.isDebugEnabled()) {
             this.log.debug("Response processed " + formatState(conn, connState));
         }
+        httpexchange.completed();
         if (conn.isOpen()) {
             // Ready for another request
-            connState.resetInput();
-            connState.resetOutput();
+            connState.reset();
             conn.requestOutput();
         }
     }

Modified: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/AsyncHttpClient.java Thu Nov 25 14:18:42 2010
@@ -26,8 +26,12 @@
  */
 package org.apache.http.nio.client;
 
+import java.util.concurrent.Future;
+
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.nio.concurrent.FutureCallback;
 import org.apache.http.nio.reactor.IOReactorStatus;
 
 public interface AsyncHttpClient {
@@ -38,6 +42,10 @@ public interface AsyncHttpClient {
 
     IOReactorStatus getStatus();
 
-    HttpExchange execute(HttpHost target, HttpRequest request);
+    <T> Future<T> execute(
+            HttpAsyncExchangeHandler<T> handler, FutureCallback<T> callback);
+
+    Future<HttpResponse> execute(
+            HttpHost target, HttpRequest request, FutureCallback<HttpResponse> callback);
 
 }

Added: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java?rev=1039053&view=auto
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java (added)
+++ httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java Thu Nov 25 14:18:42 2010
@@ -0,0 +1,56 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.http.nio.client;
+
+import java.io.IOException;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentEncoder;
+import org.apache.http.nio.IOControl;
+
+public interface HttpAsyncExchangeHandler<T> {
+
+    HttpHost getTarget();
+
+    HttpRequest getRequest();
+
+    void produceContent(ContentEncoder encoder, IOControl ioctrl) throws IOException;
+
+    void responseReceived(HttpResponse response);
+
+    void consumeContent(ContentDecoder decoder, IOControl ioctrl) throws IOException;
+
+    void failed(Exception ex);
+
+    void cancelled();
+
+    T completed();
+
+}

Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpasyncclient/trunk/src/main/java/org/apache/http/nio/client/HttpAsyncExchangeHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java?rev=1039053&r1=1039052&r2=1039053&view=diff
==============================================================================
--- httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java (original)
+++ httpcomponents/httpasyncclient/trunk/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java Thu Nov 25 14:18:42 2010
@@ -3,6 +3,7 @@ package org.apache.http.impl.nio.client;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Random;
+import java.util.concurrent.Future;
 
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
@@ -13,7 +14,6 @@ import org.apache.http.impl.nio.conn.Bas
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
 import org.apache.http.localserver.ServerTestBase;
 import org.apache.http.nio.client.AsyncHttpClient;
-import org.apache.http.nio.client.HttpExchange;
 import org.apache.http.nio.entity.NByteArrayEntity;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.params.BasicHttpParams;
@@ -50,8 +50,8 @@ public class TestHttpAsync extends Serve
     public void testSingleGet() throws Exception {
         this.httpclient.start();
         HttpGet httpget = new HttpGet("/random/2048");
-        HttpExchange httpexg = this.httpclient.execute(this.target, httpget);
-        HttpResponse response = httpexg.awaitResponse();
+        Future<HttpResponse> future = this.httpclient.execute(this.target, httpget, null);
+        HttpResponse response = future.get();
         Assert.assertNotNull(response);
         Assert.assertEquals(200, response.getStatusLine().getStatusCode());
     }
@@ -67,8 +67,8 @@ public class TestHttpAsync extends Serve
         HttpPost httppost = new HttpPost("/echo/stuff");
         httppost.setEntity(new NByteArrayEntity(b1));
 
-        HttpExchange httpexg = this.httpclient.execute(this.target, httppost);
-        HttpResponse response = httpexg.awaitResponse();
+        Future<HttpResponse> future = this.httpclient.execute(this.target, httppost, null);
+        HttpResponse response = future.get();
         Assert.assertNotNull(response);
         Assert.assertEquals(200, response.getStatusLine().getStatusCode());
         HttpEntity entity = response.getEntity();
@@ -89,17 +89,17 @@ public class TestHttpAsync extends Serve
         this.sessionManager.setTotalMax(100);
         this.httpclient.start();
 
-        Queue<HttpExchange> queue = new LinkedList<HttpExchange>();
+        Queue<Future<HttpResponse>> queue = new LinkedList<Future<HttpResponse>>();
 
         for (int i = 0; i < reqCount; i++) {
             HttpPost httppost = new HttpPost("/echo/stuff");
             httppost.setEntity(new NByteArrayEntity(b1));
-            queue.add(this.httpclient.execute(this.target, httppost));
+            queue.add(this.httpclient.execute(this.target, httppost, null));
         }
 
         while (!queue.isEmpty()) {
-            HttpExchange httpexg = queue.remove();
-            HttpResponse response = httpexg.awaitResponse();
+            Future<HttpResponse> future = queue.remove();
+            HttpResponse response = future.get();
             Assert.assertNotNull(response);
             Assert.assertEquals(200, response.getStatusLine().getStatusCode());
             HttpEntity entity = response.getEntity();
@@ -121,17 +121,17 @@ public class TestHttpAsync extends Serve
         this.sessionManager.setTotalMax(100);
         this.httpclient.start();
 
-        Queue<HttpExchange> queue = new LinkedList<HttpExchange>();
+        Queue<Future<HttpResponse>> queue = new LinkedList<Future<HttpResponse>>();
 
         for (int i = 0; i < reqCount; i++) {
             HttpPost httppost = new HttpPost("/echo/stuff");
             httppost.setEntity(new NByteArrayEntity(b1));
-            queue.add(this.httpclient.execute(this.target, httppost));
+            queue.add(this.httpclient.execute(this.target, httppost, null));
         }
 
         while (!queue.isEmpty()) {
-            HttpExchange httpexg = queue.remove();
-            HttpResponse response = httpexg.awaitResponse();
+            Future<HttpResponse> future = queue.remove();
+            HttpResponse response = future.get();
             Assert.assertNotNull(response);
             Assert.assertEquals(200, response.getStatusLine().getStatusCode());
             HttpEntity entity = response.getEntity();