You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/01/23 10:45:12 UTC

[httpcomponents-client] 01/01: Minor test code simplification in reactive test suite

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git

commit d89b27533411866804cd8fb978046b3780a30da8
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Mon Dec 23 09:23:21 2019 +0100

    Minor test code simplification in reactive test suite
---
 .../AbstractHttpReactiveFundamentalsTest.java      | 67 +++++++++++++++++-----
 1 file changed, 52 insertions(+), 15 deletions(-)

diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
index f20ea55..d655b80 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
@@ -26,13 +26,9 @@
  */
 package org.apache.hc.client5.testing.async;
 
-import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
@@ -48,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.BasicFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.HttpHost;
@@ -60,11 +57,14 @@ import org.apache.hc.core5.reactive.ReactiveEntityProducer;
 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
 import org.apache.hc.core5.testing.reactive.ReactiveTestUtils;
 import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
+import org.apache.hc.core5.util.ByteArrayBuffer;
 import org.apache.hc.core5.util.TextUtils;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Test;
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
 
 import io.reactivex.Flowable;
 import io.reactivex.functions.Consumer;
@@ -272,19 +272,56 @@ public abstract class AbstractHttpReactiveFundamentalsTest<T extends CloseableHt
     }
 
     static byte[] publisherToByteArray(final Publisher<ByteBuffer> publisher) throws Exception {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (WritableByteChannel channel = Channels.newChannel(baos)) {
-            final List<ByteBuffer> bufs = Flowable.fromPublisher(publisher)
-                .toList()
-                .blockingGet();
-            if (bufs.isEmpty()) {
-                return null;
+        final BasicFuture<byte[]> future = new BasicFuture<>(null);
+        publisher.subscribe(new Subscriber<ByteBuffer>() {
+
+            private ByteArrayBuffer buf;
+
+            @Override
+            public void onSubscribe(final Subscription subscription) {
+                subscription.request(Long.MAX_VALUE);
             }
-            for (final ByteBuffer buf : bufs) {
-                channel.write(buf);
+
+            @Override
+            public void onNext(final ByteBuffer src) {
+                synchronized (this) {
+                    if (buf == null) {
+                        buf = new ByteArrayBuffer(1024);
+                    }
+                    if (src.hasArray()) {
+                        buf.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
+                    } else {
+                        while (src.hasRemaining()) {
+                            buf.append(src.get());
+                        }
+                    }
+                }
             }
-        }
-        return baos.toByteArray();
+
+            @Override
+            public void onError(final Throwable t) {
+                synchronized (this) {
+                    buf = null;
+                }
+                if (t instanceof Exception) {
+                    future.failed((Exception) t);
+                } else {
+                    future.failed(new RuntimeException(t));
+                }
+            }
+
+            @Override
+            public void onComplete() {
+                final byte[] content;
+                synchronized (this) {
+                    content = buf != null ? buf.toByteArray() : null;
+                    buf = null;
+                }
+                future.completed(content);
+            }
+
+        });
+        return future.get();
     }
 
     private static final class StreamingTestCase {