You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@hc.apache.org by ol...@apache.org on 2017/11/23 18:31:45 UTC
[1/3] httpcomponents-core git commit: Bugfix: corrected handling of GOAWAY frames by HTTP/2 stream multiplexer
Repository: httpcomponents-core
Updated Branches:
refs/heads/master 1c3a57129 -> 635632314
Bugfix: corrected handling of GOAWAY frames by HTTP/2 stream multiplexer
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/d36209cf
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/d36209cf
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/d36209cf
Branch: refs/heads/master
Commit: d36209cf4a8fd50f3a4eb6d7b4ed62abde702cf8
Parents: 1c3a571
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Wed Nov 22 16:55:07 2017 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Nov 23 15:20:05 2017 +0100
----------------------------------------------------------------------
.../nio/AbstractHttp2StreamMultiplexer.java | 42 +++++++++++---------
1 file changed, 24 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/d36209cf/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
index 32525ba..e2690b9 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
@@ -579,9 +579,9 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
connState = ConnectionHandshake.SHUTDOWN;
} else {
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
- connState = ConnectionHandshake.GRACEFUL_SHUTDOWN;
final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
commitFrame(goAway);
+ connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
}
}
break;
@@ -636,12 +636,6 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
break;
}
}
- for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
- final Map.Entry<Integer, Http2Stream> entry = it.next();
- final Http2Stream stream = entry.getValue();
- stream.reset(cause);
- }
- streamMap.clear();
for (;;) {
final Command command = ioSession.getCommand();
if (command != null) {
@@ -657,17 +651,28 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
break;
}
}
+ for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
+ final Map.Entry<Integer, Http2Stream> entry = it.next();
+ final Http2Stream stream = entry.getValue();
+ if (stream.isLocalClosed() && (stream.isRemoteClosed() || stream.isLocalReset())) {
+ stream.reset(cause);
+ }
+ stream.releaseResources();
+ }
+ streamMap.clear();
if (!(cause instanceof ConnectionClosedException)) {
- final H2Error errorCode;
- if (cause instanceof H2ConnectionException) {
- errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
- } else if (cause instanceof ProtocolException){
- errorCode = H2Error.PROTOCOL_ERROR;
- } else {
- errorCode = H2Error.INTERNAL_ERROR;
+ if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
+ final H2Error errorCode;
+ if (cause instanceof H2ConnectionException) {
+ errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
+ } else if (cause instanceof ProtocolException){
+ errorCode = H2Error.PROTOCOL_ERROR;
+ } else {
+ errorCode = H2Error.INTERNAL_ERROR;
+ }
+ final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
+ commitFrame(goAway);
}
- final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
- commitFrame(goAway);
}
connState = ConnectionHandshake.SHUTDOWN;
} catch (final IOException ignore) {
@@ -938,7 +943,6 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
final int errorCode = payload.getInt();
if (errorCode == H2Error.NO_ERROR.getCode()) {
if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
- connState = ConnectionHandshake.GRACEFUL_SHUTDOWN;
for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<Integer, Http2Stream> entry = it.next();
final int activeStreamId = entry.getKey();
@@ -949,16 +953,18 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
}
}
}
+ connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
} else {
- connState = ConnectionHandshake.SHUTDOWN;
for (final Iterator<Map.Entry<Integer, Http2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<Integer, Http2Stream> entry = it.next();
final Http2Stream stream = entry.getValue();
stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer"));
}
streamMap.clear();
+ connState = ConnectionHandshake.SHUTDOWN;
}
}
+ ioSession.setEvent(SelectionKey.OP_WRITE);
break;
}
}
[3/3] httpcomponents-core git commit: HTTP/2 multiplexed requester to support cancellation of individual message exchanges without termination of the underlying I/O session
Posted by ol...@apache.org.
HTTP/2 multiplexed requester to support cancellation of individual message exchanges without termination of the underlying I/O session
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/63563231
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/63563231
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/63563231
Branch: refs/heads/master
Commit: 6356323147ca4a62e1ad89f4fc47c0d5b4ad5770
Parents: 0167512
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Mon Nov 20 14:56:44 2017 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Nov 23 15:20:44 2017 +0100
----------------------------------------------------------------------
.../nio/bootstrap/CancellableExecution.java | 74 ++++++++++++++++++++
.../bootstrap/Http2MultiplexingRequester.java | 25 +++++--
...Http2ServerAndMultiplexingRequesterTest.java | 37 ++++++++++
3 files changed, 131 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
new file mode 100644
index 0000000..e96a036
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
@@ -0,0 +1,74 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.impl.nio.bootstrap;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
+
+final class CancellableExecution implements CancellableDependency, Cancellable {
+
+ private final AtomicBoolean cancelled;
+ private final AtomicReference<Cancellable> dependencyRef;
+
+ CancellableExecution() {
+ this.cancelled = new AtomicBoolean(false);
+ this.dependencyRef = new AtomicReference<>(null);
+ }
+
+ @Override
+ public void setDependency(final Cancellable cancellable) {
+ dependencyRef.set(cancellable);
+ if (cancelled.get()) {
+ final Cancellable dependency = dependencyRef.getAndSet(null);
+ if (dependency != null) {
+ dependency.cancel();
+ }
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled.get();
+ }
+
+ @Override
+ public boolean cancel() {
+ if (cancelled.compareAndSet(false, true)) {
+ final Cancellable dependency = dependencyRef.getAndSet(null);
+ if (dependency != null) {
+ dependency.cancel();
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
index 85156e7..8fab172 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
@@ -34,7 +34,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
-import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
+import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
@@ -112,13 +114,26 @@ public class Http2MultiplexingRequester extends AsyncRequester{
connPool.setValidateAfterInactivity(timeValue);
}
- public void execute(
+ public Cancellable execute(
final AsyncClientExchangeHandler exchangeHandler,
final Timeout timeout,
final HttpContext context) {
Args.notNull(exchangeHandler, "Exchange handler");
Args.notNull(timeout, "Timeout");
Args.notNull(context, "Context");
+ final CancellableExecution cancellableExecution = new CancellableExecution();
+ execute(exchangeHandler, cancellableExecution, timeout, context);
+ return cancellableExecution;
+ }
+
+ private void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final CancellableDependency cancellableDependency,
+ final Timeout timeout,
+ final HttpContext context) {
+ Args.notNull(exchangeHandler, "Exchange handler");
+ Args.notNull(timeout, "Timeout");
+ Args.notNull(context, "Context");
try {
exchangeHandler.produceRequest(new RequestChannel() {
@@ -194,7 +209,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
exchangeHandler.failed(cause);
}
- }, context));
+ }, cancellableDependency, context));
}
@Override
@@ -226,7 +241,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
Args.notNull(requestProducer, "Request producer");
Args.notNull(responseConsumer, "Response consumer");
Args.notNull(timeout, "Timeout");
- final BasicFuture<T> future = new BasicFuture<>(callback);
+ final ComplexFuture<T> future = new ComplexFuture<>(callback);
final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
@Override
@@ -245,7 +260,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
}
});
- execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create());
+ execute(exchangeHandler, future, timeout, context != null ? context : HttpCoreContext.create());
return future;
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
index d5d46c4..791fd13 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
@@ -34,8 +34,11 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
+import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
@@ -50,6 +53,8 @@ import org.apache.hc.core5.http.nio.BasicResponseConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
import org.apache.hc.core5.http.nio.ssl.SecurePortStrategy;
+import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
import org.apache.hc.core5.http2.impl.nio.bootstrap.Http2MultiplexingRequester;
import org.apache.hc.core5.http2.impl.nio.bootstrap.Http2MultiplexingRequesterBootstrap;
@@ -334,4 +339,36 @@ public class Http2ServerAndMultiplexingRequesterTest {
Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
}
+ @Test
+ public void testMultiplexedRequestCancellation() throws Exception {
+ server.start();
+ final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
+ final ListenerEndpoint listener = future.get();
+ final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+ requester.start();
+
+ final int reqNo = 20;
+
+ final CountDownLatch countDownLatch = new CountDownLatch(reqNo);
+ final Random random = new Random();
+ final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id);
+ for (int i = 0; i < reqNo; i++) {
+ final Cancellable cancellable = requester.execute(
+ new BasicClientExchangeHandler<>(new BasicRequestProducer("POST", target, "/stuff",
+ new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer() {
+
+ @Override
+ public void releaseResources() {
+ super.releaseResources();
+ countDownLatch.countDown();
+ }
+ }), null), TIMEOUT, HttpCoreContext.create());
+ Thread.sleep(random.nextInt(10));
+ cancellable.cancel();
+ }
+ Assert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
+ Thread.sleep(1500);
+ }
+
}
[2/3] httpcomponents-core git commit: Revised stream reset logic; make HTTP/2 client stream cancellable by the caller
Posted by ol...@apache.org.
* Revised stream reset logic
* Make HTTP/2 client stream cancellable by the caller
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/01675126
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/01675126
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/01675126
Branch: refs/heads/master
Commit: 0167512689b890a1ab28185ea64a99c1f052b628
Parents: d36209c
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Mon Nov 20 13:53:26 2017 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Nov 23 15:20:26 2017 +0100
----------------------------------------------------------------------
.../nio/AbstractHttp2StreamMultiplexer.java | 114 ++++++++++++-------
.../impl/nio/ClientHttp2StreamHandler.java | 17 ++-
.../impl/nio/ClientPushHttp2StreamHandler.java | 19 ++--
.../http2/impl/nio/Http2StreamChannel.java | 3 +-
.../http2/impl/nio/Http2StreamHandler.java | 2 -
.../impl/nio/ServerHttp2StreamHandler.java | 13 +--
.../impl/nio/ServerPushHttp2StreamHandler.java | 11 +-
.../http/nio/command/ExecutionCommand.java | 22 +++-
8 files changed, 119 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
index e2690b9..c83274c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
@@ -36,6 +36,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -45,6 +46,8 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLSession;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.Header;
@@ -597,10 +600,11 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
localConfig.getInitialWindowSize(),
remoteConfig.getInitialWindowSize());
final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
+ final CancellableDependency cancellableDependency = executionCommand.getCancellableDependency();
final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
- final Http2StreamHandler streamHandler = new ClientHttp2StreamHandler(
+ final ClientHttp2StreamHandler streamHandler = new ClientHttp2StreamHandler(
channel,
httpProcessor,
connMetrics,
@@ -612,7 +616,16 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
if (stream.isOutputReady()) {
stream.produceOutput();
}
+ if (cancellableDependency != null) {
+ cancellableDependency.setDependency(new Cancellable() {
+ @Override
+ public boolean cancel() {
+ return stream.abort();
+ }
+
+ });
+ }
if (!outputQueue.isEmpty()) {
return;
}
@@ -988,12 +1001,12 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
}
}
}
- if (stream.isResetLocally()) {
- return;
- }
if (stream.isRemoteClosed()) {
throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
}
+ if (stream.isLocalReset()) {
+ return;
+ }
if (frame.isFlagSet(FrameFlag.END_STREAM)) {
stream.setRemoteEndStream();
}
@@ -1041,14 +1054,14 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
if (streamListener != null) {
streamListener.onHeaderInput(this, streamId, headers);
}
- if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
- throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
+ if (stream.isRemoteClosed()) {
+ throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
}
- if (stream.isResetLocally()) {
+ if (stream.isLocalReset()) {
return;
}
- if (stream.isRemoteClosed()) {
- throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
+ if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
+ throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
}
if (frame.isFlagSet(FrameFlag.END_STREAM)) {
stream.setRemoteEndStream();
@@ -1074,12 +1087,12 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
if (connState == ConnectionHandshake.GRACEFUL_SHUTDOWN) {
throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, "Stream refused");
}
- if (stream.isResetLocally()) {
- return;
- }
if (stream.isRemoteClosed()) {
throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
}
+ if (stream.isLocalReset()) {
+ return;
+ }
if (continuation.endStream) {
stream.setRemoteEndStream();
}
@@ -1397,43 +1410,53 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
remoteEndStream = true;
}
- void setLocalEndStream() {
- localEndStream = true;
- }
-
boolean isLocalClosed() {
return localEndStream;
}
- boolean isClosed() {
- return remoteEndStream && localEndStream;
+ void setLocalEndStream() {
+ localEndStream = true;
}
- void close() {
- localEndStream = true;
- remoteEndStream = true;
+ boolean isLocalReset() {
+ return deadline > 0;
}
- void localReset(final int code) throws IOException {
- deadline = System.currentTimeMillis() + LINGER_TIME;
- close();
- if (!idle) {
- outputLock.lock();
- try {
+ boolean isResetDeadline() {
+ final long l = deadline;
+ return l > 0 && l < System.currentTimeMillis();
+ }
+
+ boolean localReset(final int code) throws IOException {
+ outputLock.lock();
+ try {
+ if (localEndStream) {
+ return false;
+ }
+ localEndStream = true;
+ deadline = System.currentTimeMillis() + LINGER_TIME;
+ if (!idle) {
final RawFrame resetStream = frameFactory.createResetStream(id, code);
commitFrameInternal(resetStream);
- } finally {
- outputLock.unlock();
+ return true;
}
+ return false;
+ } finally {
+ outputLock.unlock();
}
}
- void localReset(final H2Error error) throws IOException {
- localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
+ boolean localReset(final H2Error error) throws IOException {
+ return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
}
- long getDeadline() {
- return deadline;
+ @Override
+ public boolean cancel() {
+ try {
+ return localReset(H2Error.CANCEL);
+ } catch (final IOException ignore) {
+ return false;
+ }
}
@Override
@@ -1455,8 +1478,6 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
private final Http2StreamHandler handler;
private final boolean remoteInitiated;
- private volatile boolean resetLocally;
-
private Http2Stream(
final Http2StreamChannelImpl channel,
final Http2StreamHandler handler,
@@ -1470,7 +1491,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
return channel.getId();
}
- public boolean isRemoteInitiated() {
+ boolean isRemoteInitiated() {
return remoteInitiated;
}
@@ -1483,7 +1504,7 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
}
boolean isTerminated() {
- return channel.isClosed() && channel.getDeadline() < System.currentTimeMillis();
+ return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
}
boolean isRemoteClosed() {
@@ -1494,6 +1515,10 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
return channel.isLocalClosed();
}
+ boolean isLocalReset() {
+ return channel.isLocalReset();
+ }
+
void setRemoteEndStream() {
channel.setRemoteEndStream();
}
@@ -1542,12 +1567,12 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
}
void reset(final Exception cause) {
- channel.close();
+ channel.setRemoteEndStream();
+ channel.setLocalEndStream();
handler.failed(cause);
}
void localReset(final Exception cause, final int code) throws IOException {
- resetLocally = true;
channel.localReset(code);
handler.failed(cause);
}
@@ -1560,13 +1585,14 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
localReset(ex, ex.getCode());
}
- public boolean isResetLocally() {
- return resetLocally;
+ void cancel() {
+ reset(new CancellationException("HTTP/2 message exchange cancelled"));
}
- void cancel() {
- channel.close();
- handler.cancel();
+ boolean abort() {
+ final boolean cancelled = channel.cancel();
+ handler.failed(new CancellationException("HTTP/2 message exchange cancelled"));
+ return cancelled;
}
void releaseResources() {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
index b3467c0..49a705c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamHandler.java
@@ -62,6 +62,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
private final AsyncClientExchangeHandler exchangeHandler;
private final HttpCoreContext context;
private final AtomicBoolean requestCommitted;
+ private final AtomicBoolean failed;
private final AtomicBoolean done;
private volatile MessageState requestState;
@@ -104,6 +105,7 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
this.exchangeHandler = exchangeHandler;
this.context = context;
this.requestCommitted = new AtomicBoolean(false);
+ this.failed = new AtomicBoolean(false);
this.done = new AtomicBoolean(false);
this.requestState = MessageState.HEADERS;
this.responseState = MessageState.HEADERS;
@@ -238,16 +240,11 @@ class ClientHttp2StreamHandler implements Http2StreamHandler {
@Override
public void failed(final Exception cause) {
try {
- exchangeHandler.failed(cause);
- } finally {
- releaseResources();
- }
- }
-
- @Override
- public void cancel() {
- try {
- exchangeHandler.cancel();
+ if (failed.compareAndSet(false, true)) {
+ if (exchangeHandler != null) {
+ exchangeHandler.failed(cause);
+ }
+ }
} finally {
releaseResources();
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
index 6b52010..cdbd344 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java
@@ -59,6 +59,7 @@ class ClientPushHttp2StreamHandler implements Http2StreamHandler {
private final BasicHttpConnectionMetrics connMetrics;
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
private final HttpCoreContext context;
+ private final AtomicBoolean failed;
private final AtomicBoolean done;
private volatile HttpRequest request;
@@ -77,6 +78,7 @@ class ClientPushHttp2StreamHandler implements Http2StreamHandler {
this.connMetrics = connMetrics;
this.pushHandlerFactory = pushHandlerFactory;
this.context = context;
+ this.failed = new AtomicBoolean(false);
this.done = new AtomicBoolean(false);
this.requestState = MessageState.HEADERS;
this.responseState = MessageState.HEADERS;
@@ -171,16 +173,15 @@ class ClientPushHttp2StreamHandler implements Http2StreamHandler {
@Override
public void failed(final Exception cause) {
- final AsyncPushConsumer localHandler = exchangeHandler;
- if (localHandler != null) {
- localHandler.failed(cause);
+ try {
+ if (failed.compareAndSet(false, true)) {
+ if (exchangeHandler != null) {
+ exchangeHandler.failed(cause);
+ }
+ }
+ } finally {
+ releaseResources();
}
- releaseResources();
- }
-
- @Override
- public void cancel() {
- releaseResources();
}
@Override
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java
index 2d58bda..b352dc9 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamChannel.java
@@ -30,13 +30,14 @@ package org.apache.hc.core5.http2.impl.nio;
import java.io.IOException;
import java.util.List;
+import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.nio.AsyncPushProducer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
-interface Http2StreamChannel extends DataStreamChannel, CapacityChannel {
+interface Http2StreamChannel extends DataStreamChannel, CapacityChannel, Cancellable {
void submit(List<Header> headers, boolean endStream) throws HttpException, IOException;
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
index 32da53d..589728c 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/Http2StreamHandler.java
@@ -50,6 +50,4 @@ interface Http2StreamHandler extends ResourceHolder {
void failed(Exception cause);
- void cancel();
-
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
index 0aaf570..38feae6 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamHandler.java
@@ -68,6 +68,7 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
private final HttpCoreContext context;
private final AtomicBoolean responseCommitted;
+ private final AtomicBoolean failed;
private final AtomicBoolean done;
private volatile AsyncServerExchangeHandler exchangeHandler;
@@ -112,6 +113,7 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
this.exchangeHandlerFactory = exchangeHandlerFactory;
this.context = context;
this.responseCommitted = new AtomicBoolean(false);
+ this.failed = new AtomicBoolean(false);
this.done = new AtomicBoolean(false);
this.requestState = MessageState.HEADERS;
this.responseState = MessageState.IDLE;
@@ -285,8 +287,10 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
@Override
public void failed(final Exception cause) {
try {
- if (exchangeHandler != null) {
- exchangeHandler.failed(cause);
+ if (failed.compareAndSet(false, true)) {
+ if (exchangeHandler != null) {
+ exchangeHandler.failed(cause);
+ }
}
} finally {
releaseResources();
@@ -294,11 +298,6 @@ public class ServerHttp2StreamHandler implements Http2StreamHandler {
}
@Override
- public void cancel() {
- releaseResources();
- }
-
- @Override
public void releaseResources() {
if (done.compareAndSet(false, true)) {
requestState = MessageState.COMPLETE;
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
index a8c3ffd..1e285d3 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerPushHttp2StreamHandler.java
@@ -60,6 +60,7 @@ class ServerPushHttp2StreamHandler implements Http2StreamHandler {
private final AsyncPushProducer pushProducer;
private final HttpCoreContext context;
private final AtomicBoolean responseCommitted;
+ private final AtomicBoolean failed;
private final AtomicBoolean done;
private volatile MessageState requestState;
@@ -102,6 +103,7 @@ class ServerPushHttp2StreamHandler implements Http2StreamHandler {
this.pushProducer = pushProducer;
this.context = context;
this.responseCommitted = new AtomicBoolean(false);
+ this.failed = new AtomicBoolean(false);
this.done = new AtomicBoolean(false);
this.requestState = MessageState.COMPLETE;
this.responseState = MessageState.IDLE;
@@ -220,18 +222,15 @@ class ServerPushHttp2StreamHandler implements Http2StreamHandler {
@Override
public void failed(final Exception cause) {
try {
- pushProducer.failed(cause);
+ if (failed.compareAndSet(false, true)) {
+ pushProducer.failed(cause);
+ }
} finally {
releaseResources();
}
}
@Override
- public void cancel() {
- releaseResources();
- }
-
- @Override
public void releaseResources() {
if (done.compareAndSet(false, true)) {
requestState = MessageState.COMPLETE;
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/01675126/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java
index 7328ea8..f4006ab 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java
@@ -27,6 +27,7 @@
package org.apache.hc.core5.http.nio.command;
+import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactor.Command;
@@ -40,21 +41,36 @@ import org.apache.hc.core5.util.Args;
public final class ExecutionCommand implements Command {
private final AsyncClientExchangeHandler exchangeHandler;
+ private final CancellableDependency cancellableDependency;
private final HttpContext context;
- public ExecutionCommand(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
+ public ExecutionCommand(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final CancellableDependency cancellableDependency,
+ final HttpContext context) {
this.exchangeHandler = Args.notNull(exchangeHandler, "Handler");
+ this.cancellableDependency = cancellableDependency;
this.context = context;
}
- public HttpContext getContext() {
- return context;
+ public ExecutionCommand(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final HttpContext context) {
+ this(exchangeHandler, null, context);
}
public AsyncClientExchangeHandler getExchangeHandler() {
return exchangeHandler;
}
+ public CancellableDependency getCancellableDependency() {
+ return cancellableDependency;
+ }
+
+ public HttpContext getContext() {
+ return context;
+ }
+
@Override
public boolean cancel() {
exchangeHandler.cancel();