You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2018/09/04 09:19:20 UTC
httpcomponents-core git commit: Added generic HTTP stream reset
exception;
removed dependency on HTTP/2 specific code from Reactive Streams module
Repository: httpcomponents-core
Updated Branches:
refs/heads/master 1bae9643c -> a9918e1a0
Added generic HTTP stream reset exception; removed dependency on HTTP/2 specific code from Reactive Streams module
Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/a9918e1a
Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/a9918e1a
Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/a9918e1a
Branch: refs/heads/master
Commit: a9918e1a0c60c042f7fb6c4cfaf32be8bc8af5b7
Parents: 1bae964
Author: Oleg Kalnichevski <ol...@apache.org>
Authored: Fri Aug 31 22:25:13 2018 +0200
Committer: Oleg Kalnichevski <ol...@apache.org>
Committed: Fri Aug 31 22:25:13 2018 +0200
----------------------------------------------------------------------
.../hc/core5/http2/H2StreamResetException.java | 7 +--
.../nio/AbstractHttp2StreamMultiplexer.java | 11 +++-
httpcore5-reactive/pom.xml | 6 --
.../hc/core5/reactive/ReactiveDataConsumer.java | 24 ++++----
.../hc/core5/reactive/ReactiveDataProducer.java | 8 +--
.../reactive/TestReactiveDataConsumer.java | 5 +-
.../reactive/TestReactiveDataProducer.java | 4 +-
.../testing/reactive/ReactiveClientTest.java | 59 ++++++++++----------
.../hc/core5/http/HttpStreamResetException.java | 46 +++++++++++++++
9 files changed, 105 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
index b688def..2be857e 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/H2StreamResetException.java
@@ -26,8 +26,7 @@
*/
package org.apache.hc.core5.http2;
-import java.io.IOException;
-
+import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.util.Args;
/**
@@ -36,9 +35,7 @@ import org.apache.hc.core5.util.Args;
*
* @since 5.0
*/
-public class H2StreamResetException extends IOException {
-
- private static final long serialVersionUID = 6321637486572232180L;
+public class H2StreamResetException extends HttpStreamResetException {
private final int code;
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
----------------------------------------------------------------------
diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
index 316d6e0..7929ad0 100644
--- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
+++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java
@@ -53,6 +53,7 @@ import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.ProtocolVersion;
@@ -84,13 +85,13 @@ import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
import org.apache.hc.core5.http2.nio.AsyncPingHandler;
import org.apache.hc.core5.http2.nio.command.PingCommand;
import org.apache.hc.core5.io.CloseMode;
+import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.ByteArrayBuffer;
import org.apache.hc.core5.util.Identifiable;
-import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConnection {
@@ -733,6 +734,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
consumeDataFrame(frame, stream);
} catch (final H2StreamResetException ex) {
stream.localReset(ex);
+ } catch (final HttpStreamResetException ex) {
+ stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
}
if (stream.isTerminated()) {
@@ -773,6 +776,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
}
} catch (final H2StreamResetException ex) {
stream.localReset(ex);
+ } catch (final HttpStreamResetException ex) {
+ stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
}
if (stream.isTerminated()) {
@@ -795,6 +800,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
consumeContinuationFrame(frame, stream);
} catch (final H2StreamResetException ex) {
stream.localReset(ex);
+ } catch (final HttpStreamResetException ex) {
+ stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
}
if (stream.isTerminated()) {
@@ -943,6 +950,8 @@ abstract class AbstractHttp2StreamMultiplexer implements Identifiable, HttpConne
consumePushPromiseFrame(frame, payload, promisedStream);
} catch (final H2StreamResetException ex) {
promisedStream.localReset(ex);
+ } catch (final HttpStreamResetException ex) {
+ stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
}
}
break;
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/pom.xml
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/pom.xml b/httpcore5-reactive/pom.xml
index 7babffb..eab91e6 100644
--- a/httpcore5-reactive/pom.xml
+++ b/httpcore5-reactive/pom.xml
@@ -63,12 +63,6 @@
<version>2.1.9</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.httpcomponents.core5</groupId>
- <artifactId>httpcore5-h2</artifactId>
- <version>5.0-beta3-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
</dependencies>
<reporting>
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
index 9789a43..8027be4 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java
@@ -26,27 +26,26 @@
*/
package org.apache.hc.core5.reactive;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http2.H2Error;
-import org.apache.hc.core5.http2.H2StreamResetException;
import org.apache.hc.core5.util.Args;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* An asynchronous data consumer that supports Reactive Streams.
*
@@ -81,8 +80,7 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuf
private void throwIfCancelled() throws IOException {
if (cancelled) {
- throw new H2StreamResetException(H2Error.NO_ERROR,
- "Downstream subscriber to ReactiveDataConsumer cancelled");
+ throw new HttpStreamResetException("Downstream subscriber to ReactiveDataConsumer cancelled");
}
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
index 33d7773..f917a11 100644
--- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
+++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java
@@ -34,10 +34,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.nio.AsyncDataProducer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.http2.H2Error;
-import org.apache.hc.core5.http2.H2StreamResetException;
import org.apache.hc.core5.util.Args;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -133,10 +132,7 @@ final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBu
try {
synchronized (buffers) {
if (t != null) {
- final H2StreamResetException ex = new H2StreamResetException(H2Error.NO_ERROR,
- "Request publisher threw an exception");
- ex.initCause(t);
- throw ex;
+ throw new HttpStreamResetException(t.getMessage(), t);
} else if (this.complete.get() && buffers.isEmpty()) {
channel.endStream();
} else {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
index bd380fe..83751a2 100644
--- a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataConsumer.java
@@ -35,8 +35,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.nio.CapacityChannel;
-import org.apache.hc.core5.http2.H2StreamResetException;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Subscriber;
@@ -49,6 +49,7 @@ import io.reactivex.Single;
import io.reactivex.functions.Consumer;
public class TestReactiveDataConsumer {
+
@Test
public void testStreamThatEndsNormally() throws Exception {
final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
@@ -96,7 +97,7 @@ public class TestReactiveDataConsumer {
Assert.assertSame(ex, single.blockingGet().get(0).getError());
}
- @Test(expected = H2StreamResetException.class)
+ @Test(expected = HttpStreamResetException.class)
public void testCancellation() throws Exception {
final ReactiveDataConsumer consumer = new ReactiveDataConsumer();
consumer.subscribe(new Subscriber<ByteBuffer>() {
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
----------------------------------------------------------------------
diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
index 1117439..73c6c77 100644
--- a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
+++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/TestReactiveDataProducer.java
@@ -29,8 +29,8 @@ package org.apache.hc.core5.reactive;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.nio.DataStreamChannel;
-import org.apache.hc.core5.http2.H2StreamResetException;
import org.junit.Assert;
import org.junit.Test;
@@ -81,7 +81,7 @@ public class TestReactiveDataProducer {
try {
producer.produce(streamChannel);
Assert.fail("Expected ProtocolException");
- } catch (final H2StreamResetException ex) {
+ } catch (final HttpStreamResetException ex) {
Assert.assertTrue("Expected published exception to be rethrown", ex.getCause() instanceof RuntimeException);
Assert.assertEquals("", byteChannel.dump(StandardCharsets.US_ASCII));
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
----------------------------------------------------------------------
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
index 21c4be4..9b41129 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
@@ -26,20 +26,34 @@
*/
package org.apache.hc.core5.testing.reactive;
-import io.reactivex.Flowable;
-import io.reactivex.Observable;
-import io.reactivex.functions.Action;
-import io.reactivex.functions.Consumer;
-import io.reactivex.functions.Function;
+import java.io.ByteArrayOutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.BasicRequestProducer;
-import org.apache.hc.core5.http2.H2Error;
-import org.apache.hc.core5.http2.H2StreamResetException;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
@@ -66,25 +80,11 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import io.reactivex.Flowable;
+import io.reactivex.Observable;
+import io.reactivex.functions.Action;
+import io.reactivex.functions.Consumer;
+import io.reactivex.functions.Function;
@RunWith(Parameterized.class)
public class ReactiveClientTest {
@@ -289,7 +289,7 @@ public class ReactiveClientTest {
future.get();
Assert.fail("Expected exception");
} catch (final ExecutionException ex) {
- Assert.assertTrue(ex.getCause() instanceof H2StreamResetException);
+ Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
Assert.assertSame(exceptionThrown, ex.getCause().getCause());
}
}
@@ -321,8 +321,7 @@ public class ReactiveClientTest {
cause instanceof SocketTimeoutException);
} else if (versionPolicy == HttpVersionPolicy.FORCE_HTTP_2) {
Assert.assertTrue(String.format("Expected RST_STREAM, but %s was thrown", cause.getClass().getName()),
- cause instanceof H2StreamResetException);
- Assert.assertEquals(H2Error.NO_ERROR.getCode(), ((H2StreamResetException) cause).getCode());
+ cause instanceof HttpStreamResetException);
} else {
Assert.fail("Unknown HttpVersionPolicy: " + versionPolicy);
}
@@ -380,7 +379,7 @@ public class ReactiveClientTest {
future.get();
Assert.fail("Expected exception");
} catch (final ExecutionException | CancellationException ex) {
- Assert.assertTrue(ex.getCause() instanceof H2StreamResetException);
+ Assert.assertTrue(ex.getCause() instanceof HttpStreamResetException);
Assert.assertTrue(requestPublisherWasCancelled.get());
Assert.assertNull(requestStreamError.get());
}
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a9918e1a/httpcore5/src/main/java/org/apache/hc/core5/http/HttpStreamResetException.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/HttpStreamResetException.java b/httpcore5/src/main/java/org/apache/hc/core5/http/HttpStreamResetException.java
new file mode 100644
index 0000000..4313397
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/http/HttpStreamResetException.java
@@ -0,0 +1,46 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.core5.http;
+
+import java.io.IOException;
+
+/**
+ * Signals HTTP protocol error that renders the actual HTTP data stream unreliable.
+ *
+ * @since 5.0
+ */
+public class HttpStreamResetException extends IOException {
+
+ public HttpStreamResetException(final String message) {
+ super(message);
+ }
+
+ public HttpStreamResetException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+}