You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/09/11 15:18:54 UTC

svn commit: r1169449 - in /httpcomponents/httpcore/trunk/httpcore-nio/src: main/java/org/apache/http/impl/nio/ main/java/org/apache/http/nio/protocol/ test/java/org/apache/http/nio/protocol/

Author: olegk
Date: Sun Sep 11 13:18:53 2011
New Revision: 1169449

URL: http://svn.apache.org/viewvc?rev=1169449&view=rev
Log:
Implemented support for async expectation verification in HttpAsyncServiceHandler

Added:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncContinueTrigger.java   (with props)
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncExpectationVerifier.java   (with props)
Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpClientConnection.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpServerConnection.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestBufferingNHttpHandlers.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncHandlers.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpClientConnection.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpClientConnection.java?rev=1169449&r1=1169448&r2=1169449&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpClientConnection.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpClientConnection.java Sun Sep 11 13:18:53 2011
@@ -28,6 +28,7 @@
 package org.apache.http.impl.nio;
 
 import java.io.IOException;
+import java.nio.channels.SelectionKey;
 
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpEntityEnclosingRequest;
@@ -170,7 +171,7 @@ public class DefaultNHttpClientConnectio
                     close();
                 }
             }
-            if (this.contentDecoder != null) {
+            if (this.contentDecoder != null && (this.session.getEventMask() & SelectionKey.OP_READ) > 0) {
                 handler.inputReady(this, this.contentDecoder);
                 if (this.contentDecoder.isCompleted()) {
                     // Response entity received

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpServerConnection.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpServerConnection.java?rev=1169449&r1=1169448&r2=1169449&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpServerConnection.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/impl/nio/DefaultNHttpServerConnection.java Sun Sep 11 13:18:53 2011
@@ -28,6 +28,7 @@
 package org.apache.http.impl.nio;
 
 import java.io.IOException;
+import java.nio.channels.SelectionKey;
 
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpEntityEnclosingRequest;
@@ -171,7 +172,7 @@ public class DefaultNHttpServerConnectio
                     close();
                 }
             }
-            if (this.contentDecoder != null) {
+            if (this.contentDecoder != null && (this.session.getEventMask() & SelectionKey.OP_READ) > 0) {
                 handler.inputReady(this, this.contentDecoder);
                 if (this.contentDecoder.isCompleted()) {
                     // Request entity received

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java?rev=1169449&r1=1169448&r2=1169449&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java Sun Sep 11 13:18:53 2011
@@ -128,7 +128,7 @@ public class HttpAsyncClientProtocolHand
                     timeout = request.getParams().getIntParameter(
                             CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
                     conn.setSocketTimeout(timeout);
-                    httpexchange.setRequestState(MessageState.ACK);
+                    httpexchange.setRequestState(MessageState.ACK_EXPECTED);
                 } else {
                     httpexchange.setRequestState(MessageState.BODY_STREAM);
                 }
@@ -170,7 +170,7 @@ public class HttpAsyncClientProtocolHand
         HttpExchange httpexchange = getHttpExchange(conn);
         HttpAsyncClientExchangeHandler<?> handler = httpexchange.getHandler();
         try {
-            if (httpexchange.getRequestState() == MessageState.ACK) {
+            if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
                 conn.suspendOutput();
                 return;
             }
@@ -205,7 +205,7 @@ public class HttpAsyncClientProtocolHand
                     throw new ProtocolException(
                             "Unexpected response: " + response.getStatusLine());
                 }
-                if (httpexchange.getRequestState() == MessageState.ACK) {
+                if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
                     int timeout = httpexchange.getTimeout();
                     conn.setSocketTimeout(timeout);
                     conn.requestOutput();
@@ -214,7 +214,7 @@ public class HttpAsyncClientProtocolHand
                 return;
             } else {
                 httpexchange.setResponse(response);
-                if (httpexchange.getRequestState() == MessageState.ACK) {
+                if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
                     int timeout = httpexchange.getTimeout();
                     conn.setSocketTimeout(timeout);
                     conn.resetOutput();
@@ -252,7 +252,7 @@ public class HttpAsyncClientProtocolHand
             return;
         }
         try {
-            if (httpexchange.getRequestState() == MessageState.ACK) {
+            if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
                 int timeout = httpexchange.getTimeout();
                 conn.setSocketTimeout(timeout);
                 conn.requestOutput();

Added: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncContinueTrigger.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncContinueTrigger.java?rev=1169449&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncContinueTrigger.java (added)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncContinueTrigger.java Sun Sep 11 13:18:53 2011
@@ -0,0 +1,41 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.nio.protocol;
+
+/** Callback through which an expectation verifier settles a request that expects a 100-continue.
+ * @since 4.2
+ */
+public interface HttpAsyncContinueTrigger {
+    /** Lets the request proceed; HttpAsyncServiceHandler's implementation then replies with 100 Continue. */
+    void continueRequest();
+    /** Supplies the producer of a response to send in place of the 100 Continue acknowledgement. */
+    void submitResponse(HttpAsyncResponseProducer responseProducer);
+    /** NOTE(review): presumably true once either method above has been called -- confirm against the implementation. */
+    boolean isTriggered();
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncContinueTrigger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncContinueTrigger.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncContinueTrigger.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncExpectationVerifier.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncExpectationVerifier.java?rev=1169449&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncExpectationVerifier.java (added)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncExpectationVerifier.java Sun Sep 11 13:18:53 2011
@@ -0,0 +1,46 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.http.nio.protocol;
+
+import java.io.IOException;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.protocol.HttpContext;
+
+/** Verifier consulted by HttpAsyncServiceHandler when an entity-enclosing request expects a 100-continue.
+ * @since 4.2
+ */
+public interface HttpAsyncExpectationVerifier {
+    /** Inspects {@code request}; the outcome is presumably reported through {@code trigger}, possibly later. */
+    void verify(
+            HttpRequest request,
+            HttpAsyncContinueTrigger trigger,
+            HttpContext context) throws HttpException, IOException;
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncExpectationVerifier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncExpectationVerifier.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncExpectationVerifier.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java?rev=1169449&r1=1169448&r2=1169449&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java Sun Sep 11 13:18:53 2011
@@ -66,12 +66,14 @@ public class HttpAsyncServiceHandler imp
     private static final String HTTP_EXCHANGE = "http.nio.http-exchange";
 
     private final HttpAsyncRequestHandlerResolver handlerResolver;
+    private final HttpAsyncExpectationVerifier expectationVerifier;
     private final HttpProcessor httpProcessor;
     private final ConnectionReuseStrategy connStrategy;
     private final HttpParams params;
 
     public HttpAsyncServiceHandler(
             final HttpAsyncRequestHandlerResolver handlerResolver,
+            final HttpAsyncExpectationVerifier expectationVerifier,
             final HttpProcessor httpProcessor,
             final ConnectionReuseStrategy connStrategy,
             final HttpParams params) {
@@ -89,11 +91,20 @@ public class HttpAsyncServiceHandler imp
             throw new IllegalArgumentException("HTTP parameters may not be null");
         }
         this.handlerResolver = handlerResolver;
+        this.expectationVerifier = expectationVerifier;
         this.httpProcessor = httpProcessor;
         this.connStrategy = connStrategy;
         this.params = params;
     }
 
+    public HttpAsyncServiceHandler( // convenience overload without an expectation verifier
+            final HttpAsyncRequestHandlerResolver handlerResolver,
+            final HttpProcessor httpProcessor,
+            final ConnectionReuseStrategy connStrategy,
+            final HttpParams params) {
+        this(handlerResolver, null, httpProcessor, connStrategy, params); // null verifier: expect-continue requests get an immediate 100 Continue
+    }
+
     public void connected(final NHttpServerConnection conn) {
         HttpExchange httpExchange = new HttpExchange();
         conn.getContext().setAttribute(HTTP_EXCHANGE, httpExchange);
@@ -127,18 +138,19 @@ public class HttpAsyncServiceHandler imp
             if (request instanceof HttpEntityEnclosingRequest) {
                 HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest) request;
                 if (entityRequest.expectContinue()) {
-                    ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
-                    if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
-                        // Downgrade protocol version if greater than HTTP/1.1
-                        ver = HttpVersion.HTTP_1_1;
+                    httpExchange.setRequestState(MessageState.ACK_EXPECTED);
+                    if (this.expectationVerifier != null) {
+                        conn.suspendInput();
+                        HttpAsyncContinueTrigger trigger = new ContinueTriggerImpl(httpExchange, conn);
+                        this.expectationVerifier.verify(request, trigger, context);
+                    } else {
+                        HttpResponse response = create100Continue(request);
+                        conn.submitResponse(response);
+                        httpExchange.setRequestState(MessageState.BODY_STREAM);
                     }
-                    HttpResponse response = new BasicHttpResponse(
-                            ver, HttpStatus.SC_CONTINUE, "Continue");
-                    response.setParams(
-                            new DefaultedHttpParams(response.getParams(), this.params));
-                    conn.submitResponse(response);
+                } else {
+                    httpExchange.setRequestState(MessageState.BODY_STREAM);
                 }
-                httpExchange.setRequestState(MessageState.BODY_STREAM);
             } else {
                 // No request content is expected.
                 // Process request right away
@@ -220,11 +232,21 @@ public class HttpAsyncServiceHandler imp
 
     public void responseReady(final NHttpServerConnection conn) {
         HttpExchange httpExchange = (HttpExchange) conn.getContext().getAttribute(HTTP_EXCHANGE);
-        if (!httpExchange.isResponseReady()) {
-            return;
-        }
         try {
-            commitResponse(conn, httpExchange);
+            if (httpExchange.getRequestState() == MessageState.ACK) {
+                conn.requestInput();
+                httpExchange.setRequestState(MessageState.COMPLETED);
+                HttpRequest request = httpExchange.getRequest();
+                HttpResponse response = create100Continue(request);
+                conn.submitResponse(response);
+            } else if (httpExchange.getResponse() == null && httpExchange.getResponseProducer() != null) {
+                if (httpExchange.getRequestState() == MessageState.ACK_EXPECTED) {
+                    conn.resetInput();
+                    httpExchange.setRequestState(MessageState.COMPLETED);
+                }
+                conn.resetInput();
+                commitResponse(conn, httpExchange);
+            }
         } catch (RuntimeException ex) {
             shutdownConnection(conn);
             throw ex;
@@ -301,6 +323,15 @@ public class HttpAsyncServiceHandler imp
                 HttpVersion.HTTP_1_0, code, NStringEntity.create(message), false);
     }
 
+    private HttpResponse create100Continue(final HttpRequest request) { // builds the interim "100 Continue" reply for the given request
+        ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
+        if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
+            // Downgrade protocol version if greater than HTTP/1.1
+            ver = HttpVersion.HTTP_1_1;
+        }
+        return new BasicHttpResponse(ver, HttpStatus.SC_CONTINUE, "Continue"); // NOTE(review): the inline code this replaces also set DefaultedHttpParams on the response -- confirm dropping that is intended
+    }
+
     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
         if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
             return false;
@@ -346,8 +377,6 @@ public class HttpAsyncServiceHandler imp
         context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
         this.httpProcessor.process(response, context);
 
-        httpExchange.setHandled(true);
-
         HttpEntity entity = response.getEntity();
         if (entity != null && !canResponseHaveBody(request, response)) {
             response.setEntity(null);
@@ -392,7 +421,6 @@ public class HttpAsyncServiceHandler imp
         private volatile HttpAsyncResponseProducer responseProducer;
         private volatile HttpRequest request;
         private volatile HttpResponse response;
-        private volatile boolean handled;
 
         HttpExchange() {
             super();
@@ -475,14 +503,6 @@ public class HttpAsyncServiceHandler imp
             this.response = response;
         }
 
-        public boolean isResponseReady() {
-            return !this.handled && this.responseProducer != null;
-        }
-
-        public void setHandled(boolean handled) {
-            this.handled = handled;
-        }
-
         public void clear() {
             this.responseState = MessageState.READY;
             this.requestState = MessageState.READY;
@@ -505,7 +525,6 @@ public class HttpAsyncServiceHandler imp
             this.responseProducer = null;
             this.request = null;
             this.response = null;
-            this.handled = false;
             this.context.clear();
         }
 
@@ -524,8 +543,6 @@ public class HttpAsyncServiceHandler imp
             if (this.response != null) {
                 buf.append(this.response.getStatusLine());
             }
-            buf.append("; done: ");
-            buf.append(this.handled);
             buf.append(";");
             return buf.toString();
         }
@@ -545,7 +562,47 @@ public class HttpAsyncServiceHandler imp
             this.iocontrol = iocontrol;
         }
 
-        public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
+        public synchronized void submitResponse(final HttpAsyncResponseProducer responseProducer) { // one-shot: hands over the response producer
+            if (responseProducer == null) {
+                throw new IllegalArgumentException("Response producer may not be null");
+            }
+            if (this.triggered) { // a second submission is rejected
+                throw new IllegalStateException("Response already triggered");
+            }
+            this.triggered = true;
+            this.httpExchange.setResponseProducer(responseProducer);
+            this.iocontrol.requestOutput(); // presumably surfaces as responseReady(), which commits the response once a producer is set
+        }
+
+        public boolean isTriggered() { // true once submitResponse has accepted a producer
+            return this.triggered;
+        }
+
+    }
+
+    class ContinueTriggerImpl implements HttpAsyncContinueTrigger {
+
+        private final HttpExchange httpExchange;
+        private final IOControl iocontrol;
+
+        private volatile boolean triggered;
+
+        public ContinueTriggerImpl(final HttpExchange httpExchange, final IOControl iocontrol) {
+            super();
+            this.httpExchange = httpExchange;
+            this.iocontrol = iocontrol;
+        }
+
+        public synchronized void continueRequest() { // accepts the expectation; usable only once per trigger
+            if (this.triggered) {
+                throw new IllegalStateException("Response already triggered");
+            }
+            this.triggered = true;
+            this.httpExchange.setRequestState(MessageState.ACK); // responseReady() answers the ACK state with a 100 Continue
+            this.iocontrol.requestOutput();
+        }
+
+        public synchronized void submitResponse(final HttpAsyncResponseProducer responseProducer) {
             if (responseProducer == null) {
                 throw new IllegalArgumentException("Response producer may not be null");
             }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java?rev=1169449&r1=1169448&r2=1169449&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java Sun Sep 11 13:18:53 2011
@@ -28,6 +28,6 @@ package org.apache.http.nio.protocol;
 
 enum MessageState {
 
-    READY, HEAD, ACK, BODY_STREAM, COMPLETED
+    READY, ACK_EXPECTED, ACK, BODY_STREAM, COMPLETED
 
 }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncHandlers.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncHandlers.java?rev=1169449&r1=1169448&r2=1169449&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncHandlers.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncHandlers.java Sun Sep 11 13:18:53 2011
@@ -33,15 +33,20 @@ import java.util.concurrent.ConcurrentLi
 import java.util.concurrent.Future;
 
 import org.apache.http.HttpCoreNIOTestBase;
+import org.apache.http.HttpException;
 import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
 import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
 import org.apache.http.HttpVersion;
 import org.apache.http.LoggingClientConnectionFactory;
 import org.apache.http.LoggingServerConnectionFactory;
+import org.apache.http.ProtocolVersion;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.http.message.BasicHttpEntityEnclosingRequest;
 import org.apache.http.message.BasicHttpRequest;
+import org.apache.http.message.BasicHttpResponse;
 import org.apache.http.nio.NHttpClientIOTarget;
 import org.apache.http.nio.NHttpConnectionFactory;
 import org.apache.http.nio.NHttpServerIOTarget;
@@ -50,6 +55,7 @@ import org.apache.http.nio.reactor.IORea
 import org.apache.http.nio.reactor.ListenerEndpoint;
 import org.apache.http.params.CoreProtocolPNames;
 import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.HttpContext;
 import org.apache.http.util.EntityUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -87,11 +93,12 @@ public class TestHttpAsyncHandlers exten
         return new LoggingClientConnectionFactory(params);
     }
 
-    private InetSocketAddress start() throws Exception {
-        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
-        registry.register("*", new BufferingAsyncRequestHandler(new SimpleRequestHandler()));
+    private InetSocketAddress start(
+            final HttpAsyncRequestHandlerResolver requestHandlerResolver,
+            final HttpAsyncExpectationVerifier expectationVerifier) throws Exception {
         HttpAsyncServiceHandler serviceHandler = new HttpAsyncServiceHandler(
-                registry,
+                requestHandlerResolver,
+                expectationVerifier,
                 this.serverHttpProc,
                 new DefaultConnectionReuseStrategy(),
                 this.serverParams);
@@ -106,9 +113,23 @@ public class TestHttpAsyncHandlers exten
         return (InetSocketAddress) endpoint.getAddress();
     }
 
+    private static String createRequestUri(final String pattern, int count) { // test helper
+        return pattern + "x" + count; // same "<pattern>x<count>" URI the tests previously built inline
+    }
+
+    private static String createExpectedString(final String pattern, int count) { // test helper: pattern repeated count times
+        StringBuilder buffer = new StringBuilder();
+        for (int i = 0; i < count; i++) { // yields an empty string when count <= 0
+            buffer.append(pattern);
+        }
+        return buffer.toString();
+    }
+
     @Test
     public void testHttpGets() throws Exception {
-        InetSocketAddress address = start();
+        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
+        registry.register("*", new BufferingAsyncRequestHandler(new SimpleRequestHandler()));
+        InetSocketAddress address = start(registry, null);
 
         this.connpool.setDefaultMaxPerRoute(3);
         this.connpool.setMaxTotal(3);
@@ -116,18 +137,12 @@ public class TestHttpAsyncHandlers exten
         String pattern = RndTestPatternGenerator.generateText();
         int count = RndTestPatternGenerator.generateCount(1000);
 
-        StringBuilder buffer = new StringBuilder();
-        for (int i = 0; i < count; i++) {
-            buffer.append(pattern);
-        }
-        String expectedPattern = buffer.toString();
-
         HttpHost target = new HttpHost("localhost", address.getPort());
+        String expectedPattern = createExpectedString(pattern, count);
 
         Queue<Future<HttpResponse>> queue = new ConcurrentLinkedQueue<Future<HttpResponse>>();
         for (int i = 0; i < 30; i++) {
-            BasicHttpRequest request = new BasicHttpRequest(
-                    "GET", pattern + "x" + count);
+            BasicHttpRequest request = new BasicHttpRequest("GET", createRequestUri(pattern, count));
             Future<HttpResponse> future = this.executor.execute(
                     new BasicAsyncRequestProducer(target, request),
                     new BasicAsyncResponseConsumer(),
@@ -147,7 +162,9 @@ public class TestHttpAsyncHandlers exten
 
     @Test
     public void testHttpPostsWithContentLength() throws Exception {
-        InetSocketAddress address = start();
+        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
+        registry.register("*", new BufferingAsyncRequestHandler(new SimpleRequestHandler()));
+        InetSocketAddress address = start(registry, null);
 
         this.connpool.setDefaultMaxPerRoute(3);
         this.connpool.setMaxTotal(3);
@@ -155,18 +172,13 @@ public class TestHttpAsyncHandlers exten
         String pattern = RndTestPatternGenerator.generateText();
         int count = RndTestPatternGenerator.generateCount(1000);
 
-        StringBuilder buffer = new StringBuilder();
-        for (int i = 0; i < count; i++) {
-            buffer.append(pattern);
-        }
-        String expectedPattern = buffer.toString();
-
         HttpHost target = new HttpHost("localhost", address.getPort());
+        String expectedPattern = createExpectedString(pattern, count);
 
         Queue<Future<HttpResponse>> queue = new ConcurrentLinkedQueue<Future<HttpResponse>>();
         for (int i = 0; i < 30; i++) {
             BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest(
-                    "POST", pattern + "x" + count);
+                    "POST", createRequestUri(pattern, count));
             NStringEntity entity = NStringEntity.create(expectedPattern, ContentType.DEFAULT_TEXT);
             request.setEntity(entity);
             Future<HttpResponse> future = this.executor.execute(
@@ -188,7 +200,9 @@ public class TestHttpAsyncHandlers exten
 
     @Test
     public void testHttpPostsChunked() throws Exception {
-        InetSocketAddress address = start();
+        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
+        registry.register("*", new BufferingAsyncRequestHandler(new SimpleRequestHandler()));
+        InetSocketAddress address = start(registry, null);
 
         this.connpool.setDefaultMaxPerRoute(3);
         this.connpool.setMaxTotal(3);
@@ -196,18 +210,13 @@ public class TestHttpAsyncHandlers exten
         String pattern = RndTestPatternGenerator.generateText();
         int count = RndTestPatternGenerator.generateCount(1000);
 
-        StringBuilder buffer = new StringBuilder();
-        for (int i = 0; i < count; i++) {
-            buffer.append(pattern);
-        }
-        String expectedPattern = buffer.toString();
-
         HttpHost target = new HttpHost("localhost", address.getPort());
+        String expectedPattern = createExpectedString(pattern, count);
 
         Queue<Future<HttpResponse>> queue = new ConcurrentLinkedQueue<Future<HttpResponse>>();
         for (int i = 0; i < 30; i++) {
             BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest(
-                    "POST", pattern + "x" + count);
+                    "POST", createRequestUri(pattern, count));
             NStringEntity entity = NStringEntity.create(expectedPattern, ContentType.DEFAULT_TEXT);
             entity.setChunked(true);
             request.setEntity(entity);
@@ -230,7 +239,9 @@ public class TestHttpAsyncHandlers exten
 
     @Test
     public void testHttpPostsHTTP10() throws Exception {
-        InetSocketAddress address = start();
+        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
+        registry.register("*", new BufferingAsyncRequestHandler(new SimpleRequestHandler()));
+        InetSocketAddress address = start(registry, null);
 
         this.connpool.setDefaultMaxPerRoute(3);
         this.connpool.setMaxTotal(3);
@@ -238,18 +249,13 @@ public class TestHttpAsyncHandlers exten
         String pattern = RndTestPatternGenerator.generateText();
         int count = RndTestPatternGenerator.generateCount(1000);
 
-        StringBuilder buffer = new StringBuilder();
-        for (int i = 0; i < count; i++) {
-            buffer.append(pattern);
-        }
-        String expectedPattern = buffer.toString();
-
         HttpHost target = new HttpHost("localhost", address.getPort());
+        String expectedPattern = createExpectedString(pattern, count);
 
         Queue<Future<HttpResponse>> queue = new ConcurrentLinkedQueue<Future<HttpResponse>>();
         for (int i = 0; i < 30; i++) {
             BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest(
-                    "POST", pattern + "x" + count, HttpVersion.HTTP_1_0);
+                    "POST", createRequestUri(pattern, count), HttpVersion.HTTP_1_0);
             NStringEntity entity = NStringEntity.create(expectedPattern, ContentType.DEFAULT_TEXT);
             request.setEntity(entity);
             Future<HttpResponse> future = this.executor.execute(
@@ -271,7 +277,9 @@ public class TestHttpAsyncHandlers exten
 
     @Test
     public void testHttpPostsWithExpectContinue() throws Exception {
-        InetSocketAddress address = start();
+        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
+        registry.register("*", new BufferingAsyncRequestHandler(new SimpleRequestHandler()));
+        InetSocketAddress address = start(registry, null);
 
         this.connpool.setDefaultMaxPerRoute(3);
         this.connpool.setMaxTotal(3);
@@ -279,18 +287,13 @@ public class TestHttpAsyncHandlers exten
         String pattern = RndTestPatternGenerator.generateText();
         int count = RndTestPatternGenerator.generateCount(1000);
 
-        StringBuilder buffer = new StringBuilder();
-        for (int i = 0; i < count; i++) {
-            buffer.append(pattern);
-        }
-        String expectedPattern = buffer.toString();
-
         HttpHost target = new HttpHost("localhost", address.getPort());
+        String expectedPattern = createExpectedString(pattern, count);
 
         Queue<Future<HttpResponse>> queue = new ConcurrentLinkedQueue<Future<HttpResponse>>();
         for (int i = 0; i < 30; i++) {
             BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest(
-                    "POST", pattern + "x" + count);
+                    "POST", createRequestUri(pattern, count));
             request.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true);
             NStringEntity entity = NStringEntity.create(expectedPattern, ContentType.DEFAULT_TEXT);
             request.setEntity(entity);
@@ -311,4 +314,74 @@ public class TestHttpAsyncHandlers exten
         }
     }
 
+    @Test
+    public void testHttpPostsWithExpectationVerification() throws Exception { // Three expect-continue POSTs against a server with a custom expectation verifier: two are let through (200), one is refused (417).
+        HttpAsyncExpectationVerifier expectationVerifier = new HttpAsyncExpectationVerifier() { // verifier under test; decides purely on the request URI
+
+            public void verify(
+                    final HttpRequest request,
+                    final HttpAsyncContinueTrigger trigger,
+                    final HttpContext context) throws HttpException {
+                ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
+                String s = request.getRequestLine().getUri();
+                if (!s.equals("AAAAAx10")) { // every URI other than the single accepted one is rejected
+                    if (!ver.lessEquals(HttpVersion.HTTP_1_1)) { // never answer with a version above HTTP/1.1
+                        ver = HttpVersion.HTTP_1_1;
+                    }
+                    BasicHttpResponse response = new BasicHttpResponse(ver,
+                            HttpStatus.SC_EXPECTATION_FAILED, "Expectation failed");
+                    response.setEntity(NStringEntity.create("Expectation failed"));
+                    trigger.submitResponse(new BasicAsyncResponseProducer(response)); // hand back the 417 response instead of continuing the request
+                } else {
+                    trigger.continueRequest(); // accepted URI: signal that the request may proceed
+                }
+            }
+
+        };
+
+        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
+        registry.register("*", new BufferingAsyncRequestHandler(new SimpleRequestHandler())); // one handler registered under the "*" pattern
+        InetSocketAddress address = start(registry, expectationVerifier); // start the test server with the verifier passed in
+
+        BasicHttpEntityEnclosingRequest request1 = new BasicHttpEntityEnclosingRequest(
+                "POST", createRequestUri("AAAAA", 10)); // presumably yields "AAAAAx10", the URI the verifier accepts (helper not visible here) - TODO confirm
+        request1.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true); // turn on the expect-continue parameter for this request
+        request1.setEntity(NStringEntity.create(createExpectedString("AAAAA", 10)));
+        BasicHttpEntityEnclosingRequest request2 = new BasicHttpEntityEnclosingRequest(
+                "POST", createRequestUri("AAAAA", 10)); // same arguments as request1
+        request2.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true);
+        request2.setEntity(NStringEntity.create(createExpectedString("AAAAA", 10)));
+        BasicHttpEntityEnclosingRequest request3 = new BasicHttpEntityEnclosingRequest(
+                "POST", createRequestUri("BBBBB", 10)); // different pattern: the request expected to be refused
+        request3.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true);
+        request3.setEntity(NStringEntity.create(createExpectedString("BBBBB", 10)));
+
+        HttpRequest[] requests = new HttpRequest[] { request1, request2, request3 };
+
+        HttpHost target = new HttpHost("localhost", address.getPort());
+
+        Queue<Future<HttpResponse>> queue = new ConcurrentLinkedQueue<Future<HttpResponse>>();
+        for (int i = 0; i < requests.length; i++) { // submit all requests first, collecting their futures in submission order
+            Future<HttpResponse> future = this.executor.execute(
+                    new BasicAsyncRequestProducer(target, requests[i]),
+                    new BasicAsyncResponseConsumer(),
+                    this.connpool);
+            queue.add(future);
+        }
+
+        Assert.assertEquals("Test client status", IOReactorStatus.ACTIVE, this.client.getStatus());
+
+        Future<HttpResponse> future1 = queue.remove(); // FIFO queue: this is the future for request1
+        HttpResponse response1 = future1.get();
+        Assert.assertEquals(HttpStatus.SC_OK, response1.getStatusLine().getStatusCode());
+
+        Future<HttpResponse> future2 = queue.remove();
+        HttpResponse response2 = future2.get();
+        Assert.assertEquals(HttpStatus.SC_OK, response2.getStatusLine().getStatusCode());
+
+        Future<HttpResponse> future3 = queue.remove();
+        HttpResponse response3 = future3.get();
+        Assert.assertEquals(HttpStatus.SC_EXPECTATION_FAILED, response3.getStatusLine().getStatusCode()); // request3 must get the verifier's 417
+    }
+
 }