You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/01/23 10:45:12 UTC
[httpcomponents-client] 01/01: Minor test code simplification in reactive test suite
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git
commit d89b27533411866804cd8fb978046b3780a30da8
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Mon Dec 23 09:23:21 2019 +0100
Minor test code simplification in reactive test suite
---
.../AbstractHttpReactiveFundamentalsTest.java | 67 +++++++++++++++++-----
1 file changed, 52 insertions(+), 15 deletions(-)
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
index f20ea55..d655b80 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
@@ -26,13 +26,9 @@
*/
package org.apache.hc.client5.testing.async;
-import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
@@ -48,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
@@ -60,11 +57,14 @@ import org.apache.hc.core5.reactive.ReactiveEntityProducer;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
import org.apache.hc.core5.testing.reactive.ReactiveTestUtils;
import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
+import org.apache.hc.core5.util.ByteArrayBuffer;
import org.apache.hc.core5.util.TextUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
@@ -272,19 +272,56 @@ public abstract class AbstractHttpReactiveFundamentalsTest<T extends CloseableHt
}
static byte[] publisherToByteArray(final Publisher<ByteBuffer> publisher) throws Exception {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (WritableByteChannel channel = Channels.newChannel(baos)) {
- final List<ByteBuffer> bufs = Flowable.fromPublisher(publisher)
- .toList()
- .blockingGet();
- if (bufs.isEmpty()) {
- return null;
+ final BasicFuture<byte[]> future = new BasicFuture<>(null);
+ publisher.subscribe(new Subscriber<ByteBuffer>() {
+
+ private ByteArrayBuffer buf;
+
+ @Override
+ public void onSubscribe(final Subscription subscription) {
+ subscription.request(Long.MAX_VALUE);
}
- for (final ByteBuffer buf : bufs) {
- channel.write(buf);
+
+ @Override
+ public void onNext(final ByteBuffer src) {
+ synchronized (this) {
+ if (buf == null) {
+ buf = new ByteArrayBuffer(1024);
+ }
+ if (src.hasArray()) {
+ buf.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
+ } else {
+ while (src.hasRemaining()) {
+ buf.append(src.get());
+ }
+ }
+ }
}
- }
- return baos.toByteArray();
+
+ @Override
+ public void onError(final Throwable t) {
+ synchronized (this) {
+ buf = null;
+ }
+ if (t instanceof Exception) {
+ future.failed((Exception) t);
+ } else {
+ future.failed(new RuntimeException(t));
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ final byte[] content;
+ synchronized (this) {
+ content = buf != null ? buf.toByteArray() : null;
+ buf = null;
+ }
+ future.completed(content);
+ }
+
+ });
+ return future.get();
}
private static final class StreamingTestCase {