You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/11/23 18:31:45 UTC

[1/3] httpcomponents-core git commit: Bugfix: corrected handling of GOAWAY frames by HTTP/2 stream multiplexer

Repository: httpcomponents-core
Updated Branches:
  refs/heads/master 1c3a57129 -> 635632314


Bugfix: corrected handling of GOAWAY frames by HTTP/2 stream multiplexer


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/d36209cf
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/d36209cf
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/d36209cf

Branch: refs/heads/master
Commit: d36209cf4a8fd50f3a4eb6d7b4ed62abde702cf8
Parents: 1c3a571
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Wed Nov 22 16:55:07 2017 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Nov 23 15:20:05 2017 +0100

----------------------------------------------------------------------
 .../nio/AbstractHttp2StreamMultiplexer.java     | 42 +++++++++++---------
 1 file changed, 24 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d36209cf/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
index 32525ba..e2690b9 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
@@ -579,9 +579,9 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                     connState = ConnectionHandshake.SHUTDOWN;
                 } else {
                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
-                        connState = ConnectionHandshake.GRACEFUL_SHUTDOWN;
                         final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
                         commitFrame(goAway);
+                        connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
                     }
                 }
                 break;
@@ -636,12 +636,6 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                     break;
                 }
             }
-            for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
-                final Map.Entry<Integer, Http2Stream> entry = it.next();
-                final Http2Stream stream = entry.getValue();
-                stream.reset(cause);
-            }
-            streamMap.clear();
             for (;;) {
                 final Command command = ioSession.getCommand();
                 if (command != null) {
@@ -657,17 +651,28 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                     break;
                 }
             }
+            for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
+                final Map.Entry<Integer, Http2Stream> entry = it.next();
+                final Http2Stream stream = entry.getValue();
+                if (stream.isLocalClosed() && (stream.isRemoteClosed() || stream.isLocalReset())) {
+                    stream.reset(cause);
+                }
+                stream.releaseResources();
+            }
+            streamMap.clear();
             if (!(cause instanceof ConnectionClosedException)) {
-                final H2Error errorCode;
-                if (cause instanceof H2ConnectionException) {
-                    errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
-                } else if (cause instanceof ProtocolException){
-                    errorCode = H2Error.PROTOCOL_ERROR;
-                } else {
-                    errorCode = H2Error.INTERNAL_ERROR;
+                if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
+                    final H2Error errorCode;
+                    if (cause instanceof H2ConnectionException) {
+                        errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
+                    } else if (cause instanceof ProtocolException){
+                        errorCode = H2Error.PROTOCOL_ERROR;
+                    } else {
+                        errorCode = H2Error.INTERNAL_ERROR;
+                    }
+                    final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
+                    commitFrame(goAway);
                 }
-                final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
-                commitFrame(goAway);
             }
             connState = ConnectionHandshake.SHUTDOWN;
         } catch (final IOException ignore) {
@@ -938,7 +943,6 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                 final int errorCode = payload.getInt();
                 if (errorCode == H2Error.NO_ERROR.getCode()) {
                     if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
-                        connState = ConnectionHandshake.GRACEFUL_SHUTDOWN;
                         for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                             final Map.Entry<Integer, Http2Stream> entry = it.next();
                             final int activeStreamId = entry.getKey();
@@ -949,16 +953,18 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                             }
                         }
                     }
+                    connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
                 } else {
-                    connState = ConnectionHandshake.SHUTDOWN;
                     for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                         final Map.Entry<Integer, Http2Stream> entry = it.next();
                         final Http2Stream stream = entry.getValue();
                         stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer"));
                     }
                     streamMap.clear();
+                    connState = ConnectionHandshake.SHUTDOWN;
                 }
             }
+            ioSession.setEvent(SelectionKey.OP_WRITE);
             break;
         }
     }


[3/3] httpcomponents-core git commit: HTTP/2 multiplexed requester to support cancellation of individual message exchanges without termination of the underlying I/O session

Posted by ol...@apache.org.
HTTP/2 multiplexed requester to support cancellation of individual message exchanges without termination of the underlying I/O session


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/63563231
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/63563231
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/63563231

Branch: refs/heads/master
Commit: 6356323147ca4a62e1ad89f4fc47c0d5b4ad5770
Parents: 0167512
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Mon Nov 20 14:56:44 2017 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Nov 23 15:20:44 2017 +0100

----------------------------------------------------------------------
 .../nio/bootstrap/CancellableExecution.java     | 74 ++++++++++++++++++++
 .../bootstrap/Http2MultiplexingRequester.java   | 25 +++++--
 ...Http2ServerAndMultiplexingRequesterTest.java | 37 ++++++++++
 3 files changed, 131 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
new file mode 100644
index 0000000..e96a036
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
@@ -0,0 +1,74 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.impl.nio.bootstrap;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
+
+final class CancellableExecution implements CancellableDependency, Cancellable {
+
+    private final AtomicBoolean cancelled;
+    private final AtomicReference<Cancellable> dependencyRef;
+
+    CancellableExecution() {
+        this.cancelled = new AtomicBoolean(false);
+        this.dependencyRef = new AtomicReference<>(null);
+    }
+
+    @Override
+    public void setDependency(final Cancellable cancellable) {
+        dependencyRef.set(cancellable);
+        if (cancelled.get()) {
+            final Cancellable dependency = dependencyRef.getAndSet(null);
+            if (dependency != null) {
+                dependency.cancel();
+            }
+        }
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return cancelled.get();
+    }
+
+    @Override
+    public boolean cancel() {
+        if (cancelled.compareAndSet(false, true)) {
+            final Cancellable dependency = dependencyRef.getAndSet(null);
+            if (dependency != null) {
+                dependency.cancel();
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
index 85156e7..8fab172 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
@@ -34,7 +34,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
 
-import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
+import org.apache.hc.core5.concurrent.ComplexFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.function.Decorator;
@@ -112,13 +114,26 @@ public class Http2MultiplexingRequester extends AsyncRequester{
         connPool.setValidateAfterInactivity(timeValue);
     }
 
-    public void execute(
+    public Cancellable execute(
             final AsyncClientExchangeHandler exchangeHandler,
             final Timeout timeout,
             final HttpContext context) {
         Args.notNull(exchangeHandler, "Exchange handler");
         Args.notNull(timeout, "Timeout");
         Args.notNull(context, "Context");
+        final CancellableExecution cancellableExecution = new CancellableExecution();
+        execute(exchangeHandler, cancellableExecution, timeout, context);
+        return cancellableExecution;
+    }
+
+    private void execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final CancellableDependency cancellableDependency,
+            final Timeout timeout,
+            final HttpContext context) {
+        Args.notNull(exchangeHandler, "Exchange handler");
+        Args.notNull(timeout, "Timeout");
+        Args.notNull(context, "Context");
         try {
             exchangeHandler.produceRequest(new RequestChannel() {
 
@@ -194,7 +209,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
                                     exchangeHandler.failed(cause);
                                 }
 
-                            }, context));
+                            }, cancellableDependency, context));
                         }
 
                         @Override
@@ -226,7 +241,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
         Args.notNull(requestProducer, "Request producer");
         Args.notNull(responseConsumer, "Response consumer");
         Args.notNull(timeout, "Timeout");
-        final BasicFuture<T> future = new BasicFuture<>(callback);
+        final ComplexFuture<T> future = new ComplexFuture<>(callback);
         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
 
             @Override
@@ -245,7 +260,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
             }
 
         });
-        execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create());
+        execute(exchangeHandler, future, timeout, context != null ? context : HttpCoreContext.create());
         return future;
     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
index d5d46c4..791fd13 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
@@ -34,8 +34,11 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 
+import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.HttpHost;
@@ -50,6 +53,8 @@ import org.apache.hc.core5.http.nio.BasicResponseConsumer;
 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
 import org.apache.hc.core5.http.nio.ssl.SecurePortStrategy;
+import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.Http2MultiplexingRequester;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.Http2MultiplexingRequesterBootstrap;
@@ -334,4 +339,36 @@ public class Http2ServerAndMultiplexingRequesterTest {
         Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
     }
 
+    @Test
+    public void testMultiplexedRequestCancellation() throws Exception {
+        server.start();
+        final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
+        final ListenerEndpoint listener = future.get();
+        final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+        requester.start();
+
+        final int reqNo = 20;
+
+        final CountDownLatch countDownLatch = new CountDownLatch(reqNo);
+        final Random random = new Random();
+        final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id);
+        for (int i = 0; i < reqNo; i++) {
+            final Cancellable cancellable = requester.execute(
+                    new BasicClientExchangeHandler<>(new BasicRequestProducer("POST", target, "/stuff",
+                            new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+                            new BasicResponseConsumer<>(new StringAsyncEntityConsumer() {
+
+                                @Override
+                                public void releaseResources() {
+                                    super.releaseResources();
+                                    countDownLatch.countDown();
+                                }
+                            }), null), TIMEOUT, HttpCoreContext.create());
+            Thread.sleep(random.nextInt(10));
+            cancellable.cancel();
+        }
+        Assert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
+        Thread.sleep(1500);
+    }
+
 }


[2/3] httpcomponents-core git commit: * Revised stream reset logic * Make HTTP/2 client stream cancellable by the caller

Posted by ol...@apache.org.
* Revised stream reset logic
* Make HTTP/2 client stream cancellable by the caller


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/01675126
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/01675126
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/01675126

Branch: refs/heads/master
Commit: 0167512689b890a1ab28185ea64a99c1f052b628
Parents: d36209c
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Mon Nov 20 13:53:26 2017 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Nov 23 15:20:26 2017 +0100

----------------------------------------------------------------------
 .../nio/AbstractHttp2StreamMultiplexer.java     | 114 ++++++++++++-------
 .../impl/nio/ClientHttp2StreamHandler.java      |  17 ++-
 .../impl/nio/ClientPushHttp2StreamHandler.java  |  19 ++--
 .../http2/impl/nio/Http2StreamChannel.java      |   3 +-
 .../http2/impl/nio/Http2StreamHandler.java      |   2 -
 .../impl/nio/ServerHttp2StreamHandler.java      |  13 +--
 .../impl/nio/ServerPushHttp2StreamHandler.java  |  11 +-
 .../http/nio/command/ExecutionCommand.java      |  22 +++-
 8 files changed, 119 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
index e2690b9..c83274c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
@@ -36,6 +36,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -45,6 +46,8 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.SSLSession;
 
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
 import org.apache.hc.core5.http.ConnectionClosedException;
 import org.apache.hc.core5.http.EndpointDetails;
 import org.apache.hc.core5.http.Header;
@@ -597,10 +600,11 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                         localConfig.getInitialWindowSize(),
                         remoteConfig.getInitialWindowSize());
                 final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
+                final CancellableDependency cancellableDependency = executionCommand.getCancellableDependency();
                 final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
                 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
                 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
-                final Http2StreamHandler streamHandler = new ClientHttp2StreamHandler(
+                final ClientHttp2StreamHandler streamHandler = new ClientHttp2StreamHandler(
                         channel,
                         httpProcessor,
                         connMetrics,
@@ -612,7 +616,16 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                 if (stream.isOutputReady()) {
                     stream.produceOutput();
                 }
+                if (cancellableDependency != null) {
+                    cancellableDependency.setDependency(new Cancellable() {
 
+                        @Override
+                        public boolean cancel() {
+                            return stream.abort();
+                        }
+
+                    });
+                }
                 if (!outputQueue.isEmpty()) {
                     return;
                 }
@@ -988,12 +1001,12 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                 }
             }
         }
-        if (stream.isResetLocally()) {
-            return;
-        }
         if (stream.isRemoteClosed()) {
             throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
         }
+        if (stream.isLocalReset()) {
+            return;
+        }
         if (frame.isFlagSet(FrameFlag.END_STREAM)) {
             stream.setRemoteEndStream();
         }
@@ -1041,14 +1054,14 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
             if (streamListener != null) {
                 streamListener.onHeaderInput(this, streamId, headers);
             }
-            if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
-                throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
+            if (stream.isRemoteClosed()) {
+                throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
             }
-            if (stream.isResetLocally()) {
+            if (stream.isLocalReset()) {
                 return;
             }
-            if (stream.isRemoteClosed()) {
-                throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
+            if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
+                throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
             }
             if (frame.isFlagSet(FrameFlag.END_STREAM)) {
                 stream.setRemoteEndStream();
@@ -1074,12 +1087,12 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
             if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
                 throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
             }
-            if (stream.isResetLocally()) {
-                return;
-            }
             if (stream.isRemoteClosed()) {
                 throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
             }
+            if (stream.isLocalReset()) {
+                return;
+            }
             if (continuation.endStream) {
                 stream.setRemoteEndStream();
             }
@@ -1397,43 +1410,53 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
             remoteEndStream = true;
         }
 
-        void setLocalEndStream() {
-            localEndStream = true;
-        }
-
         boolean isLocalClosed() {
             return localEndStream;
         }
 
-        boolean isClosed() {
-            return remoteEndStream && localEndStream;
+        void setLocalEndStream() {
+            localEndStream = true;
         }
 
-        void close() {
-            localEndStream = true;
-            remoteEndStream = true;
+        boolean isLocalReset() {
+            return deadline > 0;
         }
 
-        void localReset(final int code) throws IOException {
-            deadline = System.currentTimeMillis() + LINGER_TIME;
-            close();
-            if (!idle) {
-                outputLock.lock();
-                try {
+        boolean isResetDeadline() {
+            final long l = deadline;
+            return l > 0 && l < System.currentTimeMillis();
+        }
+
+        boolean localReset(final int code) throws IOException {
+            outputLock.lock();
+            try {
+                if (localEndStream) {
+                    return false;
+                }
+                localEndStream = true;
+                deadline = System.currentTimeMillis() + LINGER_TIME;
+                if (!idle) {
                     final RawFrame resetStream = frameFactory.createResetStream(id, code);
                     commitFrameInternal(resetStream);
-                } finally {
-                    outputLock.unlock();
+                    return true;
                 }
+                return false;
+            } finally {
+                outputLock.unlock();
             }
         }
 
-        void localReset(final H2Error error) throws IOException {
-            localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
+        boolean localReset(final H2Error error) throws IOException {
+            return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
         }
 
-        long getDeadline() {
-            return deadline;
+        @Override
+        public boolean cancel() {
+            try {
+                return localReset(H2Error.CANCEL);
+            } catch (final IOException ignore) {
+                return false;
+            }
         }
 
         @Override
@@ -1455,8 +1478,6 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
         private final Http2StreamHandler handler;
         private final boolean remoteInitiated;
 
-        private volatile boolean resetLocally;
-
         private Http2Stream(
                 final Http2StreamChannelImpl channel,
                 final Http2StreamHandler handler,
@@ -1470,7 +1491,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
             return channel.getId();
         }
 
-        public boolean isRemoteInitiated() {
+        boolean isRemoteInitiated() {
             return remoteInitiated;
         }
 
@@ -1483,7 +1504,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
         }
 
         boolean isTerminated() {
-            return channel.isClosed() && channel.getDeadline() < System.currentTimeMillis();
+            return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
         }
 
         boolean isRemoteClosed() {
@@ -1494,6 +1515,10 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
             return channel.isLocalClosed();
         }
 
+        boolean isLocalReset() {
+            return channel.isLocalReset();
+        }
+
         void setRemoteEndStream() {
             channel.setRemoteEndStream();
         }
@@ -1542,12 +1567,12 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
         }
 
         void reset(final Exception cause) {
-            channel.close();
+            channel.setRemoteEndStream();
+            channel.setLocalEndStream();
             handler.failed(cause);
         }
 
         void localReset(final Exception cause, final int code) throws IOException {
-            resetLocally = true;
             channel.localReset(code);
             handler.failed(cause);
         }
@@ -1560,13 +1585,14 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
             localReset(ex, ex.getCode());
         }
 
-        public boolean isResetLocally() {
-            return resetLocally;
+        void cancel() {
+            reset(new CancellationException("HTTP/2 message exchange cancelled"));
         }
 
-        void cancel() {
-            channel.close();
-            handler.cancel();
+        boolean abort() {
+            final boolean cancelled = channel.cancel();
+            handler.failed(new CancellationException("HTTP/2 message exchange cancelled"));
+            return cancelled;
         }
 
         void releaseResources() {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
index b3467c0..49a705c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
@@ -62,6 +62,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
     private final AsyncClientExchangeHandler exchangeHandler;
     private final HttpCoreContext context;
     private final AtomicBoolean requestCommitted;
+    private final AtomicBoolean failed;
     private final AtomicBoolean done;
 
     private volatile MessageState requestState;
@@ -104,6 +105,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
         this.exchangeHandler = exchangeHandler;
         this.context = context;
         this.requestCommitted = new AtomicBoolean(false);
+        this.failed = new AtomicBoolean(false);
         this.done = new AtomicBoolean(false);
         this.requestState = MessageState.HEADERS;
         this.responseState = MessageState.HEADERS;
@@ -238,16 +240,11 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
     @Override
     public void failed(final Exception cause) {
         try {
-            exchangeHandler.failed(cause);
-        } finally {
-            releaseResources();
-        }
-    }
-
-    @Override
-    public void cancel() {
-        try {
-            exchangeHandler.cancel();
+            if (failed.compareAndSet(false, true)) {
+                if (exchangeHandler != null) {
+                    exchangeHandler.failed(cause);
+                }
+            }
         } finally {
             releaseResources();
         }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
index 6b52010..cdbd344 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
@@ -59,6 +59,7 @@ class ClientPushHttp2StreamHandler implements Http2StreamHandler {
     private final BasicHttpConnectionMetrics connMetrics;
     private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
     private final HttpCoreContext context;
+    private final AtomicBoolean failed;
     private final AtomicBoolean done;
 
     private volatile HttpRequest request;
@@ -77,6 +78,7 @@ class ClientPushHttp2StreamHandler implements Http2StreamHandler {
         this.connMetrics = connMetrics;
         this.pushHandlerFactory = pushHandlerFactory;
         this.context = context;
+        this.failed = new AtomicBoolean(false);
         this.done = new AtomicBoolean(false);
         this.requestState = MessageState.HEADERS;
         this.responseState = MessageState.HEADERS;
@@ -171,16 +173,15 @@ class ClientPushHttp2StreamHandler implements Http2StreamHandler {
 
     @Override
     public void failed(final Exception cause) {
-        final AsyncPushConsumer localHandler = exchangeHandler;
-        if (localHandler != null) {
-            localHandler.failed(cause);
+        try {
+            if (failed.compareAndSet(false, true)) {
+                if (exchangeHandler != null) {
+                    exchangeHandler.failed(cause);
+                }
+            }
+        } finally {
+            releaseResources();
         }
-        releaseResources();
-    }
-
-    @Override
-    public void cancel() {
-        releaseResources();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java
index 2d58bda..b352dc9 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java
@@ -30,13 +30,14 @@ package org.apache.hc.core5.http2.impl.nio;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
 import org.apache.hc.core5.http.nio.AsyncPushProducer;
 import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
 
-interface Http2StreamChannel extends DataStreamChannel, CapacityChannel {
+interface Http2StreamChannel extends DataStreamChannel, CapacityChannel, Cancellable {
 
     void submit(List<Header> headers, boolean endStream) throws HttpException, IOException;
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
index 32da53d..589728c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
@@ -50,6 +50,4 @@ interface Http2StreamHandler extends ResourceHolder {
 
     void failed(Exception cause);
 
-    void cancel();
-
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
index 0aaf570..38feae6 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
@@ -68,6 +68,7 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
     private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
     private final HttpCoreContext context;
     private final AtomicBoolean responseCommitted;
+    private final AtomicBoolean failed;
     private final AtomicBoolean done;
 
     private volatile AsyncServerExchangeHandler exchangeHandler;
@@ -112,6 +113,7 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
         this.exchangeHandlerFactory = exchangeHandlerFactory;
         this.context = context;
         this.responseCommitted = new AtomicBoolean(false);
+        this.failed = new AtomicBoolean(false);
         this.done = new AtomicBoolean(false);
         this.requestState = MessageState.HEADERS;
         this.responseState = MessageState.IDLE;
@@ -285,8 +287,10 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
     @Override
     public void failed(final Exception cause) {
         try {
-            if (exchangeHandler != null) {
-                exchangeHandler.failed(cause);
+            if (failed.compareAndSet(false, true)) {
+                if (exchangeHandler != null) {
+                    exchangeHandler.failed(cause);
+                }
             }
         } finally {
             releaseResources();
@@ -294,11 +298,6 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
     }
 
     @Override
-    public void cancel() {
-        releaseResources();
-    }
-
-    @Override
     public void releaseResources() {
         if (done.compareAndSet(false, true)) {
             requestState = MessageState.COMPLETE;

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
index a8c3ffd..1e285d3 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
@@ -60,6 +60,7 @@ class ServerPushHttp2StreamHandler implements Http2StreamHandler {
     private final AsyncPushProducer pushProducer;
     private final HttpCoreContext context;
     private final AtomicBoolean responseCommitted;
+    private final AtomicBoolean failed;
     private final AtomicBoolean done;
 
     private volatile MessageState requestState;
@@ -102,6 +103,7 @@ class ServerPushHttp2StreamHandler implements Http2StreamHandler {
         this.pushProducer = pushProducer;
         this.context = context;
         this.responseCommitted = new AtomicBoolean(false);
+        this.failed = new AtomicBoolean(false);
         this.done = new AtomicBoolean(false);
         this.requestState = MessageState.COMPLETE;
         this.responseState = MessageState.IDLE;
@@ -220,18 +222,15 @@ class ServerPushHttp2StreamHandler implements Http2StreamHandler {
     @Override
     public void failed(final Exception cause) {
         try {
-            pushProducer.failed(cause);
+            if (failed.compareAndSet(false, true)) {
+                pushProducer.failed(cause);
+            }
         } finally {
             releaseResources();
         }
     }
 
     @Override
-    public void cancel() {
-        releaseResources();
-    }
-
-    @Override
     public void releaseResources() {
         if (done.compareAndSet(false, true)) {
             requestState = MessageState.COMPLETE;

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java
index 7328ea8..f4006ab 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java
@@ -27,6 +27,7 @@
 
 package org.apache.hc.core5.http.nio.command;
 
+import org.apache.hc.core5.concurrent.CancellableDependency;
 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.reactor.Command;
@@ -40,21 +41,36 @@ import org.apache.hc.core5.util.Args;
 public final class ExecutionCommand implements Command {
 
     private final AsyncClientExchangeHandler exchangeHandler;
+    private final CancellableDependency cancellableDependency;
     private final HttpContext context;
 
-    public ExecutionCommand(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
+    public ExecutionCommand(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final CancellableDependency cancellableDependency,
+            final HttpContext context) {
         this.exchangeHandler = Args.notNull(exchangeHandler, "Handler");
+        this.cancellableDependency = cancellableDependency;
         this.context = context;
     }
 
-    public HttpContext getContext() {
-        return context;
+    public ExecutionCommand(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final HttpContext context) {
+        this(exchangeHandler, null, context);
     }
 
     public AsyncClientExchangeHandler getExchangeHandler() {
         return exchangeHandler;
     }
 
+    public CancellableDependency getCancellableDependency() {
+        return cancellableDependency;
+    }
+
+    public HttpContext getContext() {
+        return context;
+    }
+
     @Override
     public boolean cancel() {
         exchangeHandler.cancel();