You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/01/23 10:45:11 UTC

[httpcomponents-client] branch development created (now d89b275)

This is an automated email from the ASF dual-hosted git repository.

olegk pushed a change to branch development
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git.


      at d89b275  Minor test code simplification in reactive test suite

This branch includes the following new commits:

     new d89b275  Minor test code simplification in reactive test suite

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[httpcomponents-client] 01/01: Minor test code simplification in reactive test suite

Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

olegk pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git

commit d89b27533411866804cd8fb978046b3780a30da8
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Mon Dec 23 09:23:21 2019 +0100

    Minor test code simplification in reactive test suite
---
 .../AbstractHttpReactiveFundamentalsTest.java      | 67 +++++++++++++++++-----
 1 file changed, 52 insertions(+), 15 deletions(-)

diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
index f20ea55..d655b80 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
@@ -26,13 +26,9 @@
  */
 package org.apache.hc.client5.testing.async;
 
-import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
@@ -48,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.BasicFuture;
 import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.ContentType;
 import org.apache.hc.core5.http.HttpHost;
@@ -60,11 +57,14 @@ import org.apache.hc.core5.reactive.ReactiveEntityProducer;
 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
 import org.apache.hc.core5.testing.reactive.ReactiveTestUtils;
 import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
+import org.apache.hc.core5.util.ByteArrayBuffer;
 import org.apache.hc.core5.util.TextUtils;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Test;
 import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
 
 import io.reactivex.Flowable;
 import io.reactivex.functions.Consumer;
@@ -272,19 +272,56 @@ public abstract class AbstractHttpReactiveFundamentalsTest<T extends CloseableHt
     }
 
     static byte[] publisherToByteArray(final Publisher<ByteBuffer> publisher) throws Exception {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (WritableByteChannel channel = Channels.newChannel(baos)) {
-            final List<ByteBuffer> bufs = Flowable.fromPublisher(publisher)
-                .toList()
-                .blockingGet();
-            if (bufs.isEmpty()) {
-                return null;
+        final BasicFuture<byte[]> future = new BasicFuture<>(null);
+        publisher.subscribe(new Subscriber<ByteBuffer>() {
+
+            private ByteArrayBuffer buf;
+
+            @Override
+            public void onSubscribe(final Subscription subscription) {
+                subscription.request(Long.MAX_VALUE);
             }
-            for (final ByteBuffer buf : bufs) {
-                channel.write(buf);
+
+            @Override
+            public void onNext(final ByteBuffer src) {
+                synchronized (this) {
+                    if (buf == null) {
+                        buf = new ByteArrayBuffer(1024);
+                    }
+                    if (src.hasArray()) {
+                        buf.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
+                    } else {
+                        while (src.hasRemaining()) {
+                            buf.append(src.get());
+                        }
+                    }
+                }
             }
-        }
-        return baos.toByteArray();
+
+            @Override
+            public void onError(final Throwable t) {
+                synchronized (this) {
+                    buf = null;
+                }
+                if (t instanceof Exception) {
+                    future.failed((Exception) t);
+                } else {
+                    future.failed(new RuntimeException(t));
+                }
+            }
+
+            @Override
+            public void onComplete() {
+                final byte[] content;
+                synchronized (this) {
+                    content = buf != null ? buf.toByteArray() : null;
+                    buf = null;
+                }
+                future.completed(content);
+            }
+
+        });
+        return future.get();
     }
 
     private static final class StreamingTestCase {