You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/23 04:21:52 UTC

[james-project] 08/10: JAMES-2851 Add ReactorUtils.toInputStream to convert Flux<byte[]> to lazy InputStream

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4ccb9b1636e5d6043ab37b31acca1c1bd16ce977
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Wed Jul 31 13:15:19 2019 +0200

    JAMES-2851 Add ReactorUtils.toInputStream to convert Flux<byte[]> to lazy InputStream
---
 .../java/org/apache/james/util/ReactorUtils.java   | 70 ++++++++++++++++++
 .../org/apache/james/util/ReactorUtilsTest.java    | 82 +++++++++++++++++++++-
 2 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 42e6d8e..1ed7963 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -18,10 +18,80 @@
  ****************************************************************/
 package org.apache.james.util;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Stream;
+
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class ReactorUtils {
     public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
         return Mono.fromRunnable(runnable).then(Mono.empty());
     }
+
+    public static InputStream toInputStream(Flux<byte[]> byteArrays) {
+        return new StreamInputStream(byteArrays.toStream(1));
+    }
+
+    private static  class StreamInputStream extends InputStream {
+        private static final int NO_MORE_DATA = -1;
+
+        private final Stream<byte[]> source;
+        private final Spliterator<byte[]> spliterator;
+        private Optional<ByteArrayInputStream> currentItemByteStream;
+
+        StreamInputStream(Stream<byte[]> source) {
+            this.source = source;
+            this.spliterator = source.spliterator();
+            this.currentItemByteStream = Optional.empty();
+        }
+
+        @Override
+        public int read() {
+            try {
+                if (!dataAvailableToRead()) {
+                    switchToNextChunk();
+                }
+
+                if (!dataAvailableToRead()) {
+                    source.close();
+                    return NO_MORE_DATA;
+                }
+
+                return currentItemByteStream.map(ByteArrayInputStream::read)
+                    .filter(readResult -> readResult != NO_MORE_DATA)
+                    .orElseGet(this::readNextChunk);
+            } catch (Throwable t) {
+                source.close();
+                throw t;
+            }
+        }
+
+        private boolean dataAvailableToRead() {
+            return currentItemByteStream.isPresent();
+        }
+
+        private void switchToNextChunk() {
+            spliterator.tryAdvance(bytes ->
+                currentItemByteStream = Optional.of(new ByteArrayInputStream(bytes)));
+        }
+
+        private Integer readNextChunk() {
+            currentItemByteStream = Optional.empty();
+            return read();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                source.close();
+            } finally {
+                super.close();
+            }
+        }
+    }
 }
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 3c991e0..98bb9e1 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -20,10 +20,18 @@ package org.apache.james.util;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 class ReactorUtilsTest {
 
@@ -69,4 +77,76 @@ class ReactorUtilsTest {
             }
         }
     }
-}
\ No newline at end of file
+
+    @Nested
+    class ToInputStream {
+        @Test
+        void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.range(0, 10)
+                .subscribeOn(Schedulers.elastic())
+                .limitRate(2)
+                .doOnRequest(request -> generateElements.getAndAdd((int) request))
+                .map(index -> new byte[] {(byte) (int) index});
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(6);
+        }
+
+        @Test
+        void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
+                    .subscribeOn(Schedulers.elastic())
+                    .limitRate(2)
+                    .doOnRequest(request -> generateElements.getAndAdd((int) request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(3);
+        }
+
+        @Test
+        void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
+                    .subscribeOn(Schedulers.elastic())
+                    .limitRate(2)
+                    .doOnRequest(request -> generateElements.getAndAdd((int) request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(4);
+        }
+
+        @Test
+        void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.<byte[]>empty()
+                    .subscribeOn(Schedulers.elastic())
+                    .limitRate(2)
+                    .doOnRequest(request -> generateElements.getAndAdd((int) request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 0, 0, 0, 0);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(1);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org