You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2020/01/23 10:45:11 UTC
[httpcomponents-client] branch development created (now d89b275)
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a change to branch development
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git.
at d89b275 Minor test code simplification in reactive test suite
This branch includes the following new commits:
new d89b275 Minor test code simplification in reactive test suite
The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[httpcomponents-client] 01/01: Minor test code simplification in
reactive test suite
Posted by ol...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
olegk pushed a commit to branch development
in repository https://gitbox.apache.org/repos/asf/httpcomponents-client.git
commit d89b27533411866804cd8fb978046b3780a30da8
Author: Oleg Kalnichevski <ol...@apache.org>
AuthorDate: Mon Dec 23 09:23:21 2019 +0100
Minor test code simplification in reactive test suite
---
.../AbstractHttpReactiveFundamentalsTest.java | 67 +++++++++++++++++-----
1 file changed, 52 insertions(+), 15 deletions(-)
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
index f20ea55..d655b80 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractHttpReactiveFundamentalsTest.java
@@ -26,13 +26,9 @@
*/
package org.apache.hc.client5.testing.async;
-import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
@@ -48,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
@@ -60,11 +57,14 @@ import org.apache.hc.core5.reactive.ReactiveEntityProducer;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
import org.apache.hc.core5.testing.reactive.ReactiveTestUtils;
import org.apache.hc.core5.testing.reactive.ReactiveTestUtils.StreamDescription;
+import org.apache.hc.core5.util.ByteArrayBuffer;
import org.apache.hc.core5.util.TextUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
@@ -272,19 +272,56 @@ public abstract class AbstractHttpReactiveFundamentalsTest<T extends CloseableHt
}
static byte[] publisherToByteArray(final Publisher<ByteBuffer> publisher) throws Exception {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (WritableByteChannel channel = Channels.newChannel(baos)) {
- final List<ByteBuffer> bufs = Flowable.fromPublisher(publisher)
- .toList()
- .blockingGet();
- if (bufs.isEmpty()) {
- return null;
+ final BasicFuture<byte[]> future = new BasicFuture<>(null);
+ publisher.subscribe(new Subscriber<ByteBuffer>() {
+
+ private ByteArrayBuffer buf;
+
+ @Override
+ public void onSubscribe(final Subscription subscription) {
+ subscription.request(Long.MAX_VALUE);
}
- for (final ByteBuffer buf : bufs) {
- channel.write(buf);
+
+ @Override
+ public void onNext(final ByteBuffer src) {
+ synchronized (this) {
+ if (buf == null) {
+ buf = new ByteArrayBuffer(1024);
+ }
+ if (src.hasArray()) {
+ buf.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
+ } else {
+ while (src.hasRemaining()) {
+ buf.append(src.get());
+ }
+ }
+ }
}
- }
- return baos.toByteArray();
+
+ @Override
+ public void onError(final Throwable t) {
+ synchronized (this) {
+ buf = null;
+ }
+ if (t instanceof Exception) {
+ future.failed((Exception) t);
+ } else {
+ future.failed(new RuntimeException(t));
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ final byte[] content;
+ synchronized (this) {
+ content = buf != null ? buf.toByteArray() : null;
+ buf = null;
+ }
+ future.completed(content);
+ }
+
+ });
+ return future.get();
}
private static final class StreamingTestCase {