You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/14 08:49:49 UTC
[james-project] branch master updated: [REFACTORING] Merge ReactorUtils::toChunks and DataChunker::chunkStream (#1518)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 47825dcad1 [REFACTORING] Merge ReactorUtils::toChunks and DataChunker::chunkStream (#1518)
47825dcad1 is described below
commit 47825dcad138e28552ad7f1ca2bb45e12766c0cd
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Fri Apr 14 15:49:42 2023 +0700
[REFACTORING] Merge ReactorUtils::toChunks and DataChunker::chunkStream (#1518)
---
.../blob/cassandra/CassandraBlobStoreDAO.java | 4 +-
.../blob/objectstorage/aws/S3BlobStoreDAO.java | 4 +-
.../java/org/apache/james/util/DataChunker.java | 27 ----
.../org/apache/james/util/DataChunkerTest.java | 177 ++++++---------------
4 files changed, 52 insertions(+), 160 deletions(-)
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
index 9e2d72b7be..d73e68d166 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
@@ -54,6 +54,7 @@ import com.google.common.io.ByteSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
/**
* WARNING: JAMES-3591 Cassandra is not made to store large binary content, its use will be suboptimal compared to
@@ -125,7 +126,8 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
Preconditions.checkNotNull(bucketName);
Preconditions.checkNotNull(inputStream);
- return Mono.fromCallable(() -> DataChunker.chunkStream(inputStream, configuration.getBlobPartSize()))
+ return Mono.fromCallable(() -> ReactorUtils.toChunks(inputStream, configuration.getBlobPartSize())
+ .subscribeOn(Schedulers.boundedElastic()))
.flatMap(chunks -> save(bucketName, blobId, chunks))
.onErrorMap(e -> new ObjectStoreIOException("Exception occurred while saving input stream", e));
}
diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index ba85e2375e..725cf8dcd8 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -44,7 +44,6 @@ import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.ObjectStoreIOException;
import org.apache.james.lifecycle.api.Startable;
-import org.apache.james.util.DataChunker;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
@@ -330,7 +329,8 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
if (chunkSize == 0) {
return Flux.empty();
}
- return DataChunker.chunkStream(stream, chunkSize);
+ return ReactorUtils.toChunks(stream, chunkSize)
+ .subscribeOn(Schedulers.boundedElastic());
}
private RetryBackoffSpec createBucketOnRetry(BucketName bucketName) {
diff --git a/server/container/util/src/main/java/org/apache/james/util/DataChunker.java b/server/container/util/src/main/java/org/apache/james/util/DataChunker.java
index 0db274d32f..9677cdf4f2 100644
--- a/server/container/util/src/main/java/org/apache/james/util/DataChunker.java
+++ b/server/container/util/src/main/java/org/apache/james/util/DataChunker.java
@@ -19,17 +19,12 @@
package org.apache.james.util;
-import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
-import org.apache.james.util.io.UnsynchronizedBufferedInputStream;
-
import com.google.common.base.Preconditions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
public class DataChunker {
@@ -54,26 +49,4 @@ public class DataChunker {
}
return Mono.just(ByteBuffer.wrap(data, offset, data.length - offset));
}
-
- public static Flux<ByteBuffer> chunkStream(InputStream data, int chunkSize) {
- Preconditions.checkNotNull(data);
- Preconditions.checkArgument(chunkSize > 0, CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE);
- UnsynchronizedBufferedInputStream bufferedInputStream = new UnsynchronizedBufferedInputStream(data);
- return Flux.<ByteBuffer>generate(sink -> {
- try {
- byte[] buffer = new byte[chunkSize];
-
- int size = bufferedInputStream.read(buffer);
- if (size <= 0) {
- sink.complete();
- } else {
- sink.next(ByteBuffer.wrap(buffer, 0, size));
- }
- } catch (IOException e) {
- sink.error(e);
- }
- })
- .subscribeOn(Schedulers.boundedElastic())
- .defaultIfEmpty(ByteBuffer.wrap(new byte[0]));
- }
}
diff --git a/server/container/util/src/test/java/org/apache/james/util/DataChunkerTest.java b/server/container/util/src/test/java/org/apache/james/util/DataChunkerTest.java
index d879da72b1..0199487e5a 100644
--- a/server/container/util/src/test/java/org/apache/james/util/DataChunkerTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/DataChunkerTest.java
@@ -22,156 +22,73 @@ package org.apache.james.util;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.assertj.core.api.Assumptions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import com.google.common.primitives.Bytes;
import reactor.core.publisher.Flux;
-public class DataChunkerTest {
-
+class DataChunkerTest {
public static final int CHUNK_SIZE = 10;
- private DataChunker testee;
+ @Test
+ void chunkShouldThrowOnNullData() {
+ assertThatThrownBy(() -> DataChunker.chunk(null, CHUNK_SIZE))
+ .isInstanceOf(NullPointerException.class);
+ }
- @BeforeEach
- public void setUp() {
- testee = new DataChunker();
+ @Test
+ void chunkShouldThrowOnNegativeChunkSize() {
+ int chunkSize = -1;
+ assertThatThrownBy(() -> DataChunker.chunk(new byte[0], chunkSize))
+ .isInstanceOf(IllegalArgumentException.class);
}
- @Nested
- public class ByteArray {
-
- @Test
- public void chunkShouldThrowOnNullData() {
- assertThatThrownBy(() -> testee.chunk(null, CHUNK_SIZE))
- .isInstanceOf(NullPointerException.class);
- }
-
- @Test
- public void chunkShouldThrowOnNegativeChunkSize() {
- int chunkSize = -1;
- assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void chunkShouldThrowOnZeroChunkSize() {
- int chunkSize = 0;
- assertThatThrownBy(() -> testee.chunk(new byte[0], chunkSize))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
- Flux<ByteBuffer> chunks = testee.chunk(new byte[0], CHUNK_SIZE);
- ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
- assertThat(chunks.toStream()).containsExactly(emptyBuffer);
- }
-
- @Test
- public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
- byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
- Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
-
- assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
- }
-
- @Test
- public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
- byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
- Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
-
- Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
-
- assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
- }
-
- @Test
- public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
- byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
- byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
- Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
- byte[] data = Bytes.concat(part1, part2);
-
- Flux<ByteBuffer> chunks = testee.chunk(data, CHUNK_SIZE);
-
- assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(part1), ByteBuffer.wrap(part2));
- }
+ @Test
+ void chunkShouldThrowOnZeroChunkSize() {
+ int chunkSize = 0;
+ assertThatThrownBy(() -> DataChunker.chunk(new byte[0], chunkSize))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+ @Test
+ void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+ Flux<ByteBuffer> chunks = DataChunker.chunk(new byte[0], CHUNK_SIZE);
+ ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
+ assertThat(chunks.toStream()).containsExactly(emptyBuffer);
}
- @Nested
- public class InputStream {
-
- @Test
- public void chunkShouldThrowOnNullData() {
- assertThatThrownBy(() -> testee.chunkStream(null, CHUNK_SIZE))
- .isInstanceOf(NullPointerException.class);
- }
-
- @Test
- public void chunkShouldThrowOnNegativeChunkSize() {
- int chunkSize = -1;
- assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void chunkShouldThrowOnZeroChunkSize() {
- int chunkSize = 0;
- assertThatThrownBy(() -> testee.chunkStream(new ByteArrayInputStream(new byte[0]), chunkSize))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
- Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(new byte[0]), CHUNK_SIZE);
- assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(new byte[0]);
- }
-
- @Test
- public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
- byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
- Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
-
- assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
- }
-
- @Test
- public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
- byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
- Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
-
- Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
-
- assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(data);
- }
-
- @Test
- public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
- byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
- byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
- Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
- byte[] data = Bytes.concat(part1, part2);
-
- Flux<ByteBuffer> chunks = testee.chunkStream(new ByteArrayInputStream(data), CHUNK_SIZE);
-
- assertThat(chunks.map(DataChunkerTest::read).toStream()).containsExactly(part1, part2);
- }
+ @Test
+ void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+ byte[] data = "12345".getBytes(StandardCharsets.UTF_8);
+ Flux<ByteBuffer> chunks = DataChunker.chunk(data, CHUNK_SIZE);
+
+ assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
}
- static byte[] read(ByteBuffer buffer) {
- byte[] arr = new byte[buffer.remaining()];
- buffer.get(arr);
- return arr;
+ @Test
+ void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+ byte[] data = "1234567890".getBytes(StandardCharsets.UTF_8);
+ Assumptions.assumeThat(data.length).isEqualTo(CHUNK_SIZE);
+
+ Flux<ByteBuffer> chunks = DataChunker.chunk(data, CHUNK_SIZE);
+
+ assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(data));
}
+ @Test
+ void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+ byte[] part1 = "1234567890".getBytes(StandardCharsets.UTF_8);
+ byte[] part2 = "12345".getBytes(StandardCharsets.UTF_8);
+ Assumptions.assumeThat(part1.length).isEqualTo(CHUNK_SIZE);
+ byte[] data = Bytes.concat(part1, part2);
+
+ Flux<ByteBuffer> chunks = DataChunker.chunk(data, CHUNK_SIZE);
+
+ assertThat(chunks.toStream()).containsExactly(ByteBuffer.wrap(part1), ByteBuffer.wrap(part2));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org