You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/09/04 09:19:20 UTC

httpcomponents-core git commit: Added generic HTTP stream reset exception; removed dependency on HTTP/2 specific code from Reactive Streams module

Repository: httpcomponents-core
Updated Branches:
  refs/heads/master 1bae9643c -> a9918e1a0


Added generic HTTP stream reset exception; removed dependency on HTTP/2 specific code from Reactive Streams module


Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/a9918e1a
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/a9918e1a
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/a9918e1a

Branch: refs/heads/master
Commit: a9918e1a0c60c042f7fb6c4cfaf32be8bc8af5b7
Parents: 1bae964
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Fri Aug 31 22:25:13 2018 +0200
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Fri Aug 31 22:25:13 2018 +0200

----------------------------------------------------------------------
 .../hc/core5/http2/H2StreamResetException.java  |  7 +--
 .../nio/AbstractHttp2StreamMultiplexer.java     | 11 +++-
 httpcore5-reactive/pom.xml                      |  6 --
 .../hc/core5/reactive/ReactiveDataConsumer.java | 24 ++++----
 .../hc/core5/reactive/ReactiveDataProducer.java |  8 +--
 .../reactive/TestReactiveDataConsumer.java      |  5 +-
 .../reactive/TestReactiveDataProducer.java      |  4 +-
 .../testing/reactive/ReactiveClientTest.java    | 59 ++++++++++----------
 .../hc/core5/http/HttpStreamResetException.java | 46 +++++++++++++++
 9 files changed, 105 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
index b688def..2be857e 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
@@ -26,8 +26,7 @@
  */
 package org.apache.hc.core5.http2;
 
-import java.io.IOException;
-
+import org.apache.hc.core5.http.HttpStreamResetException;
 import org.apache.hc.core5.util.Args;
 
 /**
@@ -36,9 +35,7 @@ import org.apache.hc.core5.util.Args;
  *
  * @since 5.0
  */
-public class H2StreamResetException extends IOException {
-
-    private static final long serialVersionUID = 6321637486572232180L;
+public class H2StreamResetException extends HttpStreamResetException {
 
     private final int code;
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
index 316d6e0..7929ad0 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
@@ -53,6 +53,7 @@ import org.apache.hc.core5.http.EndpointDetails;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpConnection;
 import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpStreamResetException;
 import org.apache.hc.core5.http.HttpVersion;
 import org.apache.hc.core5.http.ProtocolException;
 import org.apache.hc.core5.http.ProtocolVersion;
@@ -84,13 +85,13 @@ import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
 import org.apache.hc.core5.http2.nio.AsyncPingHandler;
 import org.apache.hc.core5.http2.nio.command.PingCommand;
 import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
 import org.apache.hc.core5.reactor.Command;
 import org.apache.hc.core5.reactor.ProtocolIOSession;
 import org.apache.hc.core5.reactor.ssl.TlsDetails;
 import org.apache.hc.core5.util.Args;
 import org.apache.hc.core5.util.ByteArrayBuffer;
 import org.apache.hc.core5.util.Identifiable;
-import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
 
 abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConnection {
 
@@ -733,6 +734,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                     consumeDataFrame(frame, stream);
                 } catch (final H2StreamResetException ex) {
                     stream.localReset(ex);
+                } catch (final HttpStreamResetException ex) {
+                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
                 }
 
                 if (stream.isTerminated()) {
@@ -773,6 +776,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                     }
                 } catch (final H2StreamResetException ex) {
                     stream.localReset(ex);
+                } catch (final HttpStreamResetException ex) {
+                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
                 }
 
                 if (stream.isTerminated()) {
@@ -795,6 +800,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                     consumeContinuationFrame(frame, stream);
                 } catch (final H2StreamResetException ex) {
                     stream.localReset(ex);
+                } catch (final HttpStreamResetException ex) {
+                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
                 }
 
                 if (stream.isTerminated()) {
@@ -943,6 +950,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
                     consumePushPromiseFrame(frame, payload, promisedStream);
                 } catch (final H2StreamResetException ex) {
                     promisedStream.localReset(ex);
+                } catch (final HttpStreamResetException ex) {
+                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
                 }
             }
             break;

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/pom.xml
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/pom.xml b/httpcore5-reactive/pom.xml
index 7babffb..eab91e6 100644
--- a/httpcore5-reactive/pom.xml
+++ b/httpcore5-reactive/pom.xml
@@ -63,12 +63,6 @@
       <version>2.1.9</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.httpcomponents.core5</groupId>
-      <artifactId>httpcore5-h2</artifactId>
-      <version>5.0-beta3-SNAPSHOT</version>
-      <scope>compile</scope>
-    </dependency>
   </dependencies>
 
   <reporting>

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
index 9789a43..8027be4 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
@@ -26,27 +26,26 @@
  */
 package org.apache.hc.core5.reactive;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
 import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpStreamResetException;
 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
 import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http2.H2Error;
-import org.apache.hc.core5.http2.H2StreamResetException;
 import org.apache.hc.core5.util.Args;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * An asynchronous data consumer that supports Reactive Streams.
  *
@@ -81,8 +80,7 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuf
 
     private void throwIfCancelled() throws IOException {
         if (cancelled) {
-            throw new H2StreamResetException(H2Error.NO_ERROR,
-                "Downstream subscriber to ReactiveDataConsumer cancelled");
+            throw new HttpStreamResetException("Downstream subscriber to ReactiveDataConsumer cancelled");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
index 33d7773..f917a11 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
@@ -34,10 +34,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.core5.annotation.Contract;
 import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.HttpStreamResetException;
 import org.apache.hc.core5.http.nio.AsyncDataProducer;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.http2.H2Error;
-import org.apache.hc.core5.http2.H2StreamResetException;
 import org.apache.hc.core5.util.Args;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
@@ -133,10 +132,7 @@ final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBu
         try {
             synchronized (buffers) {
                 if (t != null) {
-                    final H2StreamResetException ex = new H2StreamResetException(H2Error.NO_ERROR,
-                        "Request publisher threw an exception");
-                    ex.initCause(t);
-                    throw ex;
+                    throw new HttpStreamResetException(t.getMessage(), t);
                 } else if (this.complete.get() && buffers.isEmpty()) {
                     channel.endStream();
                 } else {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
index bd380fe..83751a2 100644
--- a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
@@ -35,8 +35,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hc.core5.http.HttpStreamResetException;
 import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http2.H2StreamResetException;
 import org.junit.Assert;
 import org.junit.Test;
 import org.reactivestreams.Subscriber;
@@ -49,6 +49,7 @@ import io.reactivex.Single;
 import io.reactivex.functions.Consumer;
 
 public class TestReactiveDataConsumer {
+
     @Test
     public void testStreamThatEndsNormally() throws Exception {
         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
@@ -96,7 +97,7 @@ public class TestReactiveDataConsumer {
         Assert.assertSame(ex, single.blockingGet().get(0).getError());
     }
 
-    @Test(expected = H2StreamResetException.class)
+    @Test(expected = HttpStreamResetException.class)
     public void testCancellation() throws Exception {
         final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
         consumer.subscribe(new Subscriber<ByteBuffer>() {

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
index 1117439..73c6c77 100644
--- a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
@@ -29,8 +29,8 @@ package org.apache.hc.core5.reactive;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.hc.core5.http.HttpStreamResetException;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.http2.H2StreamResetException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -81,7 +81,7 @@ public class TestReactiveDataProducer {
         try {
             producer.produce(streamChannel);
             Assert.fail("Expected ProtocolException");
-        } catch (final H2StreamResetException ex) {
+        } catch (final HttpStreamResetException ex) {
             Assert.assertTrue("Expected published exception to be rethrown", ex.getCause() instanceof RuntimeException);
             Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
         }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
index 21c4be4..9b41129 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
@@ -26,20 +26,34 @@
  */
 package org.apache.hc.core5.testing.reactive;
 
-import io.reactivex.Flowable;
-import io.reactivex.Observable;
-import io.reactivex.functions.Action;
-import io.reactivex.functions.Consumer;
-import io.reactivex.functions.Function;
+import java.io.ByteArrayOutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.hc.core5.function.Supplier;
 import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStreamResetException;
 import org.apache.hc.core5.http.Message;
 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http.nio.BasicRequestProducer;
-import org.apache.hc.core5.http2.H2Error;
-import org.apache.hc.core5.http2.H2StreamResetException;
 import org.apache.hc.core5.http2.HttpVersionPolicy;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
@@ -66,25 +80,11 @@ import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import io.reactivex.Flowable;
+import io.reactivex.Observable;
+import io.reactivex.functions.Action;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
 
 @RunWith(Parameterized.class)
 public class ReactiveClientTest {
@@ -289,7 +289,7 @@ public class ReactiveClientTest {
             future.get();
             Assert.fail("Expected exception");
         } catch (final ExecutionException ex) {
-            Assert.assertTrue(ex.getCause() instanceof H2StreamResetException);
+            Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
             Assert.assertSame(exceptionThrown, ex.getCause().getCause());
         }
     }
@@ -321,8 +321,7 @@ public class ReactiveClientTest {
                     cause instanceof SocketTimeoutException);
             } else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
                 Assert.assertTrue(String.format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()),
-                    cause instanceof H2StreamResetException);
-                Assert.assertEquals(H2Error.NO_ERROR.getCode(), ((H2StreamResetException) cause).getCode());
+                    cause instanceof HttpStreamResetException);
             } else {
                 Assert.fail("Unknown HttpVersionPolicy: " + versionPolicy);
             }
@@ -380,7 +379,7 @@ public class ReactiveClientTest {
             future.get();
             Assert.fail("Expected exception");
         } catch (final ExecutionException | CancellationException ex) {
-            Assert.assertTrue(ex.getCause() instanceof H2StreamResetException);
+            Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
             Assert.assertTrue(requestPublisherWasCancelled.get());
             Assert.assertNull(requestStreamError.get());
         }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5/src/main/java/org/apache/hc/core5/http/HttpStreamResetException.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/HttpStreamResetException.java b/httpcore5/src/main/java/org/apache/hc/core5/http/HttpStreamResetException.java
new file mode 100644
index 0000000..4313397
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/HttpStreamResetException.java
@@ -0,0 +1,46 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http;
+
+import java.io.IOException;
+
+/**
+ * Signals HTTP protocol error that renders the actual HTTP data stream unreliable.
+ *
+ * @since 5.0
+ */
+public class HttpStreamResetException extends IOException {
+
+    public HttpStreamResetException(final String message) {
+        super(message);
+    }
+
+    public HttpStreamResetException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+}