You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2020/02/07 14:05:23 UTC

[james-project] 01/09: JAMES-3028 Define a DumbBlobStore interface and a read/save Contract testsuite

This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b0f44ad0996133092f9d62b1b7d731d9187bd320
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Jan 8 17:45:53 2020 +0100

    JAMES-3028 Define a DumbBlobStore interface and a read/save Contract testsuite
---
 .../java/org/apache/james/blob/api/BlobStore.java  |   8 +-
 .../org/apache/james/blob/api/DumbBlobStore.java   |  95 +++++
 ...{BlobStore.java => IOObjectStoreException.java} |  32 +-
 .../james/blob/api/DumbBlobStoreContract.java      | 388 +++++++++++++++++++++
 .../james/blob/memory/MemoryDumbBlobStore.java     | 102 ++++++
 .../blob/memory/MemoryDumbBlobStoreTest.java}      |  38 +-
 6 files changed, 608 insertions(+), 55 deletions(-)

diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index 59414d5..458b354 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -35,14 +35,14 @@ public interface BlobStore {
 
     Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);
 
-    Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
-
-    InputStream read(BucketName bucketName, BlobId blobId);
-
     default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
         return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
     }
 
+    Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
+
+    InputStream read(BucketName bucketName, BlobId blobId);
+
     BucketName getDefaultBucketName();
 
     Mono<Void> deleteBucket(BucketName bucketName);
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
new file mode 100644
index 0000000..97dbc11
--- /dev/null
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
@@ -0,0 +1,95 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.api;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Mono;
+
+public interface DumbBlobStore {
+
+    /**
+     * Reads a Blob based on its BucketName and its BlobId.
+     *
+     * @throws ObjectNotFoundException when the blobId or the bucket is not found
+     * @throws IOObjectStoreException when an unexpected IO error occurs
+     */
+    InputStream read(BucketName bucketName, BlobId blobId) throws IOObjectStoreException, ObjectNotFoundException;
+
+
+    /**
+     * Reads a Blob based on its BucketName and its BlobId.
+     *
+     * @return a Mono containing the content of the blob, or
+     *  an ObjectNotFoundException in its error channel when the blobId or the bucket is not found,
+     *  or an IOObjectStoreException in its error channel when an unexpected IO error occurs
+     */
+    Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
+
+
+    /**
+     * Saves the blob under the provided blob id, overwriting any previous blob stored with the same id.
+     * The bucket is created if it does not already exist.
+     * This operation should be atomic and isolated.
+     * @return an empty Mono when the save succeeds,
+     *  otherwise an IOObjectStoreException in its error channel
+     */
+    Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data);
+
+    /**
+     * The InputStream should be closed after the call to this method.
+     *
+     * @see #save(BucketName, BlobId, byte[])
+     */
+    Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream);
+
+    /**
+     * @see #save(BucketName, BlobId, byte[])
+     */
+    Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content);
+
+    /**
+     * The String is stored as UTF-8.
+     *
+     * @see #save(BucketName, BlobId, byte[])
+     */
+    default Mono<Void> save(BucketName bucketName, BlobId blobId, String data) {
+        return save(bucketName, blobId, data.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Removes a Blob based on its BucketName and its BlobId.
+     *
+     * @return a successful Mono if the Blob is deleted or did not exist,
+     *  otherwise an IOObjectStoreException in its error channel
+     */
+    Mono<Void> delete(BucketName bucketName, BlobId blobId);
+
+    /**
+     * Removes a bucket based on its BucketName.
+     *
+     * @return a successful Mono if the bucket is deleted or did not exist,
+     *  otherwise an IOObjectStoreException in its error channel
+     */
+    Mono<Void> deleteBucket(BucketName bucketName);
+}
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/IOObjectStoreException.java
similarity index 58%
copy from server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
copy to server/blob/blob-api/src/main/java/org/apache/james/blob/api/IOObjectStoreException.java
index 59414d5..b36d138 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/IOObjectStoreException.java
@@ -16,36 +16,16 @@
  * specific language governing permissions and limitations      *
  * under the License.                                           *
  ****************************************************************/
-package org.apache.james.blob.api;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
 
-import reactor.core.publisher.Mono;
+package org.apache.james.blob.api;
 
-public interface BlobStore {
+public class IOObjectStoreException extends ObjectStoreException {
 
-    enum StoragePolicy {
-        SIZE_BASED,
-        LOW_COST,
-        HIGH_PERFORMANCE
+    public IOObjectStoreException(String message) {
+        super(message);
     }
 
-    Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);
-
-    Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);
-
-    Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
-
-    InputStream read(BucketName bucketName, BlobId blobId);
-
-    default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
-        return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
+    public IOObjectStoreException(String message, Throwable cause) {
+        super(message, cause);
     }
-
-    BucketName getDefaultBucketName();
-
-    Mono<Void> deleteBucket(BucketName bucketName);
-
-    Mono<Void> delete(BucketName bucketName, BlobId blobId);
 }
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
new file mode 100644
index 0000000..844ac69
--- /dev/null
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
@@ -0,0 +1,388 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.ByteArrayInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
+
+import com.google.common.base.Strings;
+import com.google.common.io.ByteSource;
+import reactor.core.publisher.Mono;
+
+public interface DumbBlobStoreContract {
+
+    BucketName TEST_BUCKET_NAME = BucketName.of("my-test-bucket");
+    BlobId TEST_BLOB_ID = new TestBlobId("test-blob-id");
+    String SHORT_STRING = "toto";
+    byte[] EMPTY_BYTEARRAY = {};
+    byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8);
+    byte[] ELEVEN_KILOBYTES = Strings.repeat("0123456789\n", 1000).getBytes(StandardCharsets.UTF_8);
+    byte[] TWELVE_MEGABYTES = Strings.repeat("0123456789\r\n", 1024 * 1024).getBytes(StandardCharsets.UTF_8);
+
+    DumbBlobStore testee();
+
+    @Test
+    default void saveShouldThrowWhenNullData() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (byte[]) null).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void saveShouldThrowWhenNullString() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (String) null).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+
+    @Test
+    default void saveShouldThrowWhenNullInputStream() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (InputStream) null).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void saveShouldSaveEmptyData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, EMPTY_BYTEARRAY).block();
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEmpty();
+    }
+
+    @Test
+    default void saveShouldSaveEmptyString() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, "").block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
+    }
+
+    @Test
+    default void saveShouldSaveEmptyInputStream() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(EMPTY_BYTEARRAY)).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEmpty();
+    }
+
+    @Test
+    default void saveShouldSaveEmptyByteSource() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.empty()).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEmpty();
+    }
+
+    @Test
+    default void readBytesShouldThrowWhenNotExisting() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.readBytes(TEST_BUCKET_NAME, new TestBlobId("unknown")).block())
+            .isExactlyInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void readBytesShouldReturnSavedData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    default void readBytesShouldReturnLongSavedData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES);
+    }
+
+    @Test
+    default void readBytesShouldReturnBigSavedData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+
+        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(bytes).isEqualTo(TWELVE_MEGABYTES);
+    }
+
+    @Test
+    default void readShouldThrowWhenNotExistingStream() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new TestBlobId("unknown")))
+            .isInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void saveShouldCreateBucket() {
+        DumbBlobStore store = testee();
+        BucketName nonExisting = BucketName.of("non-existing-bucket");
+        store.save(nonExisting, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        //read for a non-existing bucket would throw
+        assertThatCode(() -> store.read(nonExisting, TEST_BLOB_ID))
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    default void readShouldReturnSavedData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+
+        assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY));
+    }
+
+    @Test
+    default void readShouldReturnLongSavedData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
+
+        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+
+        assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES));
+    }
+
+    @Test
+    default void readShouldReturnBigSavedData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+
+        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+
+        assertThat(read).hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES));
+    }
+
+    @Test
+    default void saveBytesShouldOverwritePreviousData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    default void saveByteSourceShouldOverwritePreviousData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY)).block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
+    }
+
+
+    @Test
+    default void saveInputStreamShouldOverwritePreviousData() {
+        DumbBlobStore store = testee();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)).block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    default void saveInputStreamShouldNotOverwritePreviousDataOnFailingInputStream() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())
+            .onErrorResume(throwable -> Mono.empty())
+            .block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(ELEVEN_KILOBYTES);
+    }
+
+    @Test
+    default void saveByteSourceShouldNotOverwritePreviousDataOnFailingInputStream() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
+            @Override
+            public InputStream openStream() throws IOException {
+                return getThrowingInputStream();
+            }
+        })
+            .onErrorResume(throwable -> Mono.empty())
+            .block();
+
+        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThat(read).isEqualTo(ELEVEN_KILOBYTES);
+    }
+
+    @Test
+    default void saveByteSourceShouldThrowOnIOException() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
+                @Override
+                public InputStream openStream() throws IOException {
+                    return getThrowingInputStream();
+                }
+            })
+            .block())
+        .isInstanceOf(IOObjectStoreException.class);
+    }
+
+    @Test
+    default void saveInputStreamShouldThrowOnIOException() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())
+            .block())
+            .isInstanceOf(IOObjectStoreException.class);
+    }
+
+    @Test
+    default void concurrentSaveBytesShouldReturnConsistentValues() throws ExecutionException, InterruptedException {
+        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        ConcurrentTestRunner.builder()
+            .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)))
+            .threadCount(10)
+            .operationCount(100)
+            .runSuccessfullyWithin(Duration.ofMinutes(2));
+    }
+
+    @Test
+    default void concurrentSaveInputStreamShouldReturnConsistentValues() throws ExecutionException, InterruptedException {
+        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        ConcurrentTestRunner.builder()
+            .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes))))
+            .threadCount(10)
+            .operationCount(100)
+            .runSuccessfullyWithin(Duration.ofMinutes(2));
+    }
+
+    @Test
+    default void concurrentSaveByteSourceShouldReturnConsistentValues() throws ExecutionException, InterruptedException {
+        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        ConcurrentTestRunner.builder()
+            .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes))))
+            .threadCount(10)
+            .operationCount(100)
+            .runSuccessfullyWithin(Duration.ofMinutes(2));
+    }
+
+    default Publisher<Void> getConcurrentOperation(Function<byte[], Mono<Void>> save) {
+        switch (ThreadLocalRandom.current().nextInt(4)) {
+            case 0:
+                return save.apply(SHORT_BYTEARRAY);
+            case 1:
+                return save.apply(ELEVEN_KILOBYTES);
+            case 2:
+                return save.apply(TWELVE_MEGABYTES);
+            default:
+                return checkConcurrentSaveOperation();
+        }
+    }
+
+    /** Reads the test blob and asserts it is exactly one of the payloads the concurrent writers store. */
+    default Mono<Void> checkConcurrentSaveOperation() {
+        return Mono
+            .fromCallable(() -> {
+                // Close the stream once it has been drained so the check does not leak it.
+                try (InputStream blob = testee().read(TEST_BUCKET_NAME, TEST_BLOB_ID)) {
+                    return IOUtils.toByteArray(blob);
+                }
+            })
+            .doOnNext(bytes -> assertThat(bytes).isIn(SHORT_BYTEARRAY, ELEVEN_KILOBYTES, TWELVE_MEGABYTES))
+            .then();
+    }
+
+    /** Returns a stream over TWELVE_MEGABYTES that serves 5 bytes, then fails every read with an IOException. */
+    default FilterInputStream getThrowingInputStream() {
+        return new FilterInputStream(new ByteArrayInputStream(TWELVE_MEGABYTES)) {
+            int failingThreshold = 5;
+            int alreadyRead = 0;
+
+            @Override
+            public int read() throws IOException {
+                if (alreadyRead >= failingThreshold) {
+                    throw new IOException("error on read");
+                }
+                alreadyRead++;
+                return super.read();
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                int value = read();
+                if (value != -1) {
+                    b[off] = (byte) value;
+                }
+                // Bulk reads must report how many bytes were stored (one here), never the byte's own value.
+                return value == -1 ? -1 : 1;
+            }
+        };
+    }
+
+}
diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java
new file mode 100644
index 0000000..48e0d78
--- /dev/null
+++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java
@@ -0,0 +1,102 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.memory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.DumbBlobStore;
+import org.apache.james.blob.api.IOObjectStoreException;
+import org.apache.james.blob.api.ObjectNotFoundException;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Mono;
+
+public class MemoryDumbBlobStore implements DumbBlobStore {
+
+    private final Table<BucketName, BlobId, byte[]> blobs;
+
+    public MemoryDumbBlobStore() {
+        blobs = HashBasedTable.create();
+    }
+
+    @Override
+    public InputStream read(BucketName bucketName, BlobId blobId) throws IOObjectStoreException, ObjectNotFoundException {
+        return readBytes(bucketName, blobId)
+            .map(ByteArrayInputStream::new)
+            .block();
+    }
+
+    @Override
+    public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+        return Mono.fromCallable(() -> { synchronized (blobs) { return blobs.get(bucketName, blobId); } }) // same monitor as save(byte[])
+            .switchIfEmpty(Mono.error(() -> new ObjectNotFoundException(String.format("blob '%s' not found in bucket '%s'", blobId.asString(), bucketName.asString()))));
+    }
+
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+        return Mono.fromRunnable(() -> {
+            synchronized (blobs) {
+                blobs.put(bucketName, blobId, data);
+            }
+        });
+    }
+
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
+        return Mono.fromCallable(() -> {
+                try {
+                    return IOUtils.toByteArray(inputStream);
+                } catch (IOException e) {
+                    throw new IOObjectStoreException("IOException occured", e);
+                }
+            })
+            .flatMap(bytes -> save(bucketName, blobId, bytes));
+    }
+
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
+        return Mono.fromCallable(() -> {
+                try {
+                    return content.read();
+                } catch (IOException e) {
+                    throw new IOObjectStoreException("IOException occured", e);
+                }
+            })
+            .flatMap(bytes -> save(bucketName, blobId, bytes));
+    }
+
+    @Override
+    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+        return Mono.fromRunnable(() -> { synchronized (blobs) { blobs.remove(bucketName, blobId); } }); // no-op when the blob is absent
+    }
+
+    @Override
+    public Mono<Void> deleteBucket(BucketName bucketName) {
+        return Mono.fromRunnable(() -> { synchronized (blobs) { blobs.row(bucketName).clear(); } }); // row() is a live view of the table
+    }
+}
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryDumbBlobStoreTest.java
similarity index 57%
copy from server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
copy to server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryDumbBlobStoreTest.java
index 59414d5..eb326f4 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryDumbBlobStoreTest.java
@@ -16,36 +16,24 @@
  * specific language governing permissions and limitations      *
  * under the License.                                           *
  ****************************************************************/
-package org.apache.james.blob.api;
 
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
+package org.apache.james.blob.memory;
 
-import reactor.core.publisher.Mono;
+import org.apache.james.blob.api.DumbBlobStore;
+import org.apache.james.blob.api.DumbBlobStoreContract;
+import org.junit.jupiter.api.BeforeEach;
 
-public interface BlobStore {
+class MemoryDumbBlobStoreTest implements DumbBlobStoreContract {
 
-    enum StoragePolicy {
-        SIZE_BASED,
-        LOW_COST,
-        HIGH_PERFORMANCE
-    }
-
-    Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);
-
-    Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);
-
-    Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
+    private MemoryDumbBlobStore blobStore;
 
-    InputStream read(BucketName bucketName, BlobId blobId);
-
-    default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
-        return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
+    @BeforeEach
+    void setUp() {
+        blobStore = new MemoryDumbBlobStore();
     }
 
-    BucketName getDefaultBucketName();
-
-    Mono<Void> deleteBucket(BucketName bucketName);
-
-    Mono<Void> delete(BucketName bucketName, BlobId blobId);
+    @Override
+    public DumbBlobStore testee() {
+        return blobStore;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org