You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/14 08:49:49 UTC

[james-project] branch master updated: [REFACTORING] Merge ReactorUtils::toChunks and DataChunker::chunkStream (#1518)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 47825dcad1 [REFACTORING] Merge ReactorUtils::toChunks and DataChunker::chunkStream (#1518)
47825dcad1 is described below

commit 47825dcad138e28552ad7f1ca2bb45e12766c0cd
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Fri Apr 14 15:49:42 2023 +0700

    [REFACTORING] Merge ReactorUtils::toChunks and DataChunker::chunkStream (#1518)
---
 .../blob/cassandra/CassandraBlobStoreDAO.java      |   4 +-
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     |   4 +-
 .../java/org/apache/james/util/DataChunker.java    |  27 ----
 .../org/apache/james/util/DataChunkerTest.java     | 177 ++++++---------------
 4 files changed, 52 insertions(+), 160 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
index 9e2d72b7be..d73e68d166 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
@@ -54,6 +54,7 @@ import com.google.common.io.ByteSource;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * WARNING: JAMES-3591 Cassandra is not made to store large binary content, its use will be suboptimal compared to
@@ -125,7 +126,8 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
         Preconditions.checkNotNull(bucketName);
         Preconditions.checkNotNull(inputStream);
 
-        return Mono.fromCallable(() -> DataChunker.chunkStream(inputStream, configuration.getBlobPartSize()))
+        return Mono.fromCallable(() -> ReactorUtils.toChunks(inputStream, configuration.getBlobPartSize())
+                .subscribeOn(Schedulers.boundedElastic()))
             .flatMap(chunks -> save(bucketName, blobId, chunks))
             .onErrorMap(e -> new ObjectStoreIOException("Exception occurred while saving input stream", e));
     }
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index ba85e2375e..725cf8dcd8 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -44,7 +44,6 @@ import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
 import org.apache.james.lifecycle.api.Startable;
-import org.apache.james.util.DataChunker;
 import org.apache.james.util.ReactorUtils;
 import org.reactivestreams.Publisher;
 
@@ -330,7 +329,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
         if (chunkSize == 0) {
             return Flux.empty();
         }
-        return DataChunker.chunkStream(stream, chunkSize);
+        return ReactorUtils.toChunks(stream, chunkSize)
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     private RetryBackoffSpec createBucketOnRetry(BucketName bucketName) {
diff --git a/server/container/util/src/main/java/org/apache/james/util/DataChunker.java b/server/container/util/src/main/java/org/apache/james/util/DataChunker.java
index 0db274d32f..9677cdf4f2 100644
--- a/server/container/util/src/main/java/org/apache/james/util/DataChunker.java
+++ b/server/container/util/src/main/java/org/apache/james/util/DataChunker.java
@@ -19,17 +19,12 @@
 
 package org.apache.james.util;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.james.util.io.UnsynchronizedBufferedInputStream;
-
 import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 public class DataChunker {
 
@@ -54,26 +49,4 @@ public class DataChunker {
         }
         return Mono.just(ByteBuffer.wrap(data, offset, data.length - offset));
     }
-
-    public static Flux<ByteBuffer> chunkStream(InputStream data, int chunkSize) {
-        Preconditions.checkNotNull(data);
-        Preconditions.checkArgument(chunkSize > 0, CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE);
-        UnsynchronizedBufferedInputStream bufferedInputStream = new UnsynchronizedBufferedInputStream(data);
-        return Flux.<ByteBuffer>generate(sink -> {
-                try {
-                    byte[] buffer = new byte[chunkSize];
-
-                    int size = bufferedInputStream.read(buffer);
-                    if (size <= 0) {
-                        sink.complete();
-                    } else {
-                        sink.next(ByteBuffer.wrap(buffer, 0, size));
-                    }
-                } catch (IOException e) {
-                    sink.error(e);
-                }
-            })
-            .subscribeOn(Schedulers.boundedElastic())
-            .defaultIfEmpty(ByteBuffer.wrap(new byte[0]));
-    }
 }
diff --git a/server/container/util/src/test/java/org/apache/james/util/DataChunkerTest.java b/server/container/util/src/test/java/org/apache/james/util/DataChunkerTest.java
index d879da72b1..0199487e5a 100644
--- a/server/container/util/src/test/java/org/apache/james/util/DataChunkerTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/DataChunkerTest.java
@@ -22,156 +22,73 @@ package org.apache.james.util;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-import java.io.ByteArrayInputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
 import org.assertj.core.api.Assumptions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
 
-public class DataChunkerTest {
-
+class DataChunkerTest {
     public static final int CHUNK_SIZE = 10;
 
-    private DataChunker testee;
+    @Test
+    void chunkShouldThrowOnNullData() {
+        assertThatThrownBy(() -> DataChunker.chunk(null, CHUNK_SIZE))
+            .isInstanceOf(NullPointerException.class);
+    }
 
-    @BeforeEach
-    public void setUp() {
-        testee = new DataChunker();
+    @Test
+    void chunkShouldThrowOnNegativeChunkSize() {
+        int chunkSize = -1;
+        assertThatThrownBy(() -> DataChunker.chunk(new byte[0], chunkSize))
+            .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Nested
-    public class ByteArray {
-
-        @Test
-        public void chunkShouldThrowOnNullData() {
-            assertThatThrownBy(() -> testee.chunk(null, CHUNK_SIZE))
-                .isInstanceOf(NullPointerException.class);
-        }
-
-        @Test
-        public void chunkShouldThrowOnNegativeChunkSize() {
-            int chunkSize = -1;
-            assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
-                .isInstanceOf(IllegalArgumentException.class);
-        }
-
-        @Test
-        public void chunkShouldThrowOnZeroChunkSize() {
-            int chunkSize = 0;
-            assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
-                .isInstanceOf(IllegalArgumentException.class);
-        }
-
-        @Test
-        public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
-            Flux<ByteBuffer> chunks = testee.chunk(new byte[0], CHUNK_SIZE);
-            ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
-            assertThat(chunks.toStream()).containsExactly(emptyBuffer);
-        }
-
-        @Test
-        public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
-            byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
-            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
-
-            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
-        }
-
-        @Test
-        public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
-            byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
-            Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
-
-            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
-
-            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
-        }
-
-        @Test
-        public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
-            byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
-            byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
-            Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
-            byte[] data = Bytes.concat(part1, part2);
-
-            Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
-
-            assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(part1), ByteBuffer.wrap(part2));
-        }
+    @Test
+    void chunkShouldThrowOnZeroChunkSize() {
+        int chunkSize = 0;
+        assertThatThrownBy(() -> DataChunker.chunk(new byte[0], chunkSize))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
 
+    @Test
+    void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+        Flux<ByteBuffer> chunks = DataChunker.chunk(new byte[0], CHUNK_SIZE);
+        ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
+        assertThat(chunks.toStream()).containsExactly(emptyBuffer);
     }
 
-    @Nested
-    public class InputStream {
-
-        @Test
-        public void chunkShouldThrowOnNullData() {
-            assertThatThrownBy(() -> testee.chunkStream(null, CHUNK_SIZE))
-                .isInstanceOf(NullPointerException.class);
-        }
-
-        @Test
-        public void chunkShouldThrowOnNegativeChunkSize() {
-            int chunkSize = -1;
-            assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
-                .isInstanceOf(IllegalArgumentException.class);
-        }
-
-        @Test
-        public void chunkShouldThrowOnZeroChunkSize() {
-            int chunkSize = 0;
-            assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
-                .isInstanceOf(IllegalArgumentException.class);
-        }
-
-        @Test
-        public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
-            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(new byte[0]), CHUNK_SIZE);
-            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(new byte[0]);
-        }
-
-        @Test
-        public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
-            byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
-            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
-
-            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
-        }
-
-        @Test
-        public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
-            byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
-            Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
-
-            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
-
-            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
-        }
-
-        @Test
-        public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
-            byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
-            byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
-            Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
-            byte[] data = Bytes.concat(part1, part2);
-
-            Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
-
-            assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(part1, part2);
-        }
+    @Test
+    void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+        byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
+        Flux<ByteBuffer> chunks = DataChunker.chunk(data, CHUNK_SIZE);
+
+        assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
     }
 
-    static byte[] read(ByteBuffer buffer) {
-        byte[] arr = new byte[buffer.remaining()];
-        buffer.get(arr);
-        return arr;
+    @Test
+    void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+        byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
+        Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
+
+        Flux<ByteBuffer> chunks = DataChunker.chunk(data, CHUNK_SIZE);
+
+        assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
     }
 
+    @Test
+    void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+        byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
+        byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
+        Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
+        byte[] data = Bytes.concat(part1, part2);
+
+        Flux<ByteBuffer> chunks = DataChunker.chunk(data, CHUNK_SIZE);
+
+        assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(part1), ByteBuffer.wrap(part2));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org