You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/23 04:21:52 UTC
[james-project] 08/10: JAMES-2851 Add ReactorUtils.toInputStream to convert Flux<byte[]> to lazy InputStream
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4ccb9b1636e5d6043ab37b31acca1c1bd16ce977
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Wed Jul 31 13:15:19 2019 +0200
JAMES-2851 Add ReactorUtils.toInputStream to convert Flux<byte[]> to lazy InputStream
---
.../java/org/apache/james/util/ReactorUtils.java | 70 ++++++++++++++++++
.../org/apache/james/util/ReactorUtilsTest.java | 82 +++++++++++++++++++++-
2 files changed, 151 insertions(+), 1 deletion(-)
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 42e6d8e..1ed7963 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -18,10 +18,80 @@
****************************************************************/
package org.apache.james.util;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Stream;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorUtils {
/**
 * Builds a {@link Mono} from the given runnable and chains it with an empty {@link Mono},
 * so the returned publisher never emits a value.
 *
 * @param runnable the action wrapped with {@code Mono.fromRunnable}
 * @param <T> the element type expected by the caller
 * @return the chained, value-less {@link Mono}
 */
public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
    Mono<T> emptyResult = Mono.empty();
    return Mono.fromRunnable(runnable).then(emptyResult);
}
+
+ public static InputStream toInputStream(Flux<byte[]> byteArrays) {
+ return new StreamInputStream(byteArrays.toStream(1));
+ }
+
+ private static class StreamInputStream extends InputStream {
+ private static final int NO_MORE_DATA = -1;
+
+ private final Stream<byte[]> source;
+ private final Spliterator<byte[]> spliterator;
+ private Optional<ByteArrayInputStream> currentItemByteStream;
+
+ StreamInputStream(Stream<byte[]> source) {
+ this.source = source;
+ this.spliterator = source.spliterator();
+ this.currentItemByteStream = Optional.empty();
+ }
+
+ @Override
+ public int read() {
+ try {
+ if (!dataAvailableToRead()) {
+ switchToNextChunk();
+ }
+
+ if (!dataAvailableToRead()) {
+ source.close();
+ return NO_MORE_DATA;
+ }
+
+ return currentItemByteStream.map(ByteArrayInputStream::read)
+ .filter(readResult -> readResult != NO_MORE_DATA)
+ .orElseGet(this::readNextChunk);
+ } catch (Throwable t) {
+ source.close();
+ throw t;
+ }
+ }
+
+ private boolean dataAvailableToRead() {
+ return currentItemByteStream.isPresent();
+ }
+
+ private void switchToNextChunk() {
+ spliterator.tryAdvance(bytes ->
+ currentItemByteStream = Optional.of(new ByteArrayInputStream(bytes)));
+ }
+
+ private Integer readNextChunk() {
+ currentItemByteStream = Optional.empty();
+ return read();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ source.close();
+ } finally {
+ super.close();
+ }
+ }
+ }
}
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 3c991e0..98bb9e1 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -20,10 +20,18 @@ package org.apache.james.util;
import static org.assertj.core.api.Assertions.assertThat;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
class ReactorUtilsTest {
@@ -69,4 +77,76 @@ class ReactorUtilsTest {
}
}
}
-}
\ No newline at end of file
+
+ @Nested
+ class ToInputStream {
+        /**
+         * Reads 5 bytes out of a flux of ten single-byte chunks and checks both the bytes
+         * returned and the total amount requested upstream (summed by {@code doOnRequest}).
+         */
+        @Test
+        void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.range(0, 10)
+                .subscribeOn(Schedulers.elastic())
+                .limitRate(2)
+                .doOnRequest(request -> generateElements.getAndAdd((int) request))
+                .map(index -> new byte[] {(byte) (int) index});
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            int readCount = inputStream.read(readBytes, 0, readBytes.length);
+
+            // The number of bytes read is part of the contract under test, do not discard it.
+            assertThat(readCount).isEqualTo(5);
+            assertThat(readBytes).containsExactly(0, 1, 2, 3, 4);
+            // Leave time for any pending upstream request to be accounted for.
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(6);
+        }
+
+        /**
+         * Reads 5 bytes out of a flux of three 3-byte chunks, so the read spans two chunks,
+         * and checks both the bytes returned and the total amount requested upstream.
+         */
+        @Test
+        void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
+                .subscribeOn(Schedulers.elastic())
+                .limitRate(2)
+                .doOnRequest(request -> generateElements.getAndAdd((int) request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            int readCount = inputStream.read(readBytes, 0, readBytes.length);
+
+            // The number of bytes read is part of the contract under test, do not discard it.
+            assertThat(readCount).isEqualTo(5);
+            assertThat(readBytes).containsExactly(0, 1, 2, 3, 4);
+            // Leave time for any pending upstream request to be accounted for.
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(3);
+        }
+
+        /**
+         * Same as the 3-byte chunk scenario, with an empty chunk in the middle of the flux:
+         * the empty chunk must be skipped without ending the read early.
+         */
+        @Test
+        void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
+                .subscribeOn(Schedulers.elastic())
+                .limitRate(2)
+                .doOnRequest(request -> generateElements.getAndAdd((int) request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            int readCount = inputStream.read(readBytes, 0, readBytes.length);
+
+            // The number of bytes read is part of the contract under test, do not discard it.
+            assertThat(readCount).isEqualTo(5);
+            assertThat(readBytes).containsExactly(0, 1, 2, 3, 4);
+            // Leave time for any pending upstream request to be accounted for.
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(4);
+        }
+
+        /**
+         * Reading from an empty flux must signal end of stream, leave the target buffer
+         * untouched, and only request the amount counted below from upstream.
+         */
+        @Test
+        void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.<byte[]>empty()
+                .subscribeOn(Schedulers.elastic())
+                .limitRate(2)
+                .doOnRequest(request -> generateElements.getAndAdd((int) request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            int readCount = inputStream.read(readBytes, 0, readBytes.length);
+
+            // InputStream.read(byte[], int, int) reports end of stream with -1.
+            assertThat(readCount).isEqualTo(-1);
+            // Nothing was read, so the buffer still holds its initial zeroes.
+            assertThat(readBytes).containsExactly(0, 0, 0, 0, 0);
+            // Leave time for any pending upstream request to be accounted for.
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(1);
+        }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org