You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by rs...@apache.org on 2019/12/25 04:58:10 UTC
[httpcomponents-core] branch master updated: Add reactive test code
This is an automated email from the ASF dual-hosted git repository.
rschmitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git
The following commit(s) were added to refs/heads/master by this push:
new 863d7c9 Add reactive test code
863d7c9 is described below
commit 863d7c90c4abd76c9f9e083839e189de1c94c901
Author: Ryan Schmitt <rs...@pobox.com>
AuthorDate: Mon Dec 23 21:09:59 2019 -0500
Add reactive test code
This commit adds some utilities that make it easier to write tests of
reactive functionality, including an echo server, a random server, and a
random stream generator. The existing reactive tests have been
refactored to use the latter.
---
httpcore5-testing/pom.xml | 12 +-
.../testing/reactive/ReactiveEchoProcessor.java | 71 +++++++++
.../testing/reactive/ReactiveRandomProcessor.java | 113 +++++++++++++++
.../core5/testing/reactive/ReactiveTestUtils.java | 158 +++++++++++++++++++++
.../core5/testing/reactive/ReactiveClientTest.java | 142 +++---------------
5 files changed, 367 insertions(+), 129 deletions(-)
diff --git a/httpcore5-testing/pom.xml b/httpcore5-testing/pom.xml
index ff668ba..30f8eab 100644
--- a/httpcore5-testing/pom.xml
+++ b/httpcore5-testing/pom.xml
@@ -67,6 +67,12 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>io.reactivex.rxjava2</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>${rxjava.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
@@ -77,12 +83,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.reactivex.rxjava2</groupId>
- <artifactId>rxjava</artifactId>
- <version>${rxjava.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.conscrypt</groupId>
<artifactId>conscrypt-openjdk-uber</artifactId>
<scope>test</scope>
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveEchoProcessor.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveEchoProcessor.java
new file mode 100644
index 0000000..250901c
--- /dev/null
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveEchoProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.reactive;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.impl.BasicEntityDetails;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
+import org.reactivestreams.Publisher;
+
+public final class ReactiveEchoProcessor implements ReactiveRequestProcessor {
+ public ReactiveEchoProcessor() {
+ }
+
+ @Override
+ public void processRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel,
+ final HttpContext context,
+ final Publisher<ByteBuffer> requestBody,
+ final Callback<Publisher<ByteBuffer>> responseBodyFuture
+ ) throws HttpException, IOException {
+ if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
+ responseChannel.sendInformation(new BasicHttpResponse(100), context);
+ }
+
+ responseChannel.sendResponse(
+ new BasicHttpResponse(200),
+ new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
+ context);
+ responseBodyFuture.execute(requestBody);
+ }
+}
\ No newline at end of file
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveRandomProcessor.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveRandomProcessor.java
new file mode 100644
index 0000000..cdcbe50
--- /dev/null
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveRandomProcessor.java
@@ -0,0 +1,113 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.reactive;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.MethodNotSupportedException;
+import org.apache.hc.core5.http.ProtocolException;
+import org.apache.hc.core5.http.impl.BasicEntityDetails;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
+import org.reactivestreams.Publisher;
+
+import io.reactivex.Flowable;
+
+public class ReactiveRandomProcessor implements ReactiveRequestProcessor {
+ public ReactiveRandomProcessor() {
+ }
+
+ @Override
+ public void processRequest(
+ final HttpRequest request,
+ final EntityDetails entityDetails,
+ final ResponseChannel responseChannel,
+ final HttpContext context,
+ final Publisher<ByteBuffer> requestBody,
+ final Callback<Publisher<ByteBuffer>> responseBodyCallback
+ ) throws HttpException, IOException {
+ final String method = request.getMethod();
+ if (!"GET".equalsIgnoreCase(method) &&
+ !"HEAD".equalsIgnoreCase(method) &&
+ !"POST".equalsIgnoreCase(method) &&
+ !"PUT".equalsIgnoreCase(method)) {
+ throw new MethodNotSupportedException(method + " not supported by " + getClass().getName());
+ }
+ final URI uri;
+ try {
+ uri = request.getUri();
+ } catch (final URISyntaxException ex) {
+ throw new ProtocolException(ex.getMessage(), ex);
+ }
+ final String path = uri.getPath();
+ final int slash = path.lastIndexOf('/');
+ if (slash != -1) {
+ final String payload = path.substring(slash + 1);
+ final long n;
+ if (!payload.isEmpty()) {
+ try {
+ n = Long.parseLong(payload);
+ } catch (final NumberFormatException ex) {
+ throw new ProtocolException("Invalid request path: " + path);
+ }
+ } else {
+ // random length, but make sure at least something is sent
+ n = 1 + (int) (Math.random() * 79.0);
+ }
+
+ if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
+ responseChannel.sendInformation(new BasicHttpResponse(100), context);
+ }
+
+ final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+ final Flowable<ByteBuffer> stream = ReactiveTestUtils.produceStream(n);
+ final String hash = ReactiveTestUtils.getStreamHash(n);
+ response.addHeader("response-hash-code", hash);
+ final BasicEntityDetails basicEntityDetails = new BasicEntityDetails(n, ContentType.APPLICATION_OCTET_STREAM);
+ responseChannel.sendResponse(response, basicEntityDetails, context);
+ responseBodyCallback.execute(stream);
+ } else {
+ throw new ProtocolException("Invalid request path: " + path);
+ }
+ }
+}
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveTestUtils.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveTestUtils.java
new file mode 100644
index 0000000..3b67f52
--- /dev/null
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveTestUtils.java
@@ -0,0 +1,158 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.reactive;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.util.TextUtils;
+import org.reactivestreams.Publisher;
+
+import io.reactivex.Emitter;
+import io.reactivex.Flowable;
+import io.reactivex.Single;
+import io.reactivex.functions.BiFunction;
+import io.reactivex.functions.Consumer;
+
+public class ReactiveTestUtils {
+ /** The range from which to generate random data. */
+ private final static byte[] RANGE = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ .getBytes(StandardCharsets.US_ASCII);
+
+ /**
+ * Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB.
+ *
+ * @param length the number of bytes in the stream
+ * @return a reactive stream of bytes
+ */
+ public static Flowable<ByteBuffer> produceStream(final long length) {
+ return produceStream(length, null);
+ }
+
+ /**
+ * Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB, while computing the hash of
+ * the random data.
+ *
+ * @param length the number of bytes in the stream
+ * @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
+ * hash will not be computed
+ * @return a reactive stream of bytes
+ */
+ public static Flowable<ByteBuffer> produceStream(final long length, final AtomicReference<String> hash) {
+ return produceStream(length, 128 * 1024, hash);
+ }
+
+ /**
+ * Produces a deterministic stream of bytes, in randomly sized chunks, while computing the hash of the random data.
+ *
+ * @param length the number of bytes in the stream
+ * @param maximumBlockSize the maximum size of any {@code ByteBuffer in the stream}
+ * @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
+ * hash will not be computed
+ * @return a reactive stream of bytes
+ */
+ public static Flowable<ByteBuffer> produceStream(
+ final long length,
+ final int maximumBlockSize,
+ final AtomicReference<String> hash
+ ) {
+ return Flowable.generate(new Consumer<Emitter<ByteBuffer>>() {
+ Random random = new Random(length); // Use the length as the random seed for easy reproducibility
+ long bytesEmitted = 0;
+ MessageDigest md = newMessageDigest();
+
+ @Override
+ public void accept(final Emitter<ByteBuffer> emitter) {
+ final long remainingLength = length - bytesEmitted;
+ if (remainingLength == 0) {
+ emitter.onComplete();
+ if (hash != null) {
+ hash.set(TextUtils.toHexString(md.digest()));
+ }
+ } else {
+ final int bufferLength = (int) Math.min(remainingLength, 1 + random.nextInt(maximumBlockSize));
+ final byte[] bs = new byte[bufferLength];
+ for (int i = 0; i < bufferLength; i++) {
+ final byte b = RANGE[(int) (Math.random() * RANGE.length)];
+ bs[i] = b;
+ }
+ if (hash != null) {
+ md.update(bs);
+ }
+ emitter.onNext(ByteBuffer.wrap(bs));
+ bytesEmitted += bufferLength;
+ }
+ }
+ });
+ }
+
+ /**
+ * Computes the hash of the deterministic stream (as produced by {@link #produceStream(long)}).
+ */
+ public static String getStreamHash(final long length) {
+ return TextUtils.toHexString(consumeStream(produceStream(length)).blockingGet().md.digest());
+ }
+
+ /**
+ * Consumes the given stream and returns a data structure containing its length and digest.
+ */
+ public static Single<StreamDescription> consumeStream(final Publisher<ByteBuffer> publisher) {
+ final StreamDescription seed = new StreamDescription(0, newMessageDigest());
+ return Flowable.fromPublisher(publisher)
+ .reduce(seed, new BiFunction<StreamDescription, ByteBuffer, StreamDescription>() {
+ @Override
+ public StreamDescription apply(final StreamDescription desc, final ByteBuffer byteBuffer) {
+ final long length = desc.length + byteBuffer.remaining();
+ desc.md.update(byteBuffer);
+ return new StreamDescription(length, desc.md);
+ }
+ });
+ }
+
+ private static MessageDigest newMessageDigest() {
+ try {
+ return MessageDigest.getInstance("MD5");
+ } catch (final NoSuchAlgorithmException ex) {
+ throw new AssertionError(ex);
+ }
+ }
+
+ public static class StreamDescription {
+ public final long length;
+ public final MessageDigest md;
+
+ public StreamDescription(final long length, final MessageDigest md) {
+ this.length = length;
+ this.md = md;
+ }
+ }
+}
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
index aa26f46..d6511e1 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
@@ -29,15 +29,12 @@ package org.apache.hc.core5.testing.reactive;
import static java.lang.String.format;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -46,36 +43,22 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Supplier;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.HeaderElements;
-import org.apache.hc.core5.http.HttpException;
-import org.apache.hc.core5.http.HttpHeaders;
-import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.Method;
-import org.apache.hc.core5.http.impl.BasicEntityDetails;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
-import org.apache.hc.core5.http.message.BasicHeader;
-import org.apache.hc.core5.http.message.BasicHttpResponse;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
-import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
-import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactive.ReactiveEntityProducer;
-import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
import org.apache.hc.core5.reactor.IOReactorConfig;
@@ -86,6 +69,8 @@ import org.apache.hc.core5.testing.nio.LoggingH2StreamListener;
import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
import org.apache.hc.core5.testing.nio.LoggingIOSessionListener;
+import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
+import org.apache.hc.core5.util.TextUtils;
import org.apache.hc.core5.util.Timeout;
import org.junit.Assert;
import org.junit.Rule;
@@ -101,7 +86,6 @@ import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
-import io.reactivex.functions.Function;
@RunWith(Parameterized.class)
public class ReactiveClientTest {
@@ -128,28 +112,6 @@ public class ReactiveClientTest {
private HttpAsyncServer server;
- private static final class ReactiveEchoProcessor implements ReactiveRequestProcessor {
- @Override
- public void processRequest(
- final HttpRequest request,
- final EntityDetails entityDetails,
- final ResponseChannel responseChannel,
- final HttpContext context,
- final Publisher<ByteBuffer> requestBody,
- final Callback<Publisher<ByteBuffer>> responseBodyFuture
- ) throws HttpException, IOException {
- if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
- responseChannel.sendInformation(new BasicHttpResponse(100), context);
- }
-
- responseChannel.sendResponse(
- new BasicHttpResponse(200),
- new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
- context);
- responseBodyFuture.execute(requestBody);
- }
- }
-
@Rule
public ExternalResource serverResource = new ExternalResource() {
@@ -253,47 +215,20 @@ public class ReactiveClientTest {
@Test
public void testLongRunningRequest() throws Exception {
final InetSocketAddress address = startClientAndServer();
- final AtomicLong requestLength = new AtomicLong(0L);
- final AtomicReference<MessageDigest> requestDigest = new AtomicReference<>(newDigest());
- final Publisher<ByteBuffer> publisher = Flowable.rangeLong(1, 100)
- .map(new Function<Long, ByteBuffer>() {
- @Override
- public ByteBuffer apply(final Long seed) {
- final Random random = new Random(seed);
- // Using blocks slightly larger than the HTTP/2 window size serves to exercise the input window
- // management code
- final byte[] bytes = new byte[65535 + 7];
- requestLength.addAndGet(bytes.length);
- random.nextBytes(bytes);
- return ByteBuffer.wrap(bytes);
- }
- })
- .doOnNext(new Consumer<ByteBuffer>() {
- @Override
- public void accept(final ByteBuffer byteBuffer) {
- requestDigest.get().update(byteBuffer.duplicate());
- }
- });
- final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+ final long expectedLength = 6_554_200L;
+ final AtomicReference<String> expectedHash = new AtomicReference<>(null);
+ final Flowable<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, expectedHash);
+ final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
final BasicRequestProducer request = getRequestProducer(address, producer);
final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
requester.execute(request, consumer, SOCKET_TIMEOUT, null);
final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
+ final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
- final AtomicLong responseLength = new AtomicLong(0);
- final AtomicReference<MessageDigest> responseDigest = new AtomicReference<>(newDigest());
- Flowable.fromPublisher(response.getBody())
- .blockingForEach(new Consumer<ByteBuffer>() {
- @Override
- public void accept(final ByteBuffer byteBuffer) {
- responseLength.addAndGet(byteBuffer.remaining());
- responseDigest.get().update(byteBuffer);
- }
- });
- Assert.assertEquals(requestLength.get(), responseLength.get());
- Assert.assertArrayEquals(requestDigest.get().digest(), responseDigest.get().digest());
+ Assert.assertEquals(expectedLength, desc.length);
+ Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
}
@Test
@@ -304,47 +239,21 @@ public class ReactiveClientTest {
// so it's unlikely to consistently occur.
final InetSocketAddress address = startClientAndServer();
for (int i = 0; i < 10; i++) {
- final AtomicLong requestLength = new AtomicLong(0L);
- final AtomicReference<MessageDigest> requestDigest = new AtomicReference<>(newDigest());
- final Publisher<ByteBuffer> publisher = Flowable.rangeLong(1, 1_000)
- .map(new Function<Long, ByteBuffer>() {
- @Override
- public ByteBuffer apply(final Long seed) {
- final Random random = new Random(seed);
- final byte[] bytes = new byte[1024];
- requestLength.addAndGet(bytes.length);
- random.nextBytes(bytes);
- return ByteBuffer.wrap(bytes);
- }
- })
- .doOnNext(new Consumer<ByteBuffer>() {
- @Override
- public void accept(final ByteBuffer byteBuffer) {
- requestDigest.get().update(byteBuffer.duplicate());
- }
- });
- final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+ final long expectedLength = 1_024_000;
+ final int maximumBlockSize = 1024;
+ final AtomicReference<String> expectedHash = new AtomicReference<>(null);
+ final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, maximumBlockSize, expectedHash);
+ final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
final BasicRequestProducer request = getRequestProducer(address, producer);
final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
requester.execute(request, consumer, SOCKET_TIMEOUT, null);
final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
.get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
+ final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
- final AtomicLong responseLength = new AtomicLong(0);
- final AtomicReference<MessageDigest> responseDigest = new AtomicReference<>(newDigest());
- Flowable.fromPublisher(response.getBody())
- .doOnNext(new Consumer<ByteBuffer>() {
- @Override
- public void accept(final ByteBuffer byteBuffer) {
- responseLength.addAndGet(byteBuffer.remaining());
- responseDigest.get().update(byteBuffer);
- }
- })
- .blockingLast(); // This requests Long.MAX_VALUE objects and guarantees we won't call request again
-
- Assert.assertEquals(requestLength.get(), responseLength.get());
- Assert.assertArrayEquals(requestDigest.get().digest(), responseDigest.get().digest());
+ Assert.assertEquals(expectedLength, desc.length);
+ Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
}
}
@@ -409,16 +318,7 @@ public class ReactiveClientTest {
final InetSocketAddress address = startClientAndServer();
final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
- final Publisher<ByteBuffer> publisher = Flowable.rangeLong(Long.MIN_VALUE, Long.MAX_VALUE)
- .map(new Function<Long, ByteBuffer>() {
- @Override
- public ByteBuffer apply(final Long seed) throws Exception {
- final Random random = new Random(seed);
- final byte[] bytes = new byte[1 + random.nextInt(1024)];
- random.nextBytes(bytes);
- return ByteBuffer.wrap(bytes);
- }
- })
+ final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(Long.MAX_VALUE, 1024, null)
.doOnCancel(new Action() {
@Override
public void run() throws Exception {
@@ -431,7 +331,7 @@ public class ReactiveClientTest {
requestStreamError.set(throwable);
}
});
- final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+ final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
final BasicRequestProducer request = getRequestProducer(address, producer);
final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
@@ -462,10 +362,6 @@ public class ReactiveClientTest {
}
}
- private static MessageDigest newDigest() throws NoSuchAlgorithmException {
- return MessageDigest.getInstance("MD5");
- }
-
private InetSocketAddress startClientAndServer() throws InterruptedException, ExecutionException {
server.start();
final ListenerEndpoint listener = server.listen(new InetSocketAddress(0)).get();