You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/04/14 11:13:28 UTC

svn commit: r1791350 [1/2] - in /httpcomponents/httpcore/trunk: httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/ httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/ httpcore5/src/examples/org/apache/hc/core5/http/examples/...

Author: olegk
Date: Fri Apr 14 11:13:28 2017
New Revision: 1791350

URL: http://svn.apache.org/viewvc?rev=1791350&view=rev
Log:
Rewrite of non-blocking HTTP/1.1 connection persistence and re-use code; improved HttpAsyncRequester; added integration tests for HttpAsyncRequester and HttpAsyncServer

Added:
    httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java   (with props)
Removed:
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/PoolEntryHolder.java
Modified:
    httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java
    httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
    httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicClientExchangeHandler.java

Modified: httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/Http2RequestExecutionExample.java Fri Apr 14 11:13:28 2017
@@ -112,30 +112,33 @@ public class Http2RequestExecutionExampl
         for (final String requestUri: requestUris) {
             final Future<AsyncClientEndpoint> future = requester.connect(target, TimeValue.ofSeconds(5));
             final AsyncClientEndpoint clientEndpoint = future.get();
-            clientEndpoint.executeAndRelease(
+            clientEndpoint.execute(
                     new BasicRequestProducer("GET", target, requestUri),
                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
                     new FutureCallback<Message<HttpResponse, String>>() {
 
                         @Override
                         public void completed(final Message<HttpResponse, String> message) {
-                            latch.countDown();
+                            clientEndpoint.releaseAndReuse();
                             HttpResponse response = message.getHead();
                             String body = message.getBody();
                             System.out.println(requestUri + "->" + response.getCode());
                             System.out.println(body);
+                            latch.countDown();
                         }
 
                         @Override
                         public void failed(final Exception ex) {
-                            latch.countDown();
+                            clientEndpoint.releaseAndDiscard();
                             System.out.println(requestUri + "->" + ex);
+                            latch.countDown();
                         }
 
                         @Override
                         public void cancelled() {
-                            latch.countDown();
+                            clientEndpoint.releaseAndDiscard();
                             System.out.println(requestUri + " cancelled");
+                            latch.countDown();
                         }
 
                     });

Added: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java?rev=1791350&view=auto
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java (added)
+++ httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java Fri Apr 14 11:13:28 2017
@@ -0,0 +1,357 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.nio.http;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Future;
+
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
+import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
+import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
+import org.apache.hc.core5.http.nio.AsyncPushProducer;
+import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.BasicResponseConsumer;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.reactor.ListenerEndpoint;
+import org.apache.hc.core5.util.TimeValue;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExternalResource;
+
+public class Http1ServerAndRequesterTest {
+
+    private static final TimeValue TIMEOUT = TimeValue.ofSeconds(30);
+
+    private HttpAsyncServer server;
+
+    @Rule
+    public ExternalResource serverResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            server = AsyncServerBootstrap.bootstrap()
+                    .setIOReactorConfig(
+                            IOReactorConfig.custom()
+                                    .setConnectTimeout(TIMEOUT)
+                                    .setSoTimeout(TIMEOUT)
+                                    .build())
+                    .register("/no-keep-alive*", new Supplier<AsyncServerExchangeHandler>() {
+
+                        @Override
+                        public AsyncServerExchangeHandler get() {
+                            return new EchoHandler(2048) {
+
+                                @Override
+                                public void handleRequest(
+                                        final HttpRequest request,
+                                        final EntityDetails entityDetails,
+                                        final ResponseChannel responseChannel) throws HttpException, IOException {
+                                    super.handleRequest(request, entityDetails, new ResponseChannel() {
+
+                                        @Override
+                                        public void sendInformation(final HttpResponse response) throws HttpException, IOException {
+                                            responseChannel.sendInformation(response);
+                                        }
+
+                                        @Override
+                                        public void sendResponse(
+                                                final HttpResponse response,
+                                                final EntityDetails entityDetails) throws HttpException, IOException {
+                                            response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
+                                            responseChannel.sendResponse(response, entityDetails);
+                                        }
+
+                                        @Override
+                                        public void pushPromise(
+                                                final HttpRequest promise,
+                                                final AsyncPushProducer pushProducer) throws HttpException, IOException {
+                                            responseChannel.pushPromise(promise, pushProducer);
+                                        }
+
+                                    });
+                                }
+                            };
+                        }
+
+                    })
+                    .register("*", new Supplier<AsyncServerExchangeHandler>() {
+
+                        @Override
+                        public AsyncServerExchangeHandler get() {
+                            return new EchoHandler(2048);
+                        }
+
+                    })
+                    .create();
+        }
+
+        @Override
+        protected void after() {
+            if (server != null) {
+                try {
+                    server.shutdown(ShutdownType.IMMEDIATE);
+                    server = null;
+                } catch (final Exception ignore) {
+                }
+            }
+        }
+
+    };
+
+    private HttpAsyncRequester requester;
+
+    @Rule
+    public ExternalResource clientResource = new ExternalResource() {
+
+        @Override
+        protected void before() throws Throwable {
+            requester = AsyncRequesterBootstrap.bootstrap()
+                    .setIOReactorConfig(IOReactorConfig.custom()
+                            .setConnectTimeout(TIMEOUT)
+                            .setSoTimeout(TIMEOUT)
+                            .build())
+                    .create();
+        }
+
+        @Override
+        protected void after() {
+            if (requester != null) {
+                try {
+                    requester.shutdown(ShutdownType.IMMEDIATE);
+                    requester = null;
+                } catch (final Exception ignore) {
+                }
+            }
+        }
+
+    };
+
+    @Test
+    public void testSequentialRequests() throws Exception {
+        server.start();
+        final ListenerEndpoint listener = server.listen(new InetSocketAddress(0));
+        listener.waitFor();
+        final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+        requester.start();
+
+        final HttpHost target = new HttpHost("localhost", address.getPort());
+        final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
+                new BasicRequestProducer("POST", target, "/stuff",
+                        new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+        final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+        Assert.assertThat(message1, CoreMatchers.notNullValue());
+        final HttpResponse response1 = message1.getHead();
+        Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+        final String body1 = message1.getBody();
+        Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
+
+        final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
+                new BasicRequestProducer("POST", target, "/other-stuff",
+                        new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+        final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+        Assert.assertThat(message2, CoreMatchers.notNullValue());
+        final HttpResponse response2 = message2.getHead();
+        Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+        final String body2 = message2.getBody();
+        Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
+
+        final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
+                new BasicRequestProducer("POST", target, "/more-stuff",
+                        new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+        final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+        Assert.assertThat(message3, CoreMatchers.notNullValue());
+        final HttpResponse response3 = message3.getHead();
+        Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+        final String body3 = message3.getBody();
+        Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
+    }
+
+    @Test
+    public void testSequentialRequestsNonPersistentConnection() throws Exception {
+        server.start();
+        final ListenerEndpoint listener = server.listen(new InetSocketAddress(0));
+        listener.waitFor();
+        final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+        requester.start();
+
+        final HttpHost target = new HttpHost("localhost", address.getPort());
+        final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
+                new BasicRequestProducer("POST", target, "/no-keep-alive/stuff",
+                        new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+        final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+        Assert.assertThat(message1, CoreMatchers.notNullValue());
+        final HttpResponse response1 = message1.getHead();
+        Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+        final String body1 = message1.getBody();
+        Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
+
+        final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
+                new BasicRequestProducer("POST", target, "/no-keep-alive/other-stuff",
+                        new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+        final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+        Assert.assertThat(message2, CoreMatchers.notNullValue());
+        final HttpResponse response2 = message2.getHead();
+        Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+        final String body2 = message2.getBody();
+        Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
+
+        final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
+                new BasicRequestProducer("POST", target, "/no-keep-alive/more-stuff",
+                        new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
+                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
+        final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+        Assert.assertThat(message3, CoreMatchers.notNullValue());
+        final HttpResponse response3 = message3.getHead();
+        Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+        final String body3 = message3.getBody();
+        Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
+    }
+
+    @Test
+    public void testSequentialRequestsSameEndpoint() throws Exception {
+        server.start();
+        final ListenerEndpoint listener = server.listen(new InetSocketAddress(0));
+        listener.waitFor();
+        final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+        requester.start();
+
+        final HttpHost target = new HttpHost("localhost", address.getPort());
+        final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, TimeValue.ofSeconds(5));
+        final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+        try {
+
+            final Future<Message<HttpResponse, String>> resultFuture1 = endpoint.execute(
+                    new BasicRequestProducer("POST", target, "/stuff",
+                            new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+            Assert.assertThat(message1, CoreMatchers.notNullValue());
+            final HttpResponse response1 = message1.getHead();
+            Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+            final String body1 = message1.getBody();
+            Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
+
+            final Future<Message<HttpResponse, String>> resultFuture2 = endpoint.execute(
+                    new BasicRequestProducer("POST", target, "/other-stuff",
+                            new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+            Assert.assertThat(message2, CoreMatchers.notNullValue());
+            final HttpResponse response2 = message2.getHead();
+            Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+            final String body2 = message2.getBody();
+            Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
+
+            final Future<Message<HttpResponse, String>> resultFuture3 = endpoint.execute(
+                    new BasicRequestProducer("POST", target, "/more-stuff",
+                            new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+            final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+            Assert.assertThat(message3, CoreMatchers.notNullValue());
+            final HttpResponse response3 = message3.getHead();
+            Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+            final String body3 = message3.getBody();
+            Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
+
+        } finally {
+            endpoint.releaseAndReuse();
+        }
+    }
+
+    @Test
+    public void testPipelinedRequests() throws Exception {
+        server.start();
+        final ListenerEndpoint listener = server.listen(new InetSocketAddress(0));
+        listener.waitFor();
+        final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+        requester.start();
+
+        final HttpHost target = new HttpHost("localhost", address.getPort());
+        final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, TimeValue.ofSeconds(5));
+        final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+        try {
+
+            final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
+
+            queue.add(endpoint.execute(
+                    new BasicRequestProducer("POST", target, "/stuff",
+                            new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+            queue.add(endpoint.execute(
+                    new BasicRequestProducer("POST", target, "/other-stuff",
+                            new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+            queue.add(endpoint.execute(
+                    new BasicRequestProducer("POST", target, "/more-stuff",
+                            new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
+                    new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
+
+            while (!queue.isEmpty()) {
+                final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
+                final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+                Assert.assertThat(message, CoreMatchers.notNullValue());
+                final HttpResponse response = message.getHead();
+                Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
+                final String body = message.getBody();
+                Assert.assertThat(body, CoreMatchers.containsString("stuff"));
+            }
+
+        } finally {
+            endpoint.releaseAndReuse();
+        }
+    }
+
+}

Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: httpcomponents/httpcore/trunk/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/Http1ServerAndRequesterTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/examples/org/apache/hc/core5/http/examples/AsyncRequestExecutionExample.java Fri Apr 14 11:13:28 2017
@@ -27,7 +27,6 @@
 package org.apache.hc.core5.http.examples;
 
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hc.core5.concurrent.FutureCallback;
@@ -41,7 +40,6 @@ import org.apache.hc.core5.http.impl.boo
 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
 import org.apache.hc.core5.http.message.RequestLine;
 import org.apache.hc.core5.http.message.StatusLine;
-import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
 import org.apache.hc.core5.http.nio.BasicRequestProducer;
 import org.apache.hc.core5.http.nio.BasicResponseConsumer;
 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
@@ -103,32 +101,31 @@ public class AsyncRequestExecutionExampl
 
         final CountDownLatch latch = new CountDownLatch(requestUris.length);
         for (final String requestUri: requestUris) {
-            final Future<AsyncClientEndpoint> future = requester.connect(target, TimeValue.ofSeconds(5));
-            final AsyncClientEndpoint clientEndpoint = future.get();
-            clientEndpoint.executeAndRelease(
+            requester.execute(
                     new BasicRequestProducer("GET", target, requestUri),
                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+                    TimeValue.ofSeconds(5),
                     new FutureCallback<Message<HttpResponse, String>>() {
 
                         @Override
                         public void completed(final Message<HttpResponse, String> message) {
-                            latch.countDown();
                             HttpResponse response = message.getHead();
                             String body = message.getBody();
                             System.out.println(requestUri + "->" + response.getCode());
                             System.out.println(body);
+                            latch.countDown();
                         }
 
                         @Override
                         public void failed(final Exception ex) {
-                            latch.countDown();
                             System.out.println(requestUri + "->" + ex);
+                            latch.countDown();
                         }
 
                         @Override
                         public void cancelled() {
-                            latch.countDown();
                             System.out.println(requestUri + " cancelled");
+                            latch.countDown();
                         }
 
                     });

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java Fri Apr 14 11:13:28 2017
@@ -29,20 +29,38 @@ package org.apache.hc.core5.http.impl.bo
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hc.core5.concurrent.BasicFuture;
 import org.apache.hc.core5.concurrent.ComplexFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.ExceptionListener;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.ProtocolException;
 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
 import org.apache.hc.core5.http.nio.command.ExecutionCommand;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
 import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.net.URIAuthority;
 import org.apache.hc.core5.pool.ControlledConnPool;
 import org.apache.hc.core5.pool.PoolEntry;
 import org.apache.hc.core5.reactor.IOEventHandlerFactory;
@@ -97,13 +115,13 @@ public class HttpAsyncRequester extends
 
             @Override
             public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
-                final PoolEntryHolder<HttpHost, IOSession> poolEntryHolder = new PoolEntryHolder<>(connPool, poolEntry);
+                final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
                 final IOSession ioSession = poolEntry.getConnection();
                 if (ioSession != null && ioSession.isClosed()) {
                     poolEntry.discardConnection(ShutdownType.IMMEDIATE);
                 }
                 if (poolEntry.hasConnection()) {
-                    resultFuture.completed(new InternalAsyncClientEndpoint(poolEntryHolder));
+                    resultFuture.completed(endpoint);
                 } else {
                     final SessionRequest sessionRequest = requestSession(host, timeout, new SessionRequestCallback() {
 
@@ -117,8 +135,9 @@ public class HttpAsyncRequester extends
                                         session.getLocalAddress(),
                                         session.getRemoteAddress());
                             }
+                            session.setSocketTimeout(timeout.toMillisIntBound());
                             poolEntry.assignConnection(session);
-                            resultFuture.completed(new InternalAsyncClientEndpoint(poolEntryHolder));
+                            resultFuture.completed(endpoint);
                         }
 
                         @Override
@@ -126,7 +145,7 @@ public class HttpAsyncRequester extends
                             try {
                                 resultFuture.failed(request.getException());
                             } finally {
-                                poolEntryHolder.abortConnection();
+                                endpoint.releaseAndDiscard();
                             }
                         }
 
@@ -135,7 +154,7 @@ public class HttpAsyncRequester extends
                             try {
                                 resultFuture.failed(new SocketTimeoutException("Connect timeout"));
                             } finally {
-                                poolEntryHolder.abortConnection();
+                                endpoint.releaseAndDiscard();
                             }
                         }
 
@@ -144,7 +163,7 @@ public class HttpAsyncRequester extends
                             try {
                                 resultFuture.cancel();
                             } finally {
-                                poolEntryHolder.abortConnection();
+                                endpoint.releaseAndDiscard();
                             }
                         }
 
@@ -172,31 +191,197 @@ public class HttpAsyncRequester extends
         return connect(host, timeout, null);
     }
 
-    private static class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
+    public void execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final TimeValue timeout,
+            final HttpContext context) {
+        Args.notNull(exchangeHandler, "Exchange handler");
+        Args.notNull(timeout, "Timeout");
+        Args.notNull(context, "Context");
+        try {
+            exchangeHandler.produceRequest(new RequestChannel() {
+
+                @Override
+                public void sendRequest(
+                        final HttpRequest request,
+                        final EntityDetails entityDetails) throws HttpException, IOException {
+                    final String scheme = request.getScheme();
+                    final URIAuthority authority = request.getAuthority();
+                    if (authority == null) {
+                        throw new ProtocolException("Request authority not specified");
+                    }
+                    final HttpHost target = new HttpHost(authority, scheme);
+                    connect(target, timeout, new FutureCallback<AsyncClientEndpoint>() {
+
+                        @Override
+                        public void completed(final AsyncClientEndpoint endpoint) {
+                            endpoint.execute(new AsyncClientExchangeHandler() {
+
+                                @Override
+                                public void releaseResources() {
+                                    endpoint.releaseAndDiscard();
+                                    exchangeHandler.releaseResources();
+                                }
+
+                                @Override
+                                public void failed(final Exception cause) {
+                                    endpoint.releaseAndDiscard();
+                                    exchangeHandler.failed(cause);
+                                }
+
+                                @Override
+                                public void cancel() {
+                                    endpoint.releaseAndDiscard();
+                                    exchangeHandler.cancel();
+                                }
+
+                                @Override
+                                public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
+                                    channel.sendRequest(request, entityDetails);
+                                }
+
+                                @Override
+                                public int available() {
+                                    return exchangeHandler.available();
+                                }
+
+                                @Override
+                                public void produce(final DataStreamChannel channel) throws IOException {
+                                    exchangeHandler.produce(channel);
+                                }
+
+                                @Override
+                                public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
+                                    exchangeHandler.consumeInformation(response);
+                                }
+
+                                @Override
+                                public void consumeResponse(
+                                        final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+                                    if (entityDetails == null) {
+                                        endpoint.releaseAndReuse();
+                                    }
+                                    exchangeHandler.consumeResponse(response, entityDetails);
+                                }
+
+                                @Override
+                                public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+                                    exchangeHandler.updateCapacity(capacityChannel);
+                                }
+
+                                @Override
+                                public int consume(final ByteBuffer src) throws IOException {
+                                    return exchangeHandler.consume(src);
+                                }
+
+                                @Override
+                                public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+                                    endpoint.releaseAndReuse();
+                                    exchangeHandler.streamEnd(trailers);
+                                }
+
+                            }, context);
+
+                        }
+
+                        @Override
+                        public void failed(final Exception ex) {
+                            exchangeHandler.failed(ex);
+                        }
+
+                        @Override
+                        public void cancelled() {
+                            exchangeHandler.cancel();
+                        }
+
+                    });
+
+                }
 
-        final PoolEntryHolder<HttpHost, IOSession> poolEntryHolder;
+            });
 
-        InternalAsyncClientEndpoint(final PoolEntryHolder<HttpHost, IOSession> poolEntryHolder) {
-            this.poolEntryHolder = poolEntryHolder;
+        } catch (IOException | HttpException ex) {
+            exchangeHandler.failed(ex);
+        }
+    }
+
+    public final <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final TimeValue timeout,
+            final HttpContext context,
+            final FutureCallback<T> callback) {
+        Args.notNull(requestProducer, "Request producer");
+        Args.notNull(responseConsumer, "Response consumer");
+        Args.notNull(timeout, "Timeout");
+        final BasicFuture<T> future = new BasicFuture<>(callback);
+        final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
+
+            @Override
+            public void completed(final T result) {
+                future.completed(result);
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                future.failed(ex);
+            }
+
+            @Override
+            public void cancelled() {
+                future.cancel();
+            }
+
+        });
+        execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create());
+        return future;
+    }
+
+    public final <T> Future<T> execute(
+            final AsyncRequestProducer requestProducer,
+            final AsyncResponseConsumer<T> responseConsumer,
+            final TimeValue timeout,
+            final FutureCallback<T> callback) {
+        return execute(requestProducer, responseConsumer, timeout, null, callback);
+    }
+
+    private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
+
+        final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
+
+        InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
+            this.poolEntryRef = new AtomicReference<>(poolEntry);
         }
 
         @Override
         public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
-            final IOSession connection = poolEntryHolder.getConnection();
-            if (connection == null) {
+            final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
+            if (poolEntry == null) {
                 throw new IllegalStateException("Endpoint has already been released");
             }
-            connection.addLast(new ExecutionCommand(exchangeHandler, context));
+            final IOSession ioSession = poolEntry.getConnection();
+            if (ioSession == null) {
+                throw new IllegalStateException("I/O session is invalid");
+            }
+            ioSession.addLast(new ExecutionCommand(exchangeHandler, context));
         }
 
         @Override
         public void releaseAndReuse() {
-            poolEntryHolder.releaseConnection();
+            final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
+            if (poolEntry != null) {
+                final IOSession ioSession = poolEntry.getConnection();
+                connPool.release(poolEntry, ioSession != null && !ioSession.isClosed());    ;
+            }
         }
 
         @Override
         public void releaseAndDiscard() {
-            poolEntryHolder.abortConnection();
+            final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
+            if (poolEntry != null) {
+                poolEntry.discardConnection(ShutdownType.IMMEDIATE);
+                connPool.release(poolEntry, false);    ;
+            }
         }
 
     }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpRequester.java Fri Apr 14 11:13:28 2017
@@ -37,6 +37,7 @@ import java.net.SocketAddress;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.SSLSocketFactory;
 
@@ -185,7 +186,7 @@ public class HttpRequester implements Gr
         } catch (final TimeoutException ex) {
             throw new ConnectionRequestTimeoutException("Connection request timeout");
         }
-        final PoolEntryHolder<HttpHost, HttpClientConnection> connectionHolder = new PoolEntryHolder<>(connPool, poolEntry);
+        final PoolEntryHolder connectionHolder = new PoolEntryHolder(poolEntry);
         try {
             HttpClientConnection connection = poolEntry.getConnection();
             if (connection == null) {
@@ -200,11 +201,8 @@ public class HttpRequester implements Gr
                 response.setEntity(new HttpEntityWrapper(entity) {
 
                     private void releaseConnection() throws IOException {
-                        if (connectionHolder.isReleased()) {
-                            return;
-                        }
                         try {
-                            final HttpClientConnection localConn = poolEntry.getConnection();
+                            final HttpClientConnection localConn = connectionHolder.getConnection();
                             if (localConn != null) {
                                 if (requestExecutor.keepAlive(request, response, localConn, context)) {
                                     if (super.isStreaming()) {
@@ -213,16 +211,16 @@ public class HttpRequester implements Gr
                                             content.close();
                                         }
                                     }
-                                    connectionHolder.markReusable();
+                                    connectionHolder.releaseConnection();
                                 }
                             }
                         } finally {
-                            connectionHolder.releaseConnection();
+                            connectionHolder.discardConnection();
                         }
                     }
 
                     private void abortConnection() {
-                        connectionHolder.releaseConnection();
+                        connectionHolder.discardConnection();
                     }
 
                     @Override
@@ -276,7 +274,7 @@ public class HttpRequester implements Gr
             }
             return response;
         } catch (HttpException | IOException | RuntimeException ex) {
-            connectionHolder.abortConnection();
+            connectionHolder.discardConnection();
             throw ex;
         }
     }
@@ -308,4 +306,35 @@ public class HttpRequester implements Gr
         connPool.close();
     }
 
+    private class PoolEntryHolder {
+
+        private final AtomicReference<PoolEntry<HttpHost, HttpClientConnection>> poolEntryRef;
+
+        PoolEntryHolder(final PoolEntry<HttpHost, HttpClientConnection> poolEntry) {
+            this.poolEntryRef = new AtomicReference<>(poolEntry);
+        }
+
+        HttpClientConnection getConnection() {
+            final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.get();
+            return poolEntry != null ? poolEntry.getConnection() : null;
+        }
+
+        void releaseConnection() {
+            final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
+            if (poolEntry != null) {
+                final HttpClientConnection connection = poolEntry.getConnection();
+                connPool.release(poolEntry, connection != null && connection.isOpen());
+            }
+        }
+
+        void discardConnection() {
+            final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
+            if (poolEntry != null) {
+                poolEntry.discardConnection(ShutdownType.IMMEDIATE);
+                connPool.release(poolEntry, false);
+            }
+        }
+
+    }
+
 }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java Fri Apr 14 11:13:28 2017
@@ -57,6 +57,7 @@ import org.apache.hc.core5.http.impl.Bas
 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
 import org.apache.hc.core5.http.impl.CharCodingSupport;
 import org.apache.hc.core5.http.impl.ConnectionListener;
+import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.ContentDecoder;
 import org.apache.hc.core5.http.nio.ContentEncoder;
 import org.apache.hc.core5.http.nio.NHttpMessageParser;
@@ -92,6 +93,7 @@ abstract class AbstractHttp1StreamDuplex
     private final BasicHttpConnectionMetrics connMetrics;
     private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
     private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
+    private final ByteBuffer contentBuffer;
     private final ConnectionListener connectionListener;
     private final Lock outputLock;
     private final AtomicInteger outputRequests;
@@ -122,13 +124,24 @@ abstract class AbstractHttp1StreamDuplex
         this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
         this.incomingMessageParser = incomingMessageParser;
         this.outgoingMessageWriter = outgoingMessageWriter;
+        this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
         this.connectionListener = connectionListener;
         this.outputLock = new ReentrantLock();
         this.outputRequests = new AtomicInteger(0);
         this.connState = ConnectionState.READY;
     }
 
-    void doTerminate(final Exception exception) throws IOException {
+    void shutdownSession(final ShutdownType shutdownType) {
+        if (shutdownType == ShutdownType.GRACEFUL) {
+            connState = ConnectionState.GRACEFUL_SHUTDOWN;
+            ioSession.addLast(new ShutdownCommand(ShutdownType.GRACEFUL));
+        } else {
+            connState = ConnectionState.SHUTDOWN;
+            ioSession.close();
+        }
+    }
+
+    void shutdownSession(final Exception exception) throws IOException {
         connState = ConnectionState.SHUTDOWN;
         try {
             terminate(exception);
@@ -137,6 +150,8 @@ abstract class AbstractHttp1StreamDuplex
         }
     }
 
+    abstract void disconnected();
+
     abstract void terminate(final Exception exception);
 
     abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
@@ -157,7 +172,11 @@ abstract class AbstractHttp1StreamDuplex
             SessionOutputBuffer buffer,
             BasicHttpTransportMetrics metrics) throws HttpException;
 
-    abstract int consumeData(ContentDecoder contentDecoder) throws HttpException, IOException;
+    abstract int consumeData(ByteBuffer src) throws HttpException, IOException;
+
+    abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
+
+    abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
 
     abstract boolean isOutputReady();
 
@@ -238,7 +257,11 @@ abstract class AbstractHttp1StreamDuplex
                             break;
                         } else {
                             inputEnd();
-                            ioSession.setEvent(SelectionKey.OP_READ);
+                            if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
+                                ioSession.setEvent(SelectionKey.OP_READ);
+                            } else {
+                                break;
+                            }
                         }
                     }
                 } while (bytesRead > 0);
@@ -247,7 +270,7 @@ abstract class AbstractHttp1StreamDuplex
                     if (outputIdle() && inputIdle()) {
                         requestShutdown(ShutdownType.IMMEDIATE);
                     } else {
-                        doTerminate(new ConnectionClosedException("Connection closed by peer"));
+                        shutdownSession(new ConnectionClosedException("Connection closed by peer"));
                     }
                     return;
                 }
@@ -255,14 +278,37 @@ abstract class AbstractHttp1StreamDuplex
 
             if (incomingMessage != null) {
                 final ContentDecoder contentDecoder = incomingMessage.getBody();
-                final int bytesRead = consumeData(contentDecoder);
-                if (bytesRead > 0) {
-                    totalBytesRead += bytesRead;
+
+                int bytesRead;
+                while ((bytesRead = contentDecoder.read(contentBuffer)) > 0) {
+                    if (bytesRead > 0) {
+                        totalBytesRead += bytesRead;
+                    }
+                    contentBuffer.flip();
+                    final int capacity = consumeData(contentBuffer);
+                    contentBuffer.clear();
+                    if (capacity <= 0) {
+                        if (!contentDecoder.isCompleted()) {
+                            ioSession.clearEvent(SelectionKey.OP_READ);
+                            updateCapacity(new CapacityChannel() {
+
+                                @Override
+                                public void update(final int increment) throws IOException {
+                                    if (increment > 0) {
+                                        requestSessionInput();
+                                    }
+                                }
+
+                            });
+                        }
+                        break;
+                    }
                 }
                 if (contentDecoder.isCompleted()) {
+                    dataEnd(contentDecoder.getTrailers());
                     incomingMessage = null;
-                    inputEnd();
                     ioSession.setEvent(SelectionKey.OP_READ);
+                    inputEnd();
                 }
             }
             if (totalBytesRead == 0 && messagesReceived == 0) {
@@ -327,7 +373,7 @@ abstract class AbstractHttp1StreamDuplex
 
     public final void onTimeout() throws IOException, HttpException {
         if (!handleTimeout()) {
-            doTerminate(new SocketTimeoutException());
+            shutdownSession(new SocketTimeoutException());
         }
     }
 
@@ -336,7 +382,7 @@ abstract class AbstractHttp1StreamDuplex
             connectionListener.onError(this, ex);
         }
         try {
-            doTerminate(ex);
+            shutdownSession(ex);
         } catch (IOException ex2) {
             if (connectionListener != null) {
                 connectionListener.onError(this, ex2);
@@ -346,6 +392,7 @@ abstract class AbstractHttp1StreamDuplex
 
     public final void onDisconnect() {
         cancelPendingCommands();
+        disconnected();
         releaseResources();
         if (connectionListener != null) {
             connectionListener.onDisconnect(this);

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamDuplexer.java Fri Apr 14 11:13:28 2017
@@ -53,6 +53,7 @@ import org.apache.hc.core5.http.impl.Def
 import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
 import org.apache.hc.core5.http.impl.Http1StreamListener;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.ContentDecoder;
 import org.apache.hc.core5.http.nio.ContentEncoder;
 import org.apache.hc.core5.http.nio.NHttpMessageParser;
@@ -60,9 +61,9 @@ import org.apache.hc.core5.http.nio.NHtt
 import org.apache.hc.core5.http.nio.SessionInputBuffer;
 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
 import org.apache.hc.core5.http.nio.command.ExecutionCommand;
-import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
@@ -75,11 +76,9 @@ public class ClientHttp1StreamDuplexer e
     private final ContentLengthStrategy incomingContentStrategy;
     private final ContentLengthStrategy outgoingContentStrategy;
     private final Http1StreamListener streamListener;
-    private final ByteBuffer contentBuffer;
     private final Queue<ClientHttp1StreamHandler> pipeline;
     private final Http1StreamChannel<HttpRequest> outputChannel;
 
-    private volatile boolean inconsistent;
     private volatile ClientHttp1StreamHandler outgoing;
     private volatile ClientHttp1StreamHandler incoming;
 
@@ -105,11 +104,15 @@ public class ClientHttp1StreamDuplexer e
         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
                 DefaultContentLengthStrategy.INSTANCE;
         this.streamListener = streamListener;
-        this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
         this.pipeline = new ConcurrentLinkedQueue<>();
         this.outputChannel = new Http1StreamChannel<HttpRequest>() {
 
             @Override
+            public void close() {
+                shutdownSession(ShutdownType.IMMEDIATE);
+            }
+
+            @Override
             public void submit(final HttpRequest request, final boolean endStream) throws HttpException, IOException {
                 if (streamListener != null) {
                     streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
@@ -118,23 +121,6 @@ public class ClientHttp1StreamDuplexer e
             }
 
             @Override
-            public void update(final int increment) throws IOException {
-                if (increment > 0) {
-                    requestSessionInput();
-                }
-            }
-
-            @Override
-            public void suspendInput() {
-                suspendSessionInput();
-            }
-
-            @Override
-            public void requestInput() {
-                requestSessionInput();
-            }
-
-            @Override
             public void suspendOutput() {
                 suspendSessionOutput();
             }
@@ -170,11 +156,13 @@ public class ClientHttp1StreamDuplexer e
             }
 
             @Override
-            public void abortOutput() throws IOException {
+            public boolean abortGracefully() throws IOException {
                 final MessageDelineation messageDelineation = endOutputStream(null);
                 if (messageDelineation == MessageDelineation.MESSAGE_HEAD) {
-                    inconsistent = true;
                     requestShutdown(ShutdownType.GRACEFUL);
+                    return false;
+                } else {
+                    return true;
                 }
             }
 
@@ -182,8 +170,6 @@ public class ClientHttp1StreamDuplexer e
             public void activate() throws HttpException, IOException {
             }
 
-
-
         };
     }
 
@@ -211,16 +197,46 @@ public class ClientHttp1StreamDuplexer e
     void terminate(final Exception exception) {
         if (incoming != null) {
             incoming.failed(exception);
+            incoming.releaseResources();
             incoming = null;
         }
         if (outgoing != null) {
             outgoing.failed(exception);
+            outgoing.releaseResources();
             outgoing = null;
         }
         for (;;) {
             final ClientHttp1StreamHandler handler = pipeline.poll();
             if (handler != null) {
                 handler.failed(exception);
+                handler.releaseResources();
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    void disconnected() {
+        if (incoming != null) {
+            if (!incoming.isCompleted()) {
+                incoming.failed(new ConnectionClosedException("Connection closed"));
+            }
+            incoming.releaseResources();
+            incoming = null;
+        }
+        if (outgoing != null) {
+            if (!outgoing.isCompleted()) {
+                outgoing.failed(new ConnectionClosedException("Connection closed"));
+            }
+            outgoing.releaseResources();
+            outgoing = null;
+        }
+        for (;;) {
+            final ClientHttp1StreamHandler handler = pipeline.poll();
+            if (handler != null) {
+                handler.failed(new ConnectionClosedException("Connection closed"));
+                handler.releaseResources();
             } else {
                 break;
             }
@@ -318,8 +334,7 @@ public class ClientHttp1StreamDuplexer e
                 h1Config,
                 connectionReuseStrategy,
                 exchangeHandler,
-                context,
-                contentBuffer);
+                context);
         pipeline.add(handler);
         outgoing = handler;
 
@@ -350,34 +365,33 @@ public class ClientHttp1StreamDuplexer e
     }
 
     @Override
-    int consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+    int consumeData(final ByteBuffer src) throws HttpException, IOException {
         Asserts.notNull(incoming, "Response stream handler");
-        return incoming.consumeData(contentDecoder);
+        return incoming.consumeData(src);
     }
 
     @Override
-    void inputEnd() throws HttpException, IOException {
+    void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
         Asserts.notNull(incoming, "Response stream handler");
-        if (incoming.isResponseCompleted()) {
-            final boolean keepAlive = !inconsistent && incoming.keepAlive();
+        incoming.updateCapacity(capacityChannel);
+    }
+
+    @Override
+    void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        Asserts.notNull(incoming, "Response stream handler");
+        incoming.dataEnd(trailers);
+    }
+
+    @Override
+    void inputEnd() throws HttpException, IOException {
+        if (incoming != null && incoming.isResponseFinal()) {
+            if (streamListener != null) {
+                streamListener.onExchangeComplete(this, isOpen());
+            }
             if (incoming.isCompleted()) {
                 incoming.releaseResources();
             }
             incoming = null;
-            if (streamListener != null) {
-                streamListener.onExchangeComplete(this, keepAlive);
-            }
-            if (!keepAlive) {
-                if (outgoing != null && outgoing.isCompleted()) {
-                    outgoing.releaseResources();
-                    outgoing = null;
-                }
-                if (outgoing == null && pipeline.isEmpty()) {
-                    requestShutdown(ShutdownType.IMMEDIATE);
-                } else {
-                    doTerminate(new ConnectionClosedException("Connection cannot be kept alive"));
-                }
-            }
         }
     }
 

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java Fri Apr 14 11:13:28 2017
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.hc.core5.http.ConnectionReuseStrategy;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.HeaderElements;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpHeaders;
 import org.apache.hc.core5.http.HttpRequest;
@@ -47,7 +46,7 @@ import org.apache.hc.core5.http.Unsuppor
 import org.apache.hc.core5.http.config.H1Config;
 import org.apache.hc.core5.http.impl.LazyEntityDetails;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
-import org.apache.hc.core5.http.nio.ContentDecoder;
+import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
 import org.apache.hc.core5.http.nio.HttpContextAware;
 import org.apache.hc.core5.http.nio.RequestChannel;
@@ -64,13 +63,12 @@ class ClientHttp1StreamHandler implement
     private final ConnectionReuseStrategy connectionReuseStrategy;
     private final AsyncClientExchangeHandler exchangeHandler;
     private final HttpCoreContext context;
-    private final ByteBuffer inputBuffer;
     private final AtomicBoolean requestCommitted;
     private final AtomicBoolean done;
 
+    private volatile boolean keepAlive;
     private volatile int timeout;
     private volatile HttpRequest committedRequest;
-    private volatile HttpResponse receivedResponse;
     private volatile MessageState requestState;
     private volatile MessageState responseState;
 
@@ -80,8 +78,7 @@ class ClientHttp1StreamHandler implement
             final H1Config h1Config,
             final ConnectionReuseStrategy connectionReuseStrategy,
             final AsyncClientExchangeHandler exchangeHandler,
-            final HttpCoreContext context,
-            final ByteBuffer inputBuffer) {
+            final HttpCoreContext context) {
         this.outputChannel = outputChannel;
         this.internalDataChannel = new DataStreamChannel() {
 
@@ -113,14 +110,14 @@ class ClientHttp1StreamHandler implement
         this.connectionReuseStrategy = connectionReuseStrategy;
         this.exchangeHandler = exchangeHandler;
         this.context = context;
-        this.inputBuffer = inputBuffer;
         this.requestCommitted = new AtomicBoolean(false);
         this.done = new AtomicBoolean(false);
+        this.keepAlive = true;
         this.requestState = MessageState.IDLE;
         this.responseState = MessageState.HEADERS;
     }
 
-    boolean isResponseCompleted() {
+    boolean isResponseFinal() {
         return responseState == MessageState.COMPLETE;
     }
 
@@ -128,11 +125,6 @@ class ClientHttp1StreamHandler implement
         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
     }
 
-    boolean keepAlive() {
-        return committedRequest != null && receivedResponse != null &&
-                connectionReuseStrategy.keepAlive(committedRequest, receivedResponse, context);
-    }
-
     boolean isHeadRequest() {
         return committedRequest != null && "HEAD".equalsIgnoreCase(committedRequest.getMethod());
     }
@@ -224,6 +216,10 @@ class ClientHttp1StreamHandler implement
         }
         if (status < HttpStatus.SC_SUCCESS) {
             exchangeHandler.consumeInformation(response);
+        } else {
+            if (!connectionReuseStrategy.keepAlive(committedRequest, response, context)) {
+                keepAlive = false;
+            }
         }
         if (requestState == MessageState.ACK) {
             if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
@@ -238,54 +234,49 @@ class ClientHttp1StreamHandler implement
             return;
         }
         if (requestState == MessageState.BODY) {
-            boolean keepAlive = status < HttpStatus.SC_CLIENT_ERROR;
-            if (keepAlive) {
-                final Header h = response.getFirstHeader(HttpHeaders.CONNECTION);
-                if (h != null && HeaderElements.CLOSE.equalsIgnoreCase(h.getValue())) {
+            if (keepAlive && status >= HttpStatus.SC_CLIENT_ERROR) {
+                requestState = MessageState.COMPLETE;
+                if (!outputChannel.abortGracefully()) {
                     keepAlive = false;
                 }
             }
-            if (!keepAlive) {
-                requestState = MessageState.COMPLETE;
-                outputChannel.abortOutput();
-            }
         }
 
         final EntityDetails entityDetails = endStream ? null : new LazyEntityDetails(response);
         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
         httpProcessor.process(response, entityDetails, context);
-        receivedResponse = response;
 
         exchangeHandler.consumeResponse(response, entityDetails);
-        responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
+        if (endStream) {
+            if (!keepAlive) {
+                outputChannel.close();
+            }
+            responseState = MessageState.COMPLETE;
+        } else {
+            responseState = MessageState.BODY;
+        }
     }
 
-    int consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+    int consumeData(final ByteBuffer src) throws HttpException, IOException {
         if (done.get() || responseState != MessageState.BODY) {
             throw new ProtocolException("Unexpected message data");
         }
-        int total = 0;
-        int byteRead;
-        while ((byteRead = contentDecoder.read(inputBuffer)) > 0) {
-            total += byteRead;
-            inputBuffer.flip();
-            final int capacity = exchangeHandler.consume(inputBuffer);
-            inputBuffer.clear();
-            if (capacity <= 0) {
-                if (!contentDecoder.isCompleted()) {
-                    outputChannel.suspendInput();
-                    exchangeHandler.updateCapacity(outputChannel);
-                }
-                break;
-            }
+        return exchangeHandler.consume(src);
+    }
+
+    void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        exchangeHandler.updateCapacity(capacityChannel);
+    }
+
+    void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        if (done.get() || responseState != MessageState.BODY) {
+            throw new ProtocolException("Unexpected message data");
         }
-        if (contentDecoder.isCompleted()) {
-            responseState = MessageState.COMPLETE;
-            exchangeHandler.streamEnd(contentDecoder.getTrailers());
-            return total > 0 ? total : -1;
-        } else {
-            return total;
+        if (!keepAlive) {
+            outputChannel.close();
         }
+        responseState = MessageState.COMPLETE;
+        exchangeHandler.streamEnd(trailers);
     }
 
     boolean handleTimeout() {

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/Http1StreamChannel.java Fri Apr 14 11:13:28 2017
@@ -30,24 +30,21 @@ import java.io.IOException;
 
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.HttpMessage;
-import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.ContentEncoder;
 
-interface Http1StreamChannel<OutgoingMessage extends HttpMessage> extends ContentEncoder, CapacityChannel {
+interface Http1StreamChannel<OutgoingMessage extends HttpMessage> extends ContentEncoder {
+
+    void close();
 
     void activate() throws HttpException, IOException;
 
     void submit(OutgoingMessage messageHead, boolean endStream) throws HttpException, IOException;
 
-    void suspendInput();
-
-    void requestInput();
-
     void requestOutput();
 
     void suspendOutput();
 
-    void abortOutput() throws IOException;
+    boolean abortGracefully() throws IOException;
 
     int getSocketTimeout();
 

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java Fri Apr 14 11:13:28 2017
@@ -51,6 +51,7 @@ import org.apache.hc.core5.http.impl.Def
 import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
 import org.apache.hc.core5.http.impl.Http1StreamListener;
 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
+import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.ContentDecoder;
 import org.apache.hc.core5.http.nio.ContentEncoder;
 import org.apache.hc.core5.http.nio.HandlerFactory;
@@ -59,9 +60,9 @@ import org.apache.hc.core5.http.nio.NHtt
 import org.apache.hc.core5.http.nio.SessionInputBuffer;
 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
 import org.apache.hc.core5.http.nio.command.ExecutionCommand;
-import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.io.ShutdownType;
 import org.apache.hc.core5.reactor.IOSession;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.Asserts;
@@ -75,11 +76,9 @@ public class ServerHttp1StreamDuplexer e
     private final ContentLengthStrategy incomingContentStrategy;
     private final ContentLengthStrategy outgoingContentStrategy;
     private final Http1StreamListener streamListener;
-    private final ByteBuffer contentBuffer;
     private final Queue<ServerHttp1StreamHandler> pipeline;
     private final Http1StreamChannel<HttpResponse> outputChannel;
 
-    private volatile boolean inconsistent;
     private volatile ServerHttp1StreamHandler outgoing;
     private volatile ServerHttp1StreamHandler incoming;
 
@@ -107,11 +106,15 @@ public class ServerHttp1StreamDuplexer e
         this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
                 DefaultContentLengthStrategy.INSTANCE;
         this.streamListener = streamListener;
-        this.contentBuffer = ByteBuffer.allocate(this.h1Config.getBufferSize());
         this.pipeline = new ConcurrentLinkedQueue<>();
         this.outputChannel = new Http1StreamChannel<HttpResponse>() {
 
             @Override
+            public void close() {
+                shutdown(ShutdownType.IMMEDIATE);
+            }
+
+            @Override
             public void submit(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
                 if (streamListener != null) {
                     streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
@@ -120,23 +123,6 @@ public class ServerHttp1StreamDuplexer e
             }
 
             @Override
-            public void update(final int increment) throws IOException {
-                if (increment > 0) {
-                    requestSessionInput();
-                }
-            }
-
-            @Override
-            public void suspendInput() {
-                suspendSessionInput();
-            }
-
-            @Override
-            public void requestInput() {
-                requestSessionInput();
-            }
-
-            @Override
             public void requestOutput() {
                 requestSessionOutput();
             }
@@ -172,11 +158,9 @@ public class ServerHttp1StreamDuplexer e
             }
 
             @Override
-            public void abortOutput() throws IOException {
+            public boolean abortGracefully() throws IOException {
                 final MessageDelineation messageDelineation = endOutputStream(null);
-                if (messageDelineation == MessageDelineation.MESSAGE_HEAD) {
-                    inconsistent = true;
-                }
+                return messageDelineation != MessageDelineation.MESSAGE_HEAD;
             }
 
             @Override
@@ -210,16 +194,46 @@ public class ServerHttp1StreamDuplexer e
     void terminate(final Exception exception) {
         if (incoming != null) {
             incoming.failed(exception);
+            incoming.releaseResources();
             incoming = null;
         }
         if (outgoing != null) {
             outgoing.failed(exception);
+            outgoing.releaseResources();
             outgoing = null;
         }
         for (;;) {
             final ServerHttp1StreamHandler handler = pipeline.poll();
             if (handler != null) {
                 handler.failed(exception);
+                handler.releaseResources();
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    void disconnected() {
+        if (incoming != null) {
+            if (!incoming.isCompleted()) {
+                incoming.failed(new ConnectionClosedException("Connection closed"));
+            }
+            incoming.releaseResources();
+            incoming = null;
+        }
+        if (outgoing != null) {
+            if (!outgoing.isCompleted()) {
+                outgoing.failed(new ConnectionClosedException("Connection closed"));
+            }
+            outgoing.releaseResources();
+            outgoing = null;
+        }
+        for (;;) {
+            final ServerHttp1StreamHandler handler = pipeline.poll();
+            if (handler != null) {
+                handler.failed(new ConnectionClosedException("Connection closed"));
+                handler.releaseResources();
             } else {
                 break;
             }
@@ -295,8 +309,7 @@ public class ServerHttp1StreamDuplexer e
                     httpProcessor,
                     connectionReuseStrategy,
                     exchangeHandlerFactory,
-                    context,
-                    contentBuffer);
+                    context);
             outgoing = streamHandler;
         } else {
             streamHandler = new ServerHttp1StreamHandler(
@@ -304,8 +317,7 @@ public class ServerHttp1StreamDuplexer e
                     httpProcessor,
                     connectionReuseStrategy,
                     exchangeHandlerFactory,
-                    context,
-                    contentBuffer);
+                    context);
             pipeline.add(streamHandler);
         }
         streamHandler.consumeHeader(request, endStream);
@@ -313,9 +325,21 @@ public class ServerHttp1StreamDuplexer e
     }
 
     @Override
-    int consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+    int consumeData(final ByteBuffer src) throws HttpException, IOException {
+        Asserts.notNull(incoming, "Request stream handler");
+        return incoming.consumeData(src);
+    }
+
+    @Override
+    void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
         Asserts.notNull(incoming, "Request stream handler");
-        return incoming.consumeData(contentDecoder);
+        incoming.updateCapacity(capacityChannel);
+    }
+
+    @Override
+    void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        Asserts.notNull(incoming, "Request stream handler");
+        incoming.dataEnd(trailers);
     }
 
     @Override
@@ -346,24 +370,16 @@ public class ServerHttp1StreamDuplexer e
 
     @Override
     void outputEnd() throws HttpException, IOException {
-        if (outgoing != null && outgoing.isResponseCompleted()) {
-            final boolean keepAlive = !inconsistent && outgoing.keepAlive();
+        if (outgoing != null && outgoing.isResponseFinal()) {
+            if (streamListener != null) {
+                streamListener.onExchangeComplete(this, isOpen());
+            }
             if (outgoing.isCompleted()) {
                 outgoing.releaseResources();
             }
             outgoing = null;
-            if (streamListener != null) {
-                streamListener.onExchangeComplete(this, keepAlive);
-            }
-            if (!keepAlive) {
-                if (incoming == null && pipeline.isEmpty()) {
-                    requestShutdown(ShutdownType.IMMEDIATE);
-                } else {
-                    doTerminate(new ConnectionClosedException("Connection cannot be kept alive"));
-                }
-            }
         }
-        if (outgoing == null) {
+        if (outgoing == null && isOpen()) {
             final ServerHttp1StreamHandler handler = pipeline.poll();
             if (handler != null) {
                 outgoing = handler;
@@ -393,6 +409,11 @@ public class ServerHttp1StreamDuplexer e
         }
 
         @Override
+        public void close() {
+            channel.close();
+        }
+
+        @Override
         public void submit(final HttpResponse response, final boolean endStream) throws HttpException, IOException {
             synchronized (this) {
                 if (direct) {
@@ -405,23 +426,6 @@ public class ServerHttp1StreamDuplexer e
         }
 
         @Override
-        public void update(final int increment) throws IOException {
-            if (increment > 0) {
-                channel.requestInput();
-            }
-        }
-
-        @Override
-        public void suspendInput() {
-            channel.suspendInput();
-        }
-
-        @Override
-        public void requestInput() {
-            channel.requestInput();
-        }
-
-        @Override
         public void suspendOutput() {
             channel.suspendOutput();
         }
@@ -464,12 +468,13 @@ public class ServerHttp1StreamDuplexer e
         }
 
         @Override
-        public void abortOutput() throws IOException {
+        public boolean abortGracefully() throws IOException {
             synchronized (this) {
                 if (direct) {
-                    channel.abortOutput();
+                    return channel.abortGracefully();
                 } else {
                     completed = true;
+                    return true;
                 }
             }
         }