You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by rs...@apache.org on 2019/12/25 04:58:10 UTC

[httpcomponents-core] branch master updated: Add reactive test code

This is an automated email from the ASF dual-hosted git repository.

rschmitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/httpcomponents-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 863d7c9  Add reactive test code
863d7c9 is described below

commit 863d7c90c4abd76c9f9e083839e189de1c94c901
Author: Ryan Schmitt <rs...@pobox.com>
AuthorDate: Mon Dec 23 21:09:59 2019 -0500

    Add reactive test code
    
    This commit adds some utilities that make it easier to write tests of
    reactive functionality, including an echo server, a random server, and a
    random stream generator. The existing reactive tests have been
    refactored to use the latter.
---
 httpcore5-testing/pom.xml                          |  12 +-
 .../testing/reactive/ReactiveEchoProcessor.java    |  71 +++++++++
 .../testing/reactive/ReactiveRandomProcessor.java  | 113 +++++++++++++++
 .../core5/testing/reactive/ReactiveTestUtils.java  | 158 +++++++++++++++++++++
 .../core5/testing/reactive/ReactiveClientTest.java | 142 +++---------------
 5 files changed, 367 insertions(+), 129 deletions(-)

diff --git a/httpcore5-testing/pom.xml b/httpcore5-testing/pom.xml
index ff668ba..30f8eab 100644
--- a/httpcore5-testing/pom.xml
+++ b/httpcore5-testing/pom.xml
@@ -67,6 +67,12 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>io.reactivex.rxjava2</groupId>
+      <artifactId>rxjava</artifactId>
+      <version>${rxjava.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
@@ -77,12 +83,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>io.reactivex.rxjava2</groupId>
-      <artifactId>rxjava</artifactId>
-      <version>${rxjava.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.conscrypt</groupId>
       <artifactId>conscrypt-openjdk-uber</artifactId>
       <scope>test</scope>
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveEchoProcessor.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveEchoProcessor.java
new file mode 100644
index 0000000..250901c
--- /dev/null
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveEchoProcessor.java
@@ -0,0 +1,71 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.reactive;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.impl.BasicEntityDetails;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
+import org.reactivestreams.Publisher;
+
+public final class ReactiveEchoProcessor implements ReactiveRequestProcessor {
+    public ReactiveEchoProcessor() {
+    }
+
+    @Override
+    public void processRequest(
+            final HttpRequest request,
+            final EntityDetails entityDetails,
+            final ResponseChannel responseChannel,
+            final HttpContext context,
+            final Publisher<ByteBuffer> requestBody,
+            final Callback<Publisher<ByteBuffer>> responseBodyFuture
+    ) throws HttpException, IOException {
+        if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
+            responseChannel.sendInformation(new BasicHttpResponse(100), context);
+        }
+
+        responseChannel.sendResponse(
+                new BasicHttpResponse(200),
+                new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
+                context);
+        responseBodyFuture.execute(requestBody);
+    }
+}
\ No newline at end of file
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveRandomProcessor.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveRandomProcessor.java
new file mode 100644
index 0000000..cdcbe50
--- /dev/null
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveRandomProcessor.java
@@ -0,0 +1,113 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.reactive;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.MethodNotSupportedException;
+import org.apache.hc.core5.http.ProtocolException;
+import org.apache.hc.core5.http.impl.BasicEntityDetails;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicHttpResponse;
+import org.apache.hc.core5.http.nio.ResponseChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
+import org.reactivestreams.Publisher;
+
+import io.reactivex.Flowable;
+
+public class ReactiveRandomProcessor implements ReactiveRequestProcessor {
+    public ReactiveRandomProcessor() {
+    }
+
+    @Override
+    public void processRequest(
+            final HttpRequest request,
+            final EntityDetails entityDetails,
+            final ResponseChannel responseChannel,
+            final HttpContext context,
+            final Publisher<ByteBuffer> requestBody,
+            final Callback<Publisher<ByteBuffer>> responseBodyCallback
+    ) throws HttpException, IOException {
+        final String method = request.getMethod();
+        if (!"GET".equalsIgnoreCase(method) &&
+                !"HEAD".equalsIgnoreCase(method) &&
+                !"POST".equalsIgnoreCase(method) &&
+                !"PUT".equalsIgnoreCase(method)) {
+            throw new MethodNotSupportedException(method + " not supported by " + getClass().getName());
+        }
+        final URI uri;
+        try {
+            uri = request.getUri();
+        } catch (final URISyntaxException ex) {
+            throw new ProtocolException(ex.getMessage(), ex);
+        }
+        final String path = uri.getPath();
+        final int slash = path.lastIndexOf('/');
+        if (slash != -1) {
+            final String payload = path.substring(slash + 1);
+            final long n;
+            if (!payload.isEmpty()) {
+                try {
+                    n = Long.parseLong(payload);
+                } catch (final NumberFormatException ex) {
+                    throw new ProtocolException("Invalid request path: " + path);
+                }
+            } else {
+                // random length, but make sure at least something is sent
+                n = 1 + (int) (Math.random() * 79.0);
+            }
+
+            if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
+                responseChannel.sendInformation(new BasicHttpResponse(100), context);
+            }
+
+            final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK);
+            final Flowable<ByteBuffer> stream = ReactiveTestUtils.produceStream(n);
+            final String hash = ReactiveTestUtils.getStreamHash(n);
+            response.addHeader("response-hash-code", hash);
+            final BasicEntityDetails basicEntityDetails = new BasicEntityDetails(n, ContentType.APPLICATION_OCTET_STREAM);
+            responseChannel.sendResponse(response, basicEntityDetails, context);
+            responseBodyCallback.execute(stream);
+        } else {
+            throw new ProtocolException("Invalid request path: " + path);
+        }
+    }
+}
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveTestUtils.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveTestUtils.java
new file mode 100644
index 0000000..3b67f52
--- /dev/null
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/reactive/ReactiveTestUtils.java
@@ -0,0 +1,158 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.testing.reactive;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.util.TextUtils;
+import org.reactivestreams.Publisher;
+
+import io.reactivex.Emitter;
+import io.reactivex.Flowable;
+import io.reactivex.Single;
+import io.reactivex.functions.BiFunction;
+import io.reactivex.functions.Consumer;
+
+public class ReactiveTestUtils {
+    /** The range from which to generate random data. */
+    private final static byte[] RANGE = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+            .getBytes(StandardCharsets.US_ASCII);
+
+    /**
+     * Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB.
+     *
+     * @param length the number of bytes in the stream
+     * @return a reactive stream of bytes
+     */
+    public static Flowable<ByteBuffer> produceStream(final long length) {
+        return produceStream(length, null);
+    }
+
+    /**
+     * Produces a deterministic stream of bytes, in randomly sized chunks of up to 128kB, while computing the hash of
+     * the random data.
+     *
+     * @param length the number of bytes in the stream
+     * @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
+     *             hash will not be computed
+     * @return a reactive stream of bytes
+     */
+    public static Flowable<ByteBuffer> produceStream(final long length, final AtomicReference<String> hash) {
+        return produceStream(length, 128 * 1024, hash);
+    }
+
+    /**
+     * Produces a deterministic stream of bytes, in randomly sized chunks, while computing the hash of the random data.
+     *
+     * @param length the number of bytes in the stream
+     * @param maximumBlockSize the maximum size of any {@code ByteBuffer in the stream}
+     * @param hash an output argument for the hash, set when the end of the stream is reached; if {@code null}, the
+     *             hash will not be computed
+     * @return a reactive stream of bytes
+     */
+    public static Flowable<ByteBuffer> produceStream(
+            final long length,
+            final int maximumBlockSize,
+            final AtomicReference<String> hash
+    ) {
+        return Flowable.generate(new Consumer<Emitter<ByteBuffer>>() {
+            Random random = new Random(length); // Use the length as the random seed for easy reproducibility
+            long bytesEmitted = 0;
+            MessageDigest md = newMessageDigest();
+
+            @Override
+            public void accept(final Emitter<ByteBuffer> emitter) {
+                final long remainingLength = length - bytesEmitted;
+                if (remainingLength == 0) {
+                    emitter.onComplete();
+                    if (hash != null) {
+                        hash.set(TextUtils.toHexString(md.digest()));
+                    }
+                } else {
+                    final int bufferLength = (int) Math.min(remainingLength, 1 + random.nextInt(maximumBlockSize));
+                    final byte[] bs = new byte[bufferLength];
+                    for (int i = 0; i < bufferLength; i++) {
+                        final byte b = RANGE[(int) (Math.random() * RANGE.length)];
+                        bs[i] = b;
+                    }
+                    if (hash != null) {
+                        md.update(bs);
+                    }
+                    emitter.onNext(ByteBuffer.wrap(bs));
+                    bytesEmitted += bufferLength;
+                }
+            }
+        });
+    }
+
+    /**
+     * Computes the hash of the deterministic stream (as produced by {@link #produceStream(long)}).
+     */
+    public static String getStreamHash(final long length) {
+        return TextUtils.toHexString(consumeStream(produceStream(length)).blockingGet().md.digest());
+    }
+
+    /**
+     * Consumes the given stream and returns a data structure containing its length and digest.
+     */
+    public static Single<StreamDescription> consumeStream(final Publisher<ByteBuffer> publisher) {
+        final StreamDescription seed = new StreamDescription(0, newMessageDigest());
+        return Flowable.fromPublisher(publisher)
+                .reduce(seed, new BiFunction<StreamDescription, ByteBuffer, StreamDescription>() {
+                    @Override
+                    public StreamDescription apply(final StreamDescription desc, final ByteBuffer byteBuffer) {
+                        final long length = desc.length + byteBuffer.remaining();
+                        desc.md.update(byteBuffer);
+                        return new StreamDescription(length, desc.md);
+                    }
+                });
+    }
+
+    private static MessageDigest newMessageDigest() {
+        try {
+            return MessageDigest.getInstance("MD5");
+        } catch (final NoSuchAlgorithmException ex) {
+            throw new AssertionError(ex);
+        }
+    }
+
+    public static class StreamDescription {
+        public final long length;
+        public final MessageDigest md;
+
+        public StreamDescription(final long length, final MessageDigest md) {
+            this.length = length;
+            this.md = md;
+        }
+    }
+}
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
index aa26f46..d6511e1 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/reactive/ReactiveClientTest.java
@@ -29,15 +29,12 @@ package org.apache.hc.core5.testing.reactive;
 import static java.lang.String.format;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -46,36 +43,22 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.function.Supplier;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.EntityDetails;
-import org.apache.hc.core5.http.HeaderElements;
-import org.apache.hc.core5.http.HttpException;
-import org.apache.hc.core5.http.HttpHeaders;
-import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.HttpStreamResetException;
 import org.apache.hc.core5.http.Message;
 import org.apache.hc.core5.http.Method;
-import org.apache.hc.core5.http.impl.BasicEntityDetails;
 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
-import org.apache.hc.core5.http.message.BasicHeader;
-import org.apache.hc.core5.http.message.BasicHttpResponse;
 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
-import org.apache.hc.core5.http.nio.ResponseChannel;
 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
-import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http2.HttpVersionPolicy;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.reactive.ReactiveEntityProducer;
-import org.apache.hc.core5.reactive.ReactiveRequestProcessor;
 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
 import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
 import org.apache.hc.core5.reactor.IOReactorConfig;
@@ -86,6 +69,8 @@ import org.apache.hc.core5.testing.nio.LoggingH2StreamListener;
 import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener;
 import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator;
 import org.apache.hc.core5.testing.nio.LoggingIOSessionListener;
+import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
+import org.apache.hc.core5.util.TextUtils;
 import org.apache.hc.core5.util.Timeout;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -101,7 +86,6 @@ import io.reactivex.Flowable;
 import io.reactivex.Observable;
 import io.reactivex.functions.Action;
 import io.reactivex.functions.Consumer;
-import io.reactivex.functions.Function;
 
 @RunWith(Parameterized.class)
 public class ReactiveClientTest {
@@ -128,28 +112,6 @@ public class ReactiveClientTest {
 
     private HttpAsyncServer server;
 
-    private static final class ReactiveEchoProcessor implements ReactiveRequestProcessor {
-        @Override
-        public void processRequest(
-                final HttpRequest request,
-                final EntityDetails entityDetails,
-                final ResponseChannel responseChannel,
-                final HttpContext context,
-                final Publisher<ByteBuffer> requestBody,
-                final Callback<Publisher<ByteBuffer>> responseBodyFuture
-        ) throws HttpException, IOException {
-            if (new BasicHeader(HttpHeaders.EXPECT, HeaderElements.CONTINUE).equals(request.getHeader(HttpHeaders.EXPECT))) {
-                responseChannel.sendInformation(new BasicHttpResponse(100), context);
-            }
-
-            responseChannel.sendResponse(
-                    new BasicHttpResponse(200),
-                    new BasicEntityDetails(-1, ContentType.APPLICATION_OCTET_STREAM),
-                    context);
-            responseBodyFuture.execute(requestBody);
-        }
-    }
-
     @Rule
     public ExternalResource serverResource = new ExternalResource() {
 
@@ -253,47 +215,20 @@ public class ReactiveClientTest {
     @Test
     public void testLongRunningRequest() throws Exception {
         final InetSocketAddress address = startClientAndServer();
-        final AtomicLong requestLength = new AtomicLong(0L);
-        final AtomicReference<MessageDigest> requestDigest = new AtomicReference<>(newDigest());
-        final Publisher<ByteBuffer> publisher = Flowable.rangeLong(1, 100)
-            .map(new Function<Long, ByteBuffer>() {
-                @Override
-                public ByteBuffer apply(final Long seed) {
-                    final Random random = new Random(seed);
-                    // Using blocks slightly larger than the HTTP/2 window size serves to exercise the input window
-                    // management code
-                    final byte[] bytes = new byte[65535 + 7];
-                    requestLength.addAndGet(bytes.length);
-                    random.nextBytes(bytes);
-                    return ByteBuffer.wrap(bytes);
-                }
-            })
-            .doOnNext(new Consumer<ByteBuffer>() {
-                @Override
-                public void accept(final ByteBuffer byteBuffer) {
-                    requestDigest.get().update(byteBuffer.duplicate());
-                }
-            });
-        final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+        final long expectedLength = 6_554_200L;
+        final AtomicReference<String> expectedHash = new AtomicReference<>(null);
+        final Flowable<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, expectedHash);
+        final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
         final BasicRequestProducer request = getRequestProducer(address, producer);
 
         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
         requester.execute(request, consumer, SOCKET_TIMEOUT, null);
         final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
+        final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
 
-        final AtomicLong responseLength = new AtomicLong(0);
-        final AtomicReference<MessageDigest> responseDigest = new AtomicReference<>(newDigest());
-        Flowable.fromPublisher(response.getBody())
-            .blockingForEach(new Consumer<ByteBuffer>() {
-                @Override
-                public void accept(final ByteBuffer byteBuffer) {
-                    responseLength.addAndGet(byteBuffer.remaining());
-                    responseDigest.get().update(byteBuffer);
-                }
-            });
-        Assert.assertEquals(requestLength.get(), responseLength.get());
-        Assert.assertArrayEquals(requestDigest.get().digest(), responseDigest.get().digest());
+        Assert.assertEquals(expectedLength, desc.length);
+        Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
     }
 
     @Test
@@ -304,47 +239,21 @@ public class ReactiveClientTest {
         // so it's unlikely to consistently occur.
         final InetSocketAddress address = startClientAndServer();
         for (int i = 0; i < 10; i++) {
-            final AtomicLong requestLength = new AtomicLong(0L);
-            final AtomicReference<MessageDigest> requestDigest = new AtomicReference<>(newDigest());
-            final Publisher<ByteBuffer> publisher = Flowable.rangeLong(1, 1_000)
-                .map(new Function<Long, ByteBuffer>() {
-                    @Override
-                    public ByteBuffer apply(final Long seed) {
-                        final Random random = new Random(seed);
-                        final byte[] bytes = new byte[1024];
-                        requestLength.addAndGet(bytes.length);
-                        random.nextBytes(bytes);
-                        return ByteBuffer.wrap(bytes);
-                    }
-                })
-                .doOnNext(new Consumer<ByteBuffer>() {
-                    @Override
-                    public void accept(final ByteBuffer byteBuffer) {
-                        requestDigest.get().update(byteBuffer.duplicate());
-                    }
-                });
-            final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+            final long expectedLength = 1_024_000;
+            final int maximumBlockSize = 1024;
+            final AtomicReference<String> expectedHash = new AtomicReference<>(null);
+            final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(expectedLength, maximumBlockSize, expectedHash);
+            final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
             final BasicRequestProducer request = getRequestProducer(address, producer);
 
             final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
             requester.execute(request, consumer, SOCKET_TIMEOUT, null);
             final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture()
                 .get(RESULT_TIMEOUT.getDuration(), RESULT_TIMEOUT.getTimeUnit());
+            final StreamDescription desc = ReactiveTestUtils.consumeStream(response.getBody()).blockingGet();
 
-            final AtomicLong responseLength = new AtomicLong(0);
-            final AtomicReference<MessageDigest> responseDigest = new AtomicReference<>(newDigest());
-            Flowable.fromPublisher(response.getBody())
-                .doOnNext(new Consumer<ByteBuffer>() {
-                    @Override
-                    public void accept(final ByteBuffer byteBuffer) {
-                        responseLength.addAndGet(byteBuffer.remaining());
-                        responseDigest.get().update(byteBuffer);
-                    }
-                })
-                .blockingLast(); // This requests Long.MAX_VALUE objects and guarantees we won't call request again
-
-            Assert.assertEquals(requestLength.get(), responseLength.get());
-            Assert.assertArrayEquals(requestDigest.get().digest(), responseDigest.get().digest());
+            Assert.assertEquals(expectedLength, desc.length);
+            Assert.assertEquals(expectedHash.get(), TextUtils.toHexString(desc.md.digest()));
         }
     }
 
@@ -409,16 +318,7 @@ public class ReactiveClientTest {
         final InetSocketAddress address = startClientAndServer();
         final AtomicBoolean requestPublisherWasCancelled = new AtomicBoolean(false);
         final AtomicReference<Throwable> requestStreamError = new AtomicReference<>();
-        final Publisher<ByteBuffer> publisher = Flowable.rangeLong(Long.MIN_VALUE, Long.MAX_VALUE)
-            .map(new Function<Long, ByteBuffer>() {
-                @Override
-                public ByteBuffer apply(final Long seed) throws Exception {
-                    final Random random = new Random(seed);
-                    final byte[] bytes = new byte[1 + random.nextInt(1024)];
-                    random.nextBytes(bytes);
-                    return ByteBuffer.wrap(bytes);
-                }
-            })
+        final Publisher<ByteBuffer> stream = ReactiveTestUtils.produceStream(Long.MAX_VALUE, 1024, null)
             .doOnCancel(new Action() {
                 @Override
                 public void run() throws Exception {
@@ -431,7 +331,7 @@ public class ReactiveClientTest {
                     requestStreamError.set(throwable);
                 }
             });
-        final ReactiveEntityProducer producer = new ReactiveEntityProducer(publisher, -1, null, null);
+        final ReactiveEntityProducer producer = new ReactiveEntityProducer(stream, -1, null, null);
         final BasicRequestProducer request = getRequestProducer(address, producer);
 
         final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
@@ -462,10 +362,6 @@ public class ReactiveClientTest {
         }
     }
 
-    private static MessageDigest newDigest() throws NoSuchAlgorithmException {
-        return MessageDigest.getInstance("MD5");
-    }
-
     private InetSocketAddress startClientAndServer() throws InterruptedException, ExecutionException {
         server.start();
         final ListenerEndpoint listener = server.listen(new InetSocketAddress(0)).get();