You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2020/02/07 14:05:23 UTC
[james-project] 01/09: JAMES-3028 Define a DumbBlobStore interface
and a read/save Contract testsuite
This is an automated email from the ASF dual-hosted git repository.
matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b0f44ad0996133092f9d62b1b7d731d9187bd320
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Jan 8 17:45:53 2020 +0100
JAMES-3028 Define a DumbBlobStore interface and a read/save Contract testsuite
---
.../java/org/apache/james/blob/api/BlobStore.java | 8 +-
.../org/apache/james/blob/api/DumbBlobStore.java | 95 +++++
...{BlobStore.java => IOObjectStoreException.java} | 32 +-
.../james/blob/api/DumbBlobStoreContract.java | 388 +++++++++++++++++++++
.../james/blob/memory/MemoryDumbBlobStore.java | 102 ++++++
.../blob/memory/MemoryDumbBlobStoreTest.java} | 38 +-
6 files changed, 608 insertions(+), 55 deletions(-)
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index 59414d5..458b354 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -35,14 +35,14 @@ public interface BlobStore {
Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);
- Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
-
- InputStream read(BucketName bucketName, BlobId blobId);
-
default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
}
+ Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
+
+ InputStream read(BucketName bucketName, BlobId blobId);
+
BucketName getDefaultBucketName();
Mono<Void> deleteBucket(BucketName bucketName);
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
new file mode 100644
index 0000000..97dbc11
--- /dev/null
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
@@ -0,0 +1,95 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.api;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Mono;
+
+public interface DumbBlobStore {
+
+ /**
+ * Reads a Blob based on its BucketName and its BlobId.
+ *
+ * @return an InputStream over the content of the blob
+ * @throws ObjectNotFoundException when the blobId or the bucket is not found
+ * @throws IOObjectStoreException when an unexpected IO error occurs
+ */
+ InputStream read(BucketName bucketName, BlobId blobId) throws IOObjectStoreException, ObjectNotFoundException;
+
+
+ /**
+ * Reads a Blob based on its BucketName and its BlobId.
+ *
+ * @return a Mono containing the content of the blob, or a Mono carrying in its error channel
+ * an ObjectNotFoundException when the blobId or the bucket is not found,
+ * or an IOObjectStoreException when an unexpected IO error occurs
+ */
+ Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
+
+
+ /**
+ * Saves the blob under the provided blob id, overwriting the previous blob with the same id if it already exists.
+ * The bucket is created if it does not already exist.
+ * This operation should be atomic and isolated.
+ *
+ * @return an empty Mono when the save succeeds,
+ * otherwise an IOObjectStoreException in its error channel
+ */
+ Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data);
+
+ /**
+ * Saves the content of the InputStream under the provided blob id.
+ * The InputStream should be closed after the call to this method.
+ *
+ * @see #save(BucketName, BlobId, byte[])
+ */
+ Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream);
+
+ /**
+ * Saves the content of the ByteSource under the provided blob id.
+ *
+ * @see #save(BucketName, BlobId, byte[])
+ */
+ Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content);
+
+ /**
+ * Saves the String under the provided blob id by delegating to the byte[] variant.
+ * The String is stored as UTF-8.
+ *
+ * @see #save(BucketName, BlobId, byte[])
+ */
+ default Mono<Void> save(BucketName bucketName, BlobId blobId, String data) {
+ return save(bucketName, blobId, data.getBytes(StandardCharsets.UTF_8));
+ }
+
+ /**
+ * Removes a Blob based on its BucketName and its BlobId.
+ *
+ * @return a successful Mono if the Blob is deleted or did not exist,
+ * otherwise an IOObjectStoreException in its error channel
+ */
+ Mono<Void> delete(BucketName bucketName, BlobId blobId);
+
+ /**
+ * Removes a bucket based on its BucketName.
+ *
+ * @return a successful Mono if the bucket is deleted or did not exist,
+ * otherwise an IOObjectStoreException in its error channel
+ */
+ Mono<Void> deleteBucket(BucketName bucketName);
+}
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/IOObjectStoreException.java
similarity index 58%
copy from server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
copy to server/blob/blob-api/src/main/java/org/apache/james/blob/api/IOObjectStoreException.java
index 59414d5..b36d138 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/IOObjectStoreException.java
@@ -16,36 +16,16 @@
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
-package org.apache.james.blob.api;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import reactor.core.publisher.Mono;
+package org.apache.james.blob.api;
-public interface BlobStore {
+public class IOObjectStoreException extends ObjectStoreException {
- enum StoragePolicy {
- SIZE_BASED,
- LOW_COST,
- HIGH_PERFORMANCE
+ public IOObjectStoreException(String message) {
+ super(message);
}
- Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);
-
- Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);
-
- Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
-
- InputStream read(BucketName bucketName, BlobId blobId);
-
- default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
- return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
+ public IOObjectStoreException(String message, Throwable cause) {
+ super(message, cause);
}
-
- BucketName getDefaultBucketName();
-
- Mono<Void> deleteBucket(BucketName bucketName);
-
- Mono<Void> delete(BucketName bucketName, BlobId blobId);
}
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
new file mode 100644
index 0000000..844ac69
--- /dev/null
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
@@ -0,0 +1,388 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.ByteArrayInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
+
+import com.google.common.base.Strings;
+import com.google.common.io.ByteSource;
+import reactor.core.publisher.Mono;
+
+/**
+ * Contract test suite for the read and save operations of a {@link DumbBlobStore}.
+ *
+ * A test class implements this interface and supplies the store under test through
+ * {@link #testee()}; the default {@code @Test} methods are then run against that store.
+ */
+public interface DumbBlobStoreContract {
+
+    BucketName TEST_BUCKET_NAME = BucketName.of("my-test-bucket");
+    BlobId TEST_BLOB_ID = new TestBlobId("test-blob-id");
+    String SHORT_STRING = "toto";
+    byte[] EMPTY_BYTEARRAY = {};
+    byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8);
+    byte[] ELEVEN_KILOBYTES = Strings.repeat("0123456789\n", 1000).getBytes(StandardCharsets.UTF_8);
+    byte[] TWELVE_MEGABYTES = Strings.repeat("0123456789\r\n", 1024 * 1024).getBytes(StandardCharsets.UTF_8);
+
+    /**
+     * @return the store the contract is checked against
+     */
+    DumbBlobStore testee();
+
+    @Test
+    default void saveShouldThrowWhenNullData() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (byte[]) null).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void saveShouldThrowWhenNullString() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (String) null).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void saveShouldThrowWhenNullInputStream() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (InputStream) null).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void saveShouldSaveEmptyData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, EMPTY_BYTEARRAY).block();
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEmpty();
+    }
+
+    @Test
+    default void saveShouldSaveEmptyString() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, "").block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
+    }
+
+    @Test
+    default void saveShouldSaveEmptyInputStream() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(EMPTY_BYTEARRAY)).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEmpty();
+    }
+
+    @Test
+    default void saveShouldSaveEmptyByteSource() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.empty()).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEmpty();
+    }
+
+    @Test
+    default void readBytesShouldThrowWhenNotExisting() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.readBytes(TEST_BUCKET_NAME, new TestBlobId("unknown")).block())
+            .isExactlyInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void readBytesShouldReturnSavedData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    default void readBytesShouldReturnLongSavedData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES);
+    }
+
+    @Test
+    default void readBytesShouldReturnBigSavedData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEqualTo(TWELVE_MEGABYTES);
+    }
+
+    @Test
+    default void readShouldThrowWhenNotExistingStream() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new TestBlobId("unknown")))
+            .isInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void saveShouldCreateBucket() {
+        DumbBlobStore store = testee();
+        BucketName nonExisting = BucketName.of("non-existing-bucket");
+        store.save(nonExisting, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        // read for a non-existing bucket would throw; the opened stream is closed right away
+        // so that stores backed by real resources do not leak it
+        assertThatCode(() -> store.read(nonExisting, TEST_BLOB_ID).close())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    default void readShouldReturnSavedData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+
+        assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY));
+    }
+
+    @Test
+    default void readShouldReturnLongSavedData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
+
+        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+
+        assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES));
+    }
+
+    @Test
+    default void readShouldReturnBigSavedData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+
+        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+
+        assertThat(read).hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));
+    }
+
+    @Test
+    default void saveBytesShouldOverwritePreviousData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    default void saveByteSourceShouldOverwritePreviousData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY)).block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    default void saveInputStreamShouldOverwritePreviousData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)).block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    default void saveInputStreamShouldNotOverwritePreviousDataOnFailingInputStream() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())
+            .onErrorResume(throwable -> Mono.empty())
+            .block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(ELEVEN_KILOBYTES);
+    }
+
+    @Test
+    default void saveByteSourceShouldNotOverwritePreviousDataOnFailingInputStream() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
+            @Override
+            public InputStream openStream() throws IOException {
+                return getThrowingInputStream();
+            }
+        })
+            .onErrorResume(throwable -> Mono.empty())
+            .block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(ELEVEN_KILOBYTES);
+    }
+
+    @Test
+    default void saveByteSourceShouldThrowOnIOException() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
+                @Override
+                public InputStream openStream() throws IOException {
+                    return getThrowingInputStream();
+                }
+            })
+            .block())
+            .isInstanceOf(IOObjectStoreException.class);
+    }
+
+    @Test
+    default void saveInputStreamShouldThrowOnIOException() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())
+            .block())
+            .isInstanceOf(IOObjectStoreException.class);
+    }
+
+    @Test
+    default void concurrentSaveBytesShouldReturnConsistentValues() throws ExecutionException, InterruptedException {
+        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        ConcurrentTestRunner.builder()
+            .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)))
+            .threadCount(10)
+            .operationCount(100)
+            .runSuccessfullyWithin(Duration.ofMinutes(2));
+    }
+
+    @Test
+    default void concurrentSaveInputStreamShouldReturnConsistentValues() throws ExecutionException, InterruptedException {
+        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        ConcurrentTestRunner.builder()
+            .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes))))
+            .threadCount(10)
+            .operationCount(100)
+            .runSuccessfullyWithin(Duration.ofMinutes(2));
+    }
+
+    @Test
+    default void concurrentSaveByteSourceShouldReturnConsistentValues() throws ExecutionException, InterruptedException {
+        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        ConcurrentTestRunner.builder()
+            .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes))))
+            .threadCount(10)
+            .operationCount(100)
+            .runSuccessfullyWithin(Duration.ofMinutes(2));
+    }
+
+    /**
+     * Picks at random one of four operations: saving one of the three reference payloads
+     * through the supplied function, or reading the test blob back and checking its content.
+     *
+     * @param save how a payload is written to the store under test
+     * @return the operation to subscribe to
+     */
+    default Publisher<Void> getConcurrentOperation(Function<byte[], Mono<Void>> save) {
+        switch (ThreadLocalRandom.current().nextInt(4)) {
+            case 0:
+                return save.apply(SHORT_BYTEARRAY);
+            case 1:
+                return save.apply(ELEVEN_KILOBYTES);
+            case 2:
+                return save.apply(TWELVE_MEGABYTES);
+            default:
+                return checkConcurrentSaveOperation();
+        }
+    }
+
+    /**
+     * Reads the test blob and asserts that its content is exactly one of the payloads the
+     * concurrent writers may have saved, i.e. that no partially written content is visible.
+     *
+     * @return a Mono completing when the check passed, or failing with the assertion error
+     */
+    default Mono<Void> checkConcurrentSaveOperation() {
+        return Mono
+            .fromCallable(() -> {
+                // Close the stream once drained so that it is not left open after the check.
+                try (InputStream inputStream = testee().read(TEST_BUCKET_NAME, TEST_BLOB_ID)) {
+                    return IOUtils.toByteArray(inputStream);
+                }
+            })
+            .doOnNext(bytes -> assertThat(bytes).isIn(
+                SHORT_BYTEARRAY,
+                ELEVEN_KILOBYTES,
+                TWELVE_MEGABYTES
+            ))
+            .then();
+    }
+
+    /**
+     * Builds a stream over {@link #TWELVE_MEGABYTES} that serves 5 bytes, one per read call,
+     * and then throws an IOException on every further read.
+     */
+    default FilterInputStream getThrowingInputStream() {
+        return new FilterInputStream(new ByteArrayInputStream(TWELVE_MEGABYTES)) {
+            private final int failingThreshold = 5;
+            private int alreadyRead = 0;
+
+            @Override
+            public int read() throws IOException {
+                if (alreadyRead < failingThreshold) {
+                    alreadyRead++;
+                    return super.read();
+                } else {
+                    throw new IOException("error on read");
+                }
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                // InputStream contract: a zero-length request reads nothing and returns 0.
+                if (len == 0) {
+                    return 0;
+                }
+                int value = read();
+                if (value == -1) {
+                    return -1;
+                }
+                b[off] = (byte) value;
+                // Report the number of bytes stored in the buffer, not the byte that was read.
+                return 1;
+            }
+
+        };
+    }
+
+}
diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java
new file mode 100644
index 0000000..48e0d78
--- /dev/null
+++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java
@@ -0,0 +1,102 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.memory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.DumbBlobStore;
+import org.apache.james.blob.api.IOObjectStoreException;
+import org.apache.james.blob.api.ObjectNotFoundException;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * {@link DumbBlobStore} keeping every blob in memory, as a byte array stored in a Guava
+ * {@link Table} indexed by bucket name and blob id.
+ *
+ * {@link HashBasedTable} is not synchronized, so every access to the table is made while
+ * holding the monitor of the table itself.
+ */
+public class MemoryDumbBlobStore implements DumbBlobStore {
+
+    private final Table<BucketName, BlobId, byte[]> blobs;
+
+    public MemoryDumbBlobStore() {
+        blobs = HashBasedTable.create();
+    }
+
+    /**
+     * Blocking read: waits for {@link #readBytes(BucketName, BlobId)} and wraps the bytes in a stream.
+     *
+     * @return a stream over the stored bytes
+     * @throws ObjectNotFoundException when nothing is stored under this bucket name and blob id
+     */
+    @Override
+    public InputStream read(BucketName bucketName, BlobId blobId) throws IOObjectStoreException, ObjectNotFoundException {
+        return readBytes(bucketName, blobId)
+            .map(ByteArrayInputStream::new)
+            .block();
+    }
+
+    /**
+     * Looks the blob up in the table.
+     *
+     * @return a Mono emitting the stored byte array, or failing with an
+     *         ObjectNotFoundException when the table holds no entry for this bucket name and blob id
+     */
+    @Override
+    public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+        return Mono.fromCallable(() -> {
+                // Same lock as the writers: the table must not be read while it is being modified.
+                synchronized (blobs) {
+                    return blobs.get(bucketName, blobId);
+                }
+            })
+            .switchIfEmpty(Mono.error(() -> new ObjectNotFoundException(String.format("blob '%s' not found in bucket '%s'", blobId.asString(), bucketName.asString()))));
+    }
+
+    /**
+     * Stores the given array under the bucket name and blob id, replacing any previous entry.
+     * The array is stored as is, without being copied.
+     *
+     * @return a Mono completing once the entry has been put in the table
+     */
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+        return Mono.fromRunnable(() -> {
+            synchronized (blobs) {
+                blobs.put(bucketName, blobId, data);
+            }
+        });
+    }
+
+    /**
+     * Reads the whole stream in memory, then saves the resulting bytes. Nothing is written to
+     * the table when reading the stream fails.
+     *
+     * @return a Mono completing once saved, or failing with an IOObjectStoreException wrapping
+     *         the IOException raised while reading the stream
+     */
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
+        return Mono.fromCallable(() -> {
+                try {
+                    return IOUtils.toByteArray(inputStream);
+                } catch (IOException e) {
+                    throw new IOObjectStoreException("IOException occured", e);
+                }
+            })
+            .flatMap(bytes -> save(bucketName, blobId, bytes));
+    }
+
+    /**
+     * Reads the whole ByteSource in memory, then saves the resulting bytes. Nothing is written
+     * to the table when reading the source fails.
+     *
+     * @return a Mono completing once saved, or failing with an IOObjectStoreException wrapping
+     *         the IOException raised while reading the source
+     */
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
+        return Mono.fromCallable(() -> {
+                try {
+                    return content.read();
+                } catch (IOException e) {
+                    throw new IOObjectStoreException("IOException occured", e);
+                }
+            })
+            .flatMap(bytes -> save(bucketName, blobId, bytes));
+    }
+
+    /**
+     * Removes the entry stored under the bucket name and blob id, if any.
+     *
+     * @return a Mono completing once the entry is gone; it also completes when there was no entry
+     */
+    @Override
+    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+        return Mono.fromRunnable(() -> {
+            synchronized (blobs) {
+                blobs.remove(bucketName, blobId);
+            }
+        });
+    }
+
+    /**
+     * Removes every entry stored under the bucket name.
+     *
+     * @return a Mono completing once the bucket is empty; it also completes when the bucket held nothing
+     */
+    @Override
+    public Mono<Void> deleteBucket(BucketName bucketName) {
+        return Mono.fromRunnable(() -> {
+            synchronized (blobs) {
+                // row() is a live view of the table: clearing it removes the entries from the table.
+                blobs.row(bucketName).clear();
+            }
+        });
+    }
+}
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryDumbBlobStoreTest.java
similarity index 57%
copy from server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
copy to server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryDumbBlobStoreTest.java
index 59414d5..eb326f4 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryDumbBlobStoreTest.java
@@ -16,36 +16,24 @@
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
-package org.apache.james.blob.api;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
+package org.apache.james.blob.memory;
-import reactor.core.publisher.Mono;
+import org.apache.james.blob.api.DumbBlobStore;
+import org.apache.james.blob.api.DumbBlobStoreContract;
+import org.junit.jupiter.api.BeforeEach;
-public interface BlobStore {
+class MemoryDumbBlobStoreTest implements DumbBlobStoreContract {
- enum StoragePolicy {
- SIZE_BASED,
- LOW_COST,
- HIGH_PERFORMANCE
- }
-
- Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);
-
- Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);
-
- Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
+ private MemoryDumbBlobStore blobStore;
- InputStream read(BucketName bucketName, BlobId blobId);
-
- default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
- return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
+ @BeforeEach
+ void setUp() {
+ blobStore = new MemoryDumbBlobStore();
}
- BucketName getDefaultBucketName();
-
- Mono<Void> deleteBucket(BucketName bucketName);
-
- Mono<Void> delete(BucketName bucketName, BlobId blobId);
+ @Override
+ public DumbBlobStore testee() {
+ return blobStore;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org