You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/11/23 18:31:47 UTC

[3/3] httpcomponents-core git commit: HTTP/2 multiplexed requester to support cancellation of individual message exchanges without termination of the underlying I/O session

HTTP/2 multiplexed requester to support cancellation of individual message exchanges without termination of the underlying I/O session


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/63563231
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/63563231
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/63563231

Branch: refs/heads/master
Commit: 6356323147ca4a62e1ad89f4fc47c0d5b4ad5770
Parents: 0167512
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Mon Nov 20 14:56:44 2017 +0100
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Thu Nov 23 15:20:44 2017 +0100

----------------------------------------------------------------------
 .../nio/bootstrap/CancellableExecution.java     | 74 ++++++++++++++++++++
 .../bootstrap/Http2MultiplexingRequester.java   | 25 +++++--
 ...Http2ServerAndMultiplexingRequesterTest.java | 37 ++++++++++
 3 files changed, 131 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
new file mode 100644
index 0000000..e96a036
--- /dev/null
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/CancellableExecution.java
@@ -0,0 +1,74 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http2.impl.nio.bootstrap;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
+
+final class CancellableExecution implements CancellableDependency, Cancellable {
+    // Cancellable handle: cancel() flips a one-way flag and forwards to the currently attached dependency, if any.
+    private final AtomicBoolean cancelled; // set to true by the first cancel() call; never reset
+    private final AtomicReference<Cancellable> dependencyRef; // current cancellation target; null when none is attached
+    // Creates an instance that is not cancelled and has no dependency attached.
+    CancellableExecution() {
+        this.cancelled = new AtomicBoolean(false);
+        this.dependencyRef = new AtomicReference<>(null);
+    }
+    // Attaches the cancellable that cancel() should forward to, replacing any previously attached one.
+    @Override
+    public void setDependency(final Cancellable cancellable) {
+        dependencyRef.set(cancellable); // overwrites the previous dependency without cancelling it
+        if (cancelled.get()) { // checked after the store, so a dependency attached to a cancelled instance is cancelled too
+            final Cancellable dependency = dependencyRef.getAndSet(null); // atomic detach: only one caller obtains the non-null value
+            if (dependency != null) {
+                dependency.cancel();
+            }
+        }
+    }
+    // Reports whether cancel() has been called on this instance.
+    @Override
+    public boolean isCancelled() {
+        return cancelled.get();
+    }
+    // Cancels once: returns true for the call that flips the flag, false for every later call.
+    @Override
+    public boolean cancel() {
+        if (cancelled.compareAndSet(false, true)) { // only the first caller wins the false -> true transition
+            final Cancellable dependency = dependencyRef.getAndSet(null); // detach whatever is attached at this moment
+            if (dependency != null) {
+                dependency.cancel();
+            }
+            return true;
+        } else {
+            return false; // already cancelled; the dependency is not touched again
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
index 85156e7..8fab172 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java
@@ -34,7 +34,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
 
-import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.Cancellable;
+import org.apache.hc.core5.concurrent.CancellableDependency;
+import org.apache.hc.core5.concurrent.ComplexFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.function.Decorator;
@@ -112,13 +114,26 @@ public class Http2MultiplexingRequester extends AsyncRequester{
         connPool.setValidateAfterInactivity(timeValue);
     }
 
-    public void execute(
+    public Cancellable execute( // return type changes from void to a Cancellable for the submitted exchange
             final AsyncClientExchangeHandler exchangeHandler,
             final Timeout timeout,
             final HttpContext context) {
         Args.notNull(exchangeHandler, "Exchange handler");
         Args.notNull(timeout, "Timeout");
         Args.notNull(context, "Context");
+        final CancellableExecution cancellableExecution = new CancellableExecution(); // fresh cancellation handle per call
+        execute(exchangeHandler, cancellableExecution, timeout, context); // private overload receives it as the CancellableDependency
+        return cancellableExecution; // same object is handed back so the caller can invoke cancel() on it
+    }
+
+    private void execute(
+            final AsyncClientExchangeHandler exchangeHandler,
+            final CancellableDependency cancellableDependency,
+            final Timeout timeout,
+            final HttpContext context) {
+        Args.notNull(exchangeHandler, "Exchange handler");
+        Args.notNull(timeout, "Timeout");
+        Args.notNull(context, "Context");
         try {
             exchangeHandler.produceRequest(new RequestChannel() {
 
@@ -194,7 +209,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
                                     exchangeHandler.failed(cause);
                                 }
 
-                            }, context));
+                            }, cancellableDependency, context));
                         }
 
                         @Override
@@ -226,7 +241,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
         Args.notNull(requestProducer, "Request producer");
         Args.notNull(responseConsumer, "Response consumer");
         Args.notNull(timeout, "Timeout");
-        final BasicFuture<T> future = new BasicFuture<>(callback);
+        final ComplexFuture<T> future = new ComplexFuture<>(callback);
         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
 
             @Override
@@ -245,7 +260,7 @@ public class Http2MultiplexingRequester extends AsyncRequester{
             }
 
         });
-        execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create());
+        execute(exchangeHandler, future, timeout, context != null ? context : HttpCoreContext.create());
         return future;
     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/63563231/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
index d5d46c4..791fd13 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http2ServerAndMultiplexingRequesterTest.java
@@ -34,8 +34,11 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 
+import org.apache.hc.core5.concurrent.Cancellable;
 import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.HttpHost;
@@ -50,6 +53,8 @@ import org.apache.hc.core5.http.nio.BasicResponseConsumer;
 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
 import org.apache.hc.core5.http.nio.ssl.SecurePortStrategy;
+import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.Http2MultiplexingRequester;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.Http2MultiplexingRequesterBootstrap;
@@ -334,4 +339,36 @@ public class Http2ServerAndMultiplexingRequesterTest {
         Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
     }
 
+    @Test
+    public void testMultiplexedRequestCancellation() throws Exception {
+        server.start();
+        final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0)); // port 0: let the system pick a free port
+        final ListenerEndpoint listener = future.get();
+        final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
+        requester.start();
+        // Submit reqNo exchanges, cancel each shortly after submission, and expect every entity consumer to release its resources.
+        final int reqNo = 20;
+        // One latch count per exchange; counted down from the entity consumer's releaseResources() override below.
+        final CountDownLatch countDownLatch = new CountDownLatch(reqNo);
+        final Random random = new Random(); // unseeded, so the delays differ from run to run
+        final HttpHost target = new HttpHost("localhost", address.getPort(), scheme.id);
+        for (int i = 0; i < reqNo; i++) {
+            final Cancellable cancellable = requester.execute(
+                    new BasicClientExchangeHandler<>(new BasicRequestProducer("POST", target, "/stuff",
+                            new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
+                            new BasicResponseConsumer<>(new StringAsyncEntityConsumer() {
+
+                                @Override
+                                public void releaseResources() {
+                                    super.releaseResources();
+                                    countDownLatch.countDown(); // records that this exchange's consumer was released
+                                }
+                            }), null), TIMEOUT, HttpCoreContext.create());
+            Thread.sleep(random.nextInt(10)); // 0-9 ms pause; presumably so cancel() lands at varying stages of the exchange
+            cancellable.cancel();
+        }
+        Assert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true)); // fails if the latch does not reach zero within TIMEOUT
+        Thread.sleep(1500); // NOTE(review): fixed pause after the assertion; purpose not evident here - presumably lets pending I/O settle, confirm
+    }
+
 }