You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2016/10/31 17:33:33 UTC

svn commit: r1767339 [4/14] - in /httpcomponents/httpcore/trunk: ./ httpcore5-ab/src/main/java/org/apache/hc/core5/http/benchmark/ httpcore5-ab/src/test/java/org/apache/hc/core5/http/benchmark/ httpcore5-h2/src/main/java/org/apache/hc/core5/http2/boots...

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestClassicTestClientTestingAdapter.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestClassicTestClientTestingAdapter.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestClassicTestClientTestingAdapter.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestClassicTestClientTestingAdapter.java Mon Oct 31 17:33:27 2016
@@ -44,9 +44,9 @@ import org.apache.hc.core5.http.HttpEnti
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpStatus;
-import org.apache.hc.core5.http.entity.ByteArrayEntity;
-import org.apache.hc.core5.http.entity.EntityUtils;
 import org.apache.hc.core5.http.io.HttpRequestHandler;
+import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.testing.classic.ClassicTestServer;
 import org.junit.After;

Modified: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestTestingFramework.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestTestingFramework.java?rev=1767339&r1=1767338&r2=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestTestingFramework.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/framework/TestTestingFramework.java Mon Oct 31 17:33:27 2016
@@ -27,14 +27,6 @@
 
 package org.apache.hc.core5.testing.framework;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hc.core5.http.HttpVersion;
-import org.apache.hc.core5.http.ProtocolVersion;
-import org.apache.hc.core5.http.entity.ContentType;
 import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.BODY;
 import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.CONTENT_TYPE;
 import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.HEADERS;
@@ -46,6 +38,15 @@ import static org.apache.hc.core5.testin
 import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.REQUEST;
 import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.RESPONSE;
 import static org.apache.hc.core5.testing.framework.ClientPOJOAdapter.STATUS;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.ProtocolVersion;
+import org.apache.hc.core5.http.io.entity.ContentType;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;

Copied: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java (from r1765384, httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java)
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java?p2=httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java&p1=httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java&r1=1765384&r2=1767339&rev=1767339&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/test/java/org/apache/hc/core5/http/integration/TestDefaultListeningIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java Mon Oct 31 17:33:27 2016
@@ -25,7 +25,7 @@
  *
  */
 
-package org.apache.hc.core5.http.integration;
+package org.apache.hc.core5.testing.nio;
 
 import java.io.IOException;
 import java.net.BindException;
@@ -34,21 +34,13 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hc.core5.http.config.ConnectionConfig;
-import org.apache.hc.core5.http.impl.nio.DefaultHttpServerIOEventHandlerFactory;
-import org.apache.hc.core5.http.impl.nio.HttpAsyncService;
-import org.apache.hc.core5.http.impl.nio.UriHttpAsyncRequestHandlerMapper;
-import org.apache.hc.core5.http.protocol.HttpProcessor;
-import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
-import org.apache.hc.core5.http.protocol.ResponseConnControl;
-import org.apache.hc.core5.http.protocol.ResponseContent;
-import org.apache.hc.core5.http.protocol.ResponseDate;
-import org.apache.hc.core5.http.protocol.ResponseServer;
 import org.apache.hc.core5.reactor.DefaultListeningIOReactor;
+import org.apache.hc.core5.reactor.IOEventHandler;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
 import org.apache.hc.core5.reactor.IOReactorConfig;
 import org.apache.hc.core5.reactor.IOReactorExceptionHandler;
 import org.apache.hc.core5.reactor.IOReactorStatus;
+import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.reactor.ListenerEndpoint;
 import org.junit.After;
 import org.junit.Assert;
@@ -64,21 +56,37 @@ public class TestDefaultListeningIOReact
 
     @Before
     public void setup() throws Exception {
-        final HttpProcessor httpproc = new DefaultHttpProcessor(new ResponseDate(),
-                new ResponseServer(),
-                new ResponseContent(),
-                new ResponseConnControl());
-        final HttpAsyncService serviceHandler = new HttpAsyncService(
-                httpproc,
-                new UriHttpAsyncRequestHandlerMapper());
-        final IOEventHandlerFactory eventHandlerFactory = new DefaultHttpServerIOEventHandlerFactory(
-                serviceHandler,
-                ConnectionConfig.DEFAULT);
-
         final IOReactorConfig reactorConfig = IOReactorConfig.custom()
                 .setIoThreadCount(1)
                 .build();
-        this.ioreactor = new DefaultListeningIOReactor(eventHandlerFactory, reactorConfig);
+        this.ioreactor = new DefaultListeningIOReactor(new IOEventHandlerFactory() {
+
+            @Override
+            public IOEventHandler createHandler(final IOSession ioSession) {
+                return new IOEventHandler() {
+
+                    @Override
+                    public void connected(final IOSession session) {
+                    }
+
+                    @Override
+                    public void inputReady(final IOSession session) {
+                    }
+
+                    @Override
+                    public void outputReady(final IOSession session) {
+                    }
+
+                    @Override
+                    public void timeout(final IOSession session) {
+                    }
+
+                    @Override
+                    public void disconnected(final IOSession session) {
+                    }
+                };
+            }
+        }, reactorConfig);
     }
 
     @After

Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TestDefaultListeningIOReactor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,159 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.testing.nio.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ExpectationChannel;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+
+public class EchoHandler implements AsyncServerExchangeHandler {
+
+    private volatile ByteBuffer buffer;
+    private volatile CapacityChannel inputCapacityChannel;
+    private volatile DataStreamChannel outputDataChannel;
+    private volatile boolean endStream;
+
+    public EchoHandler(final int bufferSize) {
+        this.buffer = ByteBuffer.allocate(bufferSize);
+    }
+
+    private void ensureCapacity(final int chunk) {
+        if (buffer.remaining() < chunk) {
+            final ByteBuffer oldBuffer = buffer;
+            oldBuffer.flip();
+            buffer = ByteBuffer.allocate(oldBuffer.remaining() + (chunk > 2048 ? chunk : 2048));
+            buffer.put(oldBuffer);
+        }
+    }
+
+    @Override
+    public void setContext(final HttpContext context) {
+    }
+
+    @Override
+    public void verify(
+            final HttpRequest request,
+            final EntityDetails entityDetails,
+            final ExpectationChannel expectationChannel) throws HttpException, IOException {
+        expectationChannel.sendContinue();
+    }
+
+    @Override
+    public void handleRequest(
+            final HttpRequest request,
+            final EntityDetails entityDetails,
+            final ResponseChannel responseChannel) throws HttpException, IOException {
+        final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+        responseChannel.sendResponse(response, entityDetails);
+    }
+
+    @Override
+    public int consume(final ByteBuffer src) throws IOException {
+        if (buffer.position() == 0) {
+            if (outputDataChannel != null) {
+                outputDataChannel.write(src);
+            }
+        }
+        if (src.hasRemaining()) {
+            ensureCapacity(src.remaining());
+            buffer.put(src);
+            if (outputDataChannel != null) {
+                outputDataChannel.requestOutput();
+            }
+        }
+        return buffer.remaining();
+    }
+
+    @Override
+    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        if (buffer.hasRemaining()) {
+            capacityChannel.update(buffer.remaining());
+            inputCapacityChannel = null;
+        } else {
+            inputCapacityChannel = capacityChannel;
+        }
+    }
+
+    @Override
+    public void streamEnd(final List<Header> trailers) throws HttpException, IOException {
+        endStream = true;
+        if (buffer.position() == 0) {
+            if (outputDataChannel != null) {
+                outputDataChannel.endStream();
+            }
+        } else {
+            if (outputDataChannel != null) {
+                outputDataChannel.requestOutput();
+            }
+        }
+    }
+
+    @Override
+    public int available() {
+        return buffer.position();
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        outputDataChannel = channel;
+        buffer.flip();
+        if (buffer.hasRemaining()) {
+            channel.write(buffer);
+        }
+        buffer.compact();
+        if (buffer.position() == 0 && endStream) {
+            channel.endStream();
+        }
+        final CapacityChannel capacityChannel = inputCapacityChannel;
+        if (capacityChannel != null && buffer.hasRemaining()) {
+            capacityChannel.update(buffer.remaining());
+        }
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+    }
+
+    @Override
+    public void releaseResources() {
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/EchoHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1IntegrationTest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1IntegrationTest.java?rev=1767339&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1IntegrationTest.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1IntegrationTest.java Mon Oct 31 17:33:27 2016
@@ -0,0 +1,1536 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.nio.http;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.http.ConnectionReuseStrategy;
+import org.apache.hc.core5.http.ContentLengthStrategy;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpConnection;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.MalformedChunkCodingException;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.config.ConnectionConfig;
+import org.apache.hc.core5.http.config.H1Config;
+import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
+import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.hc.core5.http.impl.HttpProcessors;
+import org.apache.hc.core5.http.impl.nio.AbstractClassicServerExchangeHandler;
+import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
+import org.apache.hc.core5.http.impl.nio.ConnectionListener;
+import org.apache.hc.core5.http.impl.nio.Http1StreamListener;
+import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
+import org.apache.hc.core5.http.impl.nio.bootstrap.ClientEndpoint;
+import org.apache.hc.core5.http.impl.nio.bootstrap.ClientEndpointImpl;
+import org.apache.hc.core5.http.impl.nio.entity.AbstractClassicEntityConsumer;
+import org.apache.hc.core5.http.impl.nio.entity.AbstractClassicEntityProducer;
+import org.apache.hc.core5.http.io.entity.ContentType;
+import org.apache.hc.core5.http.message.BasicHttpRequest;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseProducer;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.BasicResponseConsumer;
+import org.apache.hc.core5.http.nio.BasicResponseProducer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.ContentEncoder;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.ExpectationChannel;
+import org.apache.hc.core5.http.nio.HandlerFactory;
+import org.apache.hc.core5.http.nio.NHttpMessageParser;
+import org.apache.hc.core5.http.nio.NHttpMessageWriter;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.nio.SessionOutputBuffer;
+import org.apache.hc.core5.http.nio.Supplier;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.support.ResponseTrigger;
+import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.http.protocol.RequestConnControl;
+import org.apache.hc.core5.http.protocol.RequestContent;
+import org.apache.hc.core5.http.protocol.RequestValidateHost;
+import org.apache.hc.core5.reactor.IOEventHandler;
+import org.apache.hc.core5.reactor.IOSession;
+import org.apache.hc.core5.reactor.SessionRequest;
+import org.apache.hc.core5.util.CharArrayBuffer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class Http1IntegrationTest extends InternalServerTestBase {
+
+    private static final long TIMEOUT = 5;
+
+    private Http1TestClient client;
+
+    @Before
+    public void setup() throws Exception {
+        client = new Http1TestClient();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        if (client != null) {
+            client.shutdown(3, TimeUnit.SECONDS);
+        }
+    }
+
+    private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
+        try {
+            return new URI("http", null, "localhost", serverEndpoint.getPort(), path, null, null);
+        } catch (URISyntaxException e) {
+            throw new IllegalStateException();
+        }
+    }
+
+    @Test
+    public void testSimpleGet() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi there");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        for (int i = 0; i < 5; i++) {
+            final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                    new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response1 = result.getHead();
+            final String entity1 = result.getBody();
+            Assert.assertNotNull(response1);
+            Assert.assertEquals(200, response1.getCode());
+            Assert.assertEquals("Hi there", entity1);
+        }
+    }
+
+    @Test
+    public void testSimpleGetIdentityTransfer() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi there");
+            }
+
+        });
+        final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
+        final InetSocketAddress serverEndpoint = server.start(httpProcessor, H1Config.DEFAULT);
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result);
+        final HttpResponse response = result.getHead();
+        final String entity = result.getBody();
+        Assert.assertNotNull(response);
+        Assert.assertEquals(200, response.getCode());
+        Assert.assertEquals("Hi there", entity);
+    }
+
+    @Test
+    public void testSimpleGetsPipelined() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi there");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+        for (int i = 0; i < 5; i++) {
+            queue.add(streamEndpoint.execute(
+                    new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+        }
+        while (!queue.isEmpty()) {
+            final Future<Message<HttpResponse, String>> future = queue.remove();
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response = result.getHead();
+            final String entity = result.getBody();
+            Assert.assertNotNull(response);
+            Assert.assertEquals(200, response.getCode());
+            Assert.assertEquals("Hi there", entity);
+        }
+    }
+
+    @Test
+    public void testLargeGet() throws Exception {
+        server.register("/", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new MultiLineResponseHandler("0123456789abcdef", 5000);
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+        final Message<HttpResponse, String> result1 = future1.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(200, response1.getCode());
+        final String s1 = result1.getBody();
+        Assert.assertNotNull(s1);
+        final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
+        while (t1.hasMoreTokens()) {
+            Assert.assertEquals("0123456789abcdef", t1.nextToken());
+        }
+
+        final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
+                new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
+
+        final Message<HttpResponse, String> result2 = future2.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result2);
+        final HttpResponse response2 = result2.getHead();
+        Assert.assertNotNull(response2);
+        Assert.assertEquals(200, response2.getCode());
+        final String s2 = result2.getBody();
+        Assert.assertNotNull(s2);
+        final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
+        while (t2.hasMoreTokens()) {
+            Assert.assertEquals("0123456789abcdef", t2.nextToken());
+        }
+    }
+
+    @Test
+    public void testLargeGetsPipelined() throws Exception {
+        server.register("/", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new MultiLineResponseHandler("0123456789abcdef", 2000);
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+        for (int i = 0; i < 5; i++) {
+            queue.add(streamEndpoint.execute(
+                    new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+        }
+        while (!queue.isEmpty()) {
+            final Future<Message<HttpResponse, String>> future = queue.remove();
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response = result.getHead();
+            Assert.assertNotNull(response);
+            Assert.assertEquals(200, response.getCode());
+            final String entity = result.getBody();
+            Assert.assertNotNull(entity);
+            final StringTokenizer t = new StringTokenizer(entity, "\r\n");
+            while (t.hasMoreTokens()) {
+                Assert.assertEquals("0123456789abcdef", t.nextToken());
+            }
+        }
+    }
+
+    @Test
+    public void testBasicPost() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi back");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        for (int i = 0; i < 5; i++) {
+            final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                    new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/hello"),
+                            new BasicAsyncEntityProducer("Hi there")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response1 = result.getHead();
+            final String entity1 = result.getBody();
+            Assert.assertNotNull(response1);
+            Assert.assertEquals(200, response1.getCode());
+            Assert.assertEquals("Hi back", entity1);
+        }
+    }
+
+    @Test
+    public void testBasicPostPipelined() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi back");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+        for (int i = 0; i < 5; i++) {
+            queue.add(streamEndpoint.execute(
+                    new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/hello"),
+                            new BasicAsyncEntityProducer("Hi there")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+        }
+        while (!queue.isEmpty()) {
+            final Future<Message<HttpResponse, String>> future = queue.remove();
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response = result.getHead();
+            final String entity = result.getBody();
+            Assert.assertNotNull(response);
+            Assert.assertEquals(200, response.getCode());
+            Assert.assertEquals("Hi back", entity);
+        }
+    }
+
+    @Test
+    public void testHttp10Post() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi back");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        for (int i = 0; i < 5; i++) {
+            final HttpRequest request = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/hello"));
+            request.setVersion(HttpVersion.HTTP_1_0);
+            final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                    new BasicRequestProducer(request, new BasicAsyncEntityProducer("Hi there")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response1 = result.getHead();
+            final String entity1 = result.getBody();
+            Assert.assertNotNull(response1);
+            Assert.assertEquals(200, response1.getCode());
+            Assert.assertEquals("Hi back", entity1);
+        }
+    }
+
+    @Test
+    public void testNoEntityPost() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi back");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        for (int i = 0; i < 5; i++) {
+            final HttpRequest request = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/hello"));
+            final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                    new BasicRequestProducer(request, null),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response1 = result.getHead();
+            final String entity1 = result.getBody();
+            Assert.assertNotNull(response1);
+            Assert.assertEquals(200, response1.getCode());
+            Assert.assertEquals("Hi back", entity1);
+        }
+    }
+
+    @Test
+    public void testLargePost() throws Exception {
+        server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new EchoHandler(2048);
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        for (int i = 0; i < 5; i++) {
+            final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                    new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/echo"),
+                            new MultiLineEntityProducer("0123456789abcdef", 5000)),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response = result.getHead();
+            Assert.assertNotNull(response);
+            Assert.assertEquals(200, response.getCode());
+            final String entity = result.getBody();
+            Assert.assertNotNull(entity);
+            final StringTokenizer t = new StringTokenizer(entity, "\r\n");
+            while (t.hasMoreTokens()) {
+                Assert.assertEquals("0123456789abcdef", t.nextToken());
+            }
+        }
+    }
+
+    @Test
+    public void testPostsPipelinedLargeResponse() throws Exception {
+        server.register("/", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new MultiLineResponseHandler("0123456789abcdef", 2000);
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+        for (int i = 0; i < 2; i++) {
+            queue.add(streamEndpoint.execute(
+                    new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/"),
+                            new BasicAsyncEntityProducer("Hi there")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+        }
+        while (!queue.isEmpty()) {
+            final Future<Message<HttpResponse, String>> future = queue.remove();
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response = result.getHead();
+            Assert.assertNotNull(response);
+            Assert.assertEquals(200, response.getCode());
+            final String entity = result.getBody();
+            Assert.assertNotNull(entity);
+            final StringTokenizer t = new StringTokenizer(entity, "\r\n");
+            while (t.hasMoreTokens()) {
+                Assert.assertEquals("0123456789abcdef", t.nextToken());
+            }
+        }
+    }
+
+
+    @Test
+    public void testLargePostsPipelined() throws Exception {
+        server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new EchoHandler(2048);
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+        for (int i = 0; i < 5; i++) {
+            queue.add(streamEndpoint.execute(
+                    new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/echo"),
+                            new MultiLineEntityProducer("0123456789abcdef", 5000)),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+        }
+        while (!queue.isEmpty()) {
+            final Future<Message<HttpResponse, String>> future = queue.remove();
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response = result.getHead();
+            Assert.assertNotNull(response);
+            Assert.assertEquals(200, response.getCode());
+            final String entity = result.getBody();
+            Assert.assertNotNull(entity);
+            final StringTokenizer t = new StringTokenizer(entity, "\r\n");
+            while (t.hasMoreTokens()) {
+                Assert.assertEquals("0123456789abcdef", t.nextToken());
+            }
+        }
+    }
+
+    @Test
+    public void testSimpleHead() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi there");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        for (int i = 0; i < 5; i++) {
+            final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                    new BasicRequestProducer("HEAD", createRequestURI(serverEndpoint, "/hello")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response1 = result.getHead();
+            Assert.assertNotNull(response1);
+            Assert.assertEquals(200, response1.getCode());
+            Assert.assertNull(result.getBody());
+        }
+    }
+
+    @Test
+    public void testHeadPipelined() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi there");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+        for (int i = 0; i < 5; i++) {
+            queue.add(streamEndpoint.execute(
+                    new BasicRequestProducer("HEAD", createRequestURI(serverEndpoint, "/hello")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+        }
+        while (!queue.isEmpty()) {
+            final Future<Message<HttpResponse, String>> future = queue.remove();
+            final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response1 = result.getHead();
+            Assert.assertNotNull(response1);
+            Assert.assertEquals(200, response1.getCode());
+            Assert.assertNull(result.getBody());
+        }
+    }
+
+    @Test
+    public void testExpectationFailed() throws Exception {
+        server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
+
+                    @Override
+                    protected AsyncResponseProducer verify(
+                            final HttpRequest request,
+                            final HttpContext context) throws IOException, HttpException {
+                        final Header h = request.getFirstHeader("password");
+                        if (h != null && "secret".equals(h.getValue())) {
+                            return null;
+                        } else {
+                            return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
+                        }
+                    }
+
+                    protected void handle(
+                            final Message<HttpRequest, String> request,
+                            final ResponseTrigger responseTrigger,
+                            final HttpContext context) throws IOException, HttpException {
+                        responseTrigger.submitResponse(
+                                new BasicResponseProducer(HttpStatus.SC_OK, "All is well"));
+
+                    }
+                };
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final SessionRequest sessionRequest = client.requestSession(
+                new InetSocketAddress("localhost", serverEndpoint.getPort()), TIMEOUT, TimeUnit.SECONDS, null);
+        sessionRequest.waitFor();
+        final IOSession ioSession = sessionRequest.getSession();
+        final ClientEndpoint streamEndpoint = new ClientEndpointImpl(ioSession);
+
+        final IOEventHandler handler = ioSession.getHandler();
+        Assert.assertNotNull(handler);
+        Assert.assertTrue(handler instanceof HttpConnection);
+        final HttpConnection conn = (HttpConnection) handler;
+
+        final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+        request1.addHeader("password", "secret");
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result1 = future1.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(200, response1.getCode());
+        Assert.assertNotNull("All is well", result1.getBody());
+
+        Assert.assertTrue(conn.isOpen());
+
+        final HttpRequest request2 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+        final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
+                new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result2 = future2.get(50000, TimeUnit.SECONDS);
+        Assert.assertNotNull(result2);
+        final HttpResponse response2 = result2.getHead();
+        Assert.assertNotNull(response2);
+        Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
+        Assert.assertNotNull("You shall not pass", result2.getBody());
+
+        Assert.assertTrue(conn.isOpen());
+
+        final HttpRequest request3 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+        request3.addHeader("password", "secret");
+        final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
+                new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result3 = future3.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result3);
+        final HttpResponse response3 = result3.getHead();
+        Assert.assertNotNull(response3);
+        Assert.assertEquals(200, response3.getCode());
+        Assert.assertNotNull("All is well", result3.getBody());
+
+        Assert.assertTrue(conn.isOpen());
+
+        final HttpRequest request4 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+        final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
+                new BasicRequestProducer(request4, new BasicAsyncEntityProducer("blah")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result4 = future4.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result4);
+        final HttpResponse response4 = result4.getHead();
+        Assert.assertNotNull(response4);
+        Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
+        Assert.assertNotNull("You shall not pass", result4.getBody());
+
+        Assert.assertFalse(conn.isOpen());
+    }
+
+    @Test
+    public void testDelayedExpectationVerification() throws Exception {
+        server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new AsyncServerExchangeHandler() {
+
+                    private final Random random = new Random(System.currentTimeMillis());
+                    private final AsyncEntityProducer entityProducer = new BasicAsyncEntityProducer(
+                            "All is well");
+
+                    @Override
+                    public void setContext(final HttpContext context) {
+                    }
+
+                    @Override
+                    public void verify(
+                            final HttpRequest request,
+                            final EntityDetails entityDetails,
+                            final ExpectationChannel expectationChannel) throws HttpException, IOException {
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    Thread.sleep(random.nextInt(1000));
+                                    expectationChannel.sendContinue();
+                                } catch (Exception ignore) {
+                                }
+                            }
+                        });
+                    }
+
+                    @Override
+                    public void handleRequest(
+                            final HttpRequest request,
+                            final EntityDetails entityDetails,
+                            final ResponseChannel responseChannel) throws HttpException, IOException {
+                        final HttpResponse response = new BasicHttpResponse(200);
+                        responseChannel.sendResponse(response, entityProducer);
+                    }
+
+                    @Override
+                    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                        capacityChannel.update(Integer.MAX_VALUE);
+                    }
+
+                    @Override
+                    public int consume(final ByteBuffer src) throws IOException {
+                        return Integer.MAX_VALUE;
+                    }
+
+                    @Override
+                    public void streamEnd(final List<Header> trailers) throws HttpException, IOException {
+                    }
+
+                    @Override
+                    public int available() {
+                        return entityProducer.available();
+                    }
+
+                    @Override
+                    public void produce(final DataStreamChannel channel) throws IOException {
+                        entityProducer.produce(channel);
+                    }
+
+                    @Override
+                    public void failed(final Exception cause) {
+                    }
+
+                    @Override
+                    public void releaseResources() {
+                    }
+
+                };
+
+            }
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start(H1Config.custom().setWaitForContinueTimeout(100).build());
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+        for (int i = 0; i < 5; i++) {
+            queue.add(streamEndpoint.execute(
+                    new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/"),
+                            new BasicAsyncEntityProducer("Some important message")),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+        }
+        while (!queue.isEmpty()) {
+            final Future<Message<HttpResponse, String>> future = queue.remove();
+            final Message<HttpResponse, String> result = future.get(5 * TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNotNull(result);
+            final HttpResponse response = result.getHead();
+            Assert.assertNotNull(response);
+            Assert.assertEquals(200, response.getCode());
+            Assert.assertNotNull("All is well", result.getBody());
+        }
+    }
+
+    @Test
+    public void testPrematureResponse() throws Exception {
+        server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new AsyncServerExchangeHandler() {
+
+                    private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
+
+                    @Override
+                    public void setContext(final HttpContext context) {
+                    }
+
+                    @Override
+                    public void verify(
+                            final HttpRequest request,
+                            final EntityDetails entityDetails,
+                            final ExpectationChannel expectationChannel) throws HttpException, IOException {
+                        expectationChannel.sendContinue();
+                    }
+
+                    @Override
+                    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                        capacityChannel.update(Integer.MAX_VALUE);
+                    }
+
+                    @Override
+                    public int consume(final ByteBuffer src) throws IOException {
+                        return Integer.MAX_VALUE;
+                    }
+
+                    @Override
+                    public void streamEnd(final List<Header> trailers) throws HttpException, IOException {
+                    }
+
+                    @Override
+                    public void handleRequest(
+                            final HttpRequest request,
+                            final EntityDetails entityDetails,
+                            final ResponseChannel responseChannel) throws HttpException, IOException {
+                        final AsyncResponseProducer producer;
+                        final Header h = request.getFirstHeader("password");
+                        if (h != null && "secret".equals(h.getValue())) {
+                            producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
+                        } else {
+                            producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
+                        }
+                        responseProducer.set(producer);
+                        responseChannel.sendResponse(producer.produceResponse(), producer.getEntityDetails());
+                    }
+
+                    @Override
+                    public int available() {
+                        final AsyncResponseProducer producer = responseProducer.get();
+                        return producer.available();
+                    }
+
+                    @Override
+                    public void produce(final DataStreamChannel channel) throws IOException {
+                        final AsyncResponseProducer producer = responseProducer.get();
+                        producer.produce(channel);
+                    }
+
+                    @Override
+                    public void failed(final Exception cause) {
+                    }
+
+                    @Override
+                    public void releaseResources() {
+                    }
+                };
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result1 = future1.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
+        Assert.assertNotNull("You shall not pass", result1.getBody());
+    }
+
+    @Test
+    public void testSlowResponseConsumer() throws Exception {
+        server.register("/", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new MultiLineResponseHandler("0123456789abcd", 100);
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start(H1Config.DEFAULT, ConnectionConfig.custom().setBufferSize(256).build());
+
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final HttpRequest request1 = new BasicHttpRequest("GET", createRequestURI(serverEndpoint, "/"));
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer(request1, null),
+                new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
+
+                    @Override
+                    protected String consumeData(
+                            final ContentType contentType, final InputStream inputStream) throws IOException {
+                        Charset charset = contentType != null ? contentType.getCharset() : null;
+                        if (charset == null) {
+                            charset = StandardCharsets.US_ASCII;
+                        }
+
+                        final StringBuilder buffer = new StringBuilder();
+                        try {
+                            final byte[] tmp = new byte[16];
+                            int l;
+                            while ((l = inputStream.read(tmp)) != -1) {
+                                buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
+                                Thread.sleep(50);
+                            }
+                        } catch (InterruptedException ex) {
+                            Thread.currentThread().interrupt();
+                            throw new InterruptedIOException(ex.getMessage());
+                        }
+                        return buffer.toString();
+                    }
+                }),
+                null);
+
+        final Message<HttpResponse, String> result1 = future1.get(5 * TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(200, response1.getCode());
+        final String s1 = result1.getBody();
+        Assert.assertNotNull(s1);
+        final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
+        while (t1.hasMoreTokens()) {
+            Assert.assertEquals("0123456789abcd", t1.nextToken());
+        }
+    }
+
+    @Test
+    public void testSlowRequestProducer() throws Exception {
+        server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new EchoHandler(2048);
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/echo"));
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
+
+                    @Override
+                    protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
+                        Charset charset = contentType.getCharset();
+                        if (charset == null) {
+                            charset = StandardCharsets.US_ASCII;
+                        }
+                        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
+                            for (int i = 0; i < 500; i++) {
+                                if (i % 100 == 0) {
+                                    writer.flush();
+                                    Thread.sleep(500);
+                                }
+                                writer.write("0123456789abcdef\r\n");
+                            }
+                        } catch (InterruptedException ex) {
+                            Thread.currentThread().interrupt();
+                            throw new InterruptedIOException(ex.getMessage());
+                        }
+                    }
+
+                }),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result1 = future1.get(5 * TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(200, response1.getCode());
+        final String s1 = result1.getBody();
+        Assert.assertNotNull(s1);
+        final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
+        while (t1.hasMoreTokens()) {
+            Assert.assertEquals("0123456789abcdef", t1.nextToken());
+        }
+    }
+
+    @Test
+    public void testSlowResponseProducer() throws Exception {
+        server.register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
+
+                    @Override
+                    protected void handle(
+                            final HttpRequest request,
+                            final InputStream requestStream,
+                            final HttpResponse response,
+                            final OutputStream responseStream,
+                            final HttpContext context) throws IOException, HttpException {
+
+                        if (!"/hello".equals(request.getPath())) {
+                            response.setCode(HttpStatus.SC_NOT_FOUND);
+                            return;
+                        }
+                        if (!"POST".equalsIgnoreCase(request.getMethod())) {
+                            response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
+                            return;
+                        }
+                        if (requestStream == null) {
+                            return;
+                        }
+                        final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
+                        final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
+                        Charset charset = contentType != null ? contentType.getCharset() : null;
+                        if (charset == null) {
+                            charset = StandardCharsets.US_ASCII;
+                        }
+                        response.setCode(HttpStatus.SC_OK);
+                        response.setHeader(h1);
+                        try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
+                             final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
+                            try {
+                                String l;
+                                int count = 0;
+                                while ((l = reader.readLine()) != null) {
+                                    writer.write(l);
+                                    writer.write("\r\n");
+                                    count++;
+                                    if (count % 500 == 0) {
+                                        Thread.sleep(500);
+                                    }
+                                }
+                                writer.flush();
+                            } catch (InterruptedException ex) {
+                                Thread.currentThread().interrupt();
+                                throw new InterruptedIOException(ex.getMessage());
+                            }
+                        }
+                    }
+                };
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start(H1Config.DEFAULT, ConnectionConfig.custom().setBufferSize(256).build());
+
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final HttpRequest request1 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/hello"));
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result1 = future1.get(5 * TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(200, response1.getCode());
+        final String s1 = result1.getBody();
+        Assert.assertNotNull(s1);
+        final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
+        while (t1.hasMoreTokens()) {
+            Assert.assertEquals("0123456789abcd", t1.nextToken());
+        }
+    }
+
+    @Test
+    public void testPipelinedConnectionClose() throws Exception {
+        server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi back");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/hello-1"),
+                        new BasicAsyncEntityProducer("Hi there")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final HttpRequest request2 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/hello-2"));
+        request2.addHeader(HttpHeaders.CONNECTION, "close");
+        final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
+                new BasicRequestProducer(request2,
+                        new BasicAsyncEntityProducer("Hi there")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
+                new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/hello-3"),
+                        new BasicAsyncEntityProducer("Hi there")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+        final Message<HttpResponse, String> result1 = future1.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        final String entity1 = result1.getBody();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(200, response1.getCode());
+        Assert.assertEquals("Hi back", entity1);
+
+        final Message<HttpResponse, String> result2 = future2.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result2);
+        final HttpResponse response2 = result2.getHead();
+        final String entity2 = result2.getBody();
+        Assert.assertNotNull(response2);
+        Assert.assertEquals(200, response2.getCode());
+        Assert.assertEquals("Hi back", entity2);
+
+        try {
+            final Message<HttpResponse, String> result3 = future3.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNull(result3);
+            Assert.assertTrue(future3.isCancelled());
+        } catch (ExecutionException ignore) {
+        }
+
+        final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
+                new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/hello-3"),
+                        new BasicAsyncEntityProducer("Hi there")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result4 = future4.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNull(result4);
+        Assert.assertTrue(future4.isCancelled());
+    }
+
+    @Test
+    public void testPipelinedInvalidRequest() throws Exception {
+        server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi back");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/hello-1"),
+                        new BasicAsyncEntityProducer("Hi there")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final HttpRequest request2 = new BasicHttpRequest("POST", createRequestURI(serverEndpoint, "/hello-2"));
+        request2.addHeader(HttpHeaders.HOST, "");
+        final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
+                new BasicRequestProducer(request2,
+                        new BasicAsyncEntityProducer("Hi there")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
+                new BasicRequestProducer("POST", createRequestURI(serverEndpoint, "/hello-3"),
+                        new BasicAsyncEntityProducer("Hi there")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+
+        final Message<HttpResponse, String> result1 = future1.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        final String entity1 = result1.getBody();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(200, response1.getCode());
+        Assert.assertEquals("Hi back", entity1);
+
+        final Message<HttpResponse, String> result2 = future2.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result2);
+        final HttpResponse response2 = result2.getHead();
+        final String entity2 = result2.getBody();
+        Assert.assertNotNull(response2);
+        Assert.assertEquals(400, response2.getCode());
+        Assert.assertTrue(entity2.length() > 0);
+
+        try {
+            final Message<HttpResponse, String> result3 = future3.get(TIMEOUT, TimeUnit.SECONDS);
+            Assert.assertNull(result3);
+            Assert.assertTrue(future3.isCancelled());
+        } catch (ExecutionException ignore) {
+        }
+    }
+
+    private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
+
+    private static class BrokenChunkEncoder extends AbstractContentEncoder {
+
+        private final CharArrayBuffer lineBuffer;
+        private boolean done;
+
+        BrokenChunkEncoder(
+                final WritableByteChannel channel,
+                final SessionOutputBuffer buffer,
+                final BasicHttpTransportMetrics metrics) {
+            super(channel, buffer, metrics);
+            lineBuffer = new CharArrayBuffer(16);
+        }
+
+        @Override
+        public void complete() throws IOException {
+            super.complete();
+        }
+
+        @Override
+        public int write(final ByteBuffer src) throws IOException {
+            final int chunk;
+            if (!done) {
+                lineBuffer.clear();
+                lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
+                buffer().writeLine(lineBuffer);
+                buffer().write(ByteBuffer.wrap(GARBAGE));
+                done = true;
+                chunk = GARBAGE.length;
+            } else {
+                chunk = 0;
+            }
+            final long bytesWritten = buffer().flush(channel());
+            if (bytesWritten > 0) {
+                metrics().incrementBytesTransferred(bytesWritten);
+            }
+            if (!buffer().hasData()) {
+                channel().close();
+            }
+            return chunk;
+        }
+
+    }
+
+    @Test
+    public void testTruncatedChunk() throws Exception {
+        final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
+                HttpProcessors.server(),
+                new HandlerFactory<AsyncServerExchangeHandler>() {
+
+                    @Override
+                    public AsyncServerExchangeHandler create(final HttpRequest request) throws HttpException {
+                        return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
+
+                            @Override
+                            protected void handle(
+                                    final Message<HttpRequest, String> request,
+                                    final ResponseTrigger responseTrigger,
+                                    final HttpContext context) throws IOException, HttpException {
+                                responseTrigger.submitResponse(
+                                        new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")));
+                            }
+
+                        };
+                    }
+
+                },
+                H1Config.DEFAULT,
+                ConnectionConfig.DEFAULT,
+                DefaultConnectionReuseStrategy.INSTANCE) {
+
+            @Override
+            protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
+                    final IOSession ioSession,
+                    final HttpProcessor httpProcessor,
+                    final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
+                    final H1Config h1Config,
+                    final ConnectionConfig connectionConfig,
+                    final ConnectionReuseStrategy connectionReuseStrategy,
+                    final NHttpMessageParser<HttpRequest> incomingMessageParser,
+                    final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
+                    final ContentLengthStrategy incomingContentStrategy,
+                    final ContentLengthStrategy outgoingContentStrategy,
+                    final ConnectionListener connectionListener,
+                    final Http1StreamListener streamListener) {
+                return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
+                        h1Config, connectionConfig, connectionReuseStrategy,
+                        incomingMessageParser, outgoingMessageWriter,
+                        incomingContentStrategy, outgoingContentStrategy,
+                        connectionListener, streamListener) {
+
+                    @Override
+                    protected ContentEncoder handleOutgoingMessage(
+                            final HttpResponse response,
+                            final WritableByteChannel channel,
+                            final SessionOutputBuffer buffer,
+                            final BasicHttpTransportMetrics metrics) throws HttpException {
+                        final long len = outgoingContentStrategy.determineLength(response);
+                        if (len == ContentLengthStrategy.CHUNKED) {
+                            return new BrokenChunkEncoder(channel, buffer, metrics);
+                        } else {
+                            return super.handleOutgoingMessage(response, channel, buffer, metrics);
+                        }
+                    }
+
+                };
+            }
+
+        });
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final AsyncRequestProducer requestProducer = new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello"));
+        final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(new StringAsyncEntityConsumer());
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
+        try {
+            future1.get();
+            Assert.fail("ExecutionException should have been thrown");
+        } catch (final ExecutionException ex) {
+            final Throwable cause = ex.getCause();
+            Assert.assertTrue(cause instanceof MalformedChunkCodingException);
+            Assert.assertTrue(responseConsumer.getException() instanceof MalformedChunkCodingException);
+            Assert.assertEquals("garbage", responseConsumer.getResponseContent());
+        }
+    }
+
+    @Test
+    public void testExceptionInHandler() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi there") {
+
+                    @Override
+                    protected void handle(
+                            final Message<HttpRequest, String> request,
+                            final ResponseTrigger responseTrigger,
+                            final HttpContext context) throws IOException, HttpException {
+                        throw new HttpException("Boom");
+                    }
+                };
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result);
+        final HttpResponse response1 = result.getHead();
+        final String entity1 = result.getBody();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(500, response1.getCode());
+        Assert.assertEquals("Boom", entity1);
+    }
+
+    @Test
+    public void testNoServiceHandler() throws Exception {
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result);
+        final HttpResponse response1 = result.getHead();
+        final String entity1 = result.getBody();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(404, response1.getCode());
+        Assert.assertEquals("Resource not found", entity1);
+    }
+
+    @Test
+    public void testResponseNoContent() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi there") {
+
+                    @Override
+                    protected void handle(
+                            final Message<HttpRequest, String> request,
+                            final ResponseTrigger responseTrigger,
+                            final HttpContext context) throws IOException, HttpException {
+                        final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
+                        responseTrigger.submitResponse(new BasicResponseProducer(response));
+                    }
+                };
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start();
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
+                new BasicRequestProducer("GET", createRequestURI(serverEndpoint, "/hello")),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result = future.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result);
+        final HttpResponse response1 = result.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(204, response1.getCode());
+        Assert.assertNull(result.getBody());
+    }
+
+    @Test
+    public void testAbsentHostHeader() throws Exception {
+        server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
+
+            @Override
+            public AsyncServerExchangeHandler get() {
+                return new SingleLineResponseHandler("Hi there");
+            }
+
+        });
+        final InetSocketAddress serverEndpoint = server.start();
+
+        client.start(new DefaultHttpProcessor(new RequestContent(), new RequestConnControl()),
+                H1Config.DEFAULT, ConnectionConfig.DEFAULT);
+
+        final Future<ClientEndpoint> connectFuture = client.connect(
+                "localhost", serverEndpoint.getPort(), TIMEOUT, TimeUnit.SECONDS);
+        final ClientEndpoint streamEndpoint = connectFuture.get();
+
+        final HttpRequest request1 = new BasicHttpRequest("GET", createRequestURI(serverEndpoint, "/hello"));
+        request1.setVersion(HttpVersion.HTTP_1_0);
+        final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
+                new BasicRequestProducer(request1, null),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result1 = future1.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result1);
+        final HttpResponse response1 = result1.getHead();
+        Assert.assertNotNull(response1);
+        Assert.assertEquals(200, response1.getCode());
+        Assert.assertEquals("Hi there", result1.getBody());
+
+        final HttpRequest request2 = new BasicHttpRequest("GET", createRequestURI(serverEndpoint, "/hello"));
+        request2.setVersion(HttpVersion.HTTP_1_1);
+        final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
+                new BasicRequestProducer(request2, null),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+        final Message<HttpResponse, String> result2 = future2.get(TIMEOUT, TimeUnit.SECONDS);
+        Assert.assertNotNull(result2);
+        final HttpResponse response2 = result2.getHead();
+        Assert.assertNotNull(response2);
+        Assert.assertEquals(400, response2.getCode());
+        Assert.assertEquals("Host header is absent", result2.getBody());
+    }
+
+}