You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/11/23 18:31:47 UTC
[3/3] httpcomponents-core git commit: HTTP/2 multiplexed requester to
support cancellation of individual message exchanges without termination of
the underlying I/O session
HTTP/2 multiplexed requester to support cancellation of individual message exchanges without termination of the underlying I/O session
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/63563231
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/63563231
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/63563231
Branch: refs/heads/master
Commit: 6356323147ca4a62e1ad89f4fc47c0d5b4ad5770
Parents: 0167512
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Mon Nov 20 14:56:44 2017 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Nov 23 15:20:44 2017 +0100
----------------------------------------------------------------------
.../nio/bootstrap/CancellableExecution.java | 74 ++++++++++++++++++++
.../bootstrap/Http2MultiplexingRequester.java | 25 +++++--
...Http2ServerAndMultiplexingRequesterTest.java | 37 ++++++++++
3 files changed, 131 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
new file mode 100644
index 0000000..e96a036
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
@@ -0,0 +1,74 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.impl.nio.bootstrap;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
+
+final class CancellableExecution implements CancellableDependency, Cancellable {
+ // Cancellation handle: cancel() is forwarded to the currently registered dependency, if any.
+ private final AtomicBoolean cancelled; // flipped from false to true only by the first successful cancel(); never reset
+ private final AtomicReference<Cancellable> dependencyRef; // dependency awaiting cancellation; nulled via getAndSet when taken
+
+ CancellableExecution() {
+ this.cancelled = new AtomicBoolean(false);
+ this.dependencyRef = new AtomicReference<>(null);
+ }
+ // Registers the dependency; if this execution is already cancelled, the dependency is cancelled immediately.
+ @Override
+ public void setDependency(final Cancellable cancellable) {
+ dependencyRef.set(cancellable); // store first, then check the flag, so a concurrent cancel() or the check below picks it up
+ if (cancelled.get()) {
+ final Cancellable dependency = dependencyRef.getAndSet(null); // atomic take: only one caller obtains the non-null reference
+ if (dependency != null) {
+ dependency.cancel();
+ }
+ }
+ }
+ // Reports whether cancel() has succeeded on this execution.
+ @Override
+ public boolean isCancelled() {
+ return cancelled.get();
+ }
+ // Returns true for the single call that flips the flag (cancelling any registered dependency), false afterwards.
+ @Override
+ public boolean cancel() {
+ if (cancelled.compareAndSet(false, true)) {
+ final Cancellable dependency = dependencyRef.getAndSet(null); // may be null if no dependency has been registered yet
+ if (dependency != null) {
+ dependency.cancel();
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
index 85156e7..8fab172 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
@@ -34,7 +34,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
-import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
+import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
@@ -112,13 +114,26 @@ public class Http2MultiplexingRequester extends AsyncRequester{
connPool.setValidateAfterInactivity(timeValue);
}
- public void execute(
+ public Cancellable execute(
final AsyncClientExchangeHandler exchangeHandler,
final Timeout timeout,
final HttpContext context) {
Args.notNull(exchangeHandler, "Exchange handler");
Args.notNull(timeout, "Timeout");
Args.notNull(context, "Context");
+ final CancellableExecution cancellableExecution = new CancellableExecution();
+ execute(exchangeHandler, cancellableExecution, timeout, context);
+ return cancellableExecution;
+ }
+
+ private void execute(
+ final AsyncClientExchangeHandler exchangeHandler,
+ final CancellableDependency cancellableDependency,
+ final Timeout timeout,
+ final HttpContext context) {
+ Args.notNull(exchangeHandler, "Exchange handler");
+ Args.notNull(timeout, "Timeout");
+ Args.notNull(context, "Context");
try {
exchangeHandler.produceRequest(new RequestChannel() {
@@ -194,7 +209,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
exchangeHandler.failed(cause);
}
- }, context));
+ }, cancellableDependency, context));
}
@Override
@@ -226,7 +241,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
Args.notNull(requestProducer, "Request producer");
Args.notNull(responseConsumer, "Response consumer");
Args.notNull(timeout, "Timeout");
- final BasicFuture<T> future = new BasicFuture<>(callback);
+ final ComplexFuture<T> future = new ComplexFuture<>(callback);
final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
@Override
@@ -245,7 +260,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
}
});
- execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create());
+ execute(exchangeHandler, future, timeout, context != null ? context : HttpCoreContext.create());
return future;
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
index d5d46c4..791fd13 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
@@ -34,8 +34,11 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
+import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
@@ -50,6 +53,8 @@ import org.apache.hc.core5.http.nio.BasicResponseConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
import org.apache.hc.core5.http.nio.ssl.SecurePortStrategy;
+import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
import org.apache.hc.core5.http2.impl.nio.bootstrap.Http2MultiplexingRequester;
import org.apache.hc.core5.http2.impl.nio.bootstrap.Http2MultiplexingRequesterBootstrap;
@@ -334,4 +339,36 @@ public class Http2ServerAndMultiplexingRequesterTest {
Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
}
+ @Test
+ public void testMultiplexedRequestCancellation() throws Exception {
+ server.start();
+ final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
+ final ListenerEndpoint listener = future.get();
+ final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+ requester.start();
+ // Number of exchanges that are started and then cancelled.
+ final int reqNo = 20;
+ // One count per exchange; counted down from releaseResources() of each entity consumer below.
+ final CountDownLatch countDownLatch = new CountDownLatch(reqNo);
+ final Random random = new Random();
+ final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id);
+ for (int i = 0; i < reqNo; i++) {
+ final Cancellable cancellable = requester.execute(
+ new BasicClientExchangeHandler<>(new BasicRequestProducer("POST", target, "/stuff",
+ new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer() {
+ // Count down the latch whenever this consumer releases its resources.
+ @Override
+ public void releaseResources() {
+ super.releaseResources();
+ countDownLatch.countDown();
+ }
+ }), null), TIMEOUT, HttpCoreContext.create());
+ Thread.sleep(random.nextInt(10)); // wait a random 0-9 ms so cancellation lands at varying points of the exchange
+ cancellable.cancel();
+ }
+ Assert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true)); // all consumers released within TIMEOUT
+ Thread.sleep(1500); // NOTE(review): fixed 1.5 s pause; purpose not evident here - presumably lets pending I/O settle, confirm
+ }
+
}