You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/01 07:31:05 UTC

[james-project] 12/20: JAMES-2838 Root cause for partial reads upon concurrent delete is pipedInputStream

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6d57fd011ee7176450252448f95eaaac9577393f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Jul 28 17:42:54 2019 +0200

    JAMES-2838 Root cause for partial reads upon concurrent delete is pipedInputStream
    
    This model does not allow propagating exceptions that occur during the read.
---
 .../james/blob/api/DeleteBlobStoreContract.java    | 31 ++++++++++++++--
 .../james/blob/cassandra/CassandraBlobStore.java   |  6 ++--
 .../blob/cassandra/CassandraBlobStoreTest.java     | 42 ++++++++++++++++++++--
 3 files changed, 71 insertions(+), 8 deletions(-)

diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java
index 1548098..35f7df9 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java
@@ -158,8 +158,35 @@ public interface DeleteBlobStoreContract {
                 try {
                     InputStream read = store.read(defaultBucketName, blobId);
 
-                    if (!IOUtils.toString(read, StandardCharsets.UTF_8).equals(TWELVE_MEGABYTES_STRING)) {
-                        throw new RuntimeException("Should not read partial blob when an other thread is deleting it");
+                    String string = IOUtils.toString(read, StandardCharsets.UTF_8);
+                    if (!string.equals(TWELVE_MEGABYTES_STRING)) {
+                        throw new RuntimeException("Should not read partial blob when an other thread is deleting it. Size : " + string.length());
+                    }
+                } catch (ObjectStoreException exception) {
+                    // normal behavior here
+                }
+
+                store.delete(defaultBucketName, blobId).block();
+            }))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(3));
+    }
+
+    @Test
+    default void readBytesShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() throws Exception {
+        BlobStore store = testee();
+        BucketName defaultBucketName = store.getDefaultBucketName();
+
+        BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES).block();
+
+        ConcurrentTestRunner.builder()
+            .operation(((threadNumber, step) -> {
+                try {
+                    byte[] read = store.readBytes(defaultBucketName, blobId).block();
+                    String string = IOUtils.toString(read, StandardCharsets.UTF_8.displayName());
+                    if (!string.equals(TWELVE_MEGABYTES_STRING)) {
+                        throw new RuntimeException("Should not read partial blob when an other thread is deleting it. Size : " + string.length());
                     }
                 } catch (ObjectStoreException exception) {
                     // normal behavior here
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 831baf6..7163460 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -139,9 +139,9 @@ public class CassandraBlobStore implements BlobStore {
         return Flux.range(0, rowCount)
             .publishOn(Schedulers.elastic(), PREFETCH)
             .flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex)
-                .switchIfEmpty(Mono.error(new IllegalStateException(
-                    String.format("Missing blob part for blobId %s and position %d", blobId, partIndex))))
-                , MAX_CONCURRENCY, PREFETCH);
+                .switchIfEmpty(Mono.error(new ObjectStoreException(
+                    String.format("Missing blob part for blobId %s and position %d", blobId, partIndex)))),
+                MAX_CONCURRENCY, PREFETCH);
     }
 
     @Override
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index baf617c..dcb4dec 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -21,10 +21,13 @@ package org.apache.james.blob.cassandra;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
@@ -35,6 +38,7 @@ import org.apache.james.blob.api.DeleteBlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.MetricableBlobStore;
 import org.apache.james.blob.api.MetricableBlobStoreContract;
+import org.apache.james.blob.api.ObjectStoreException;
 import org.apache.james.util.ZeroedInputStream;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -43,6 +47,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.google.common.base.Strings;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraBlobStoreTest implements MetricableBlobStoreContract, BucketBlobStoreContract, DeleteBlobStoreContract {
     private static final int CHUNK_SIZE = 10240;
     private static final int MULTIPLE_CHUNK_SIZE = 3;
@@ -51,14 +57,17 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraBlobModule.MODULE);
 
     private BlobStore testee;
+    private CassandraDefaultBucketDAO defaultBucketDAO;
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
         HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf());
+        defaultBucketDAO = spy(new CassandraDefaultBucketDAO(cassandra.getConf()));
         testee = new MetricableBlobStore(
             metricsTestExtension.getMetricFactory(),
-            new CassandraBlobStore(new CassandraDefaultBucketDAO(cassandra.getConf()),
-                new CassandraBucketDAO(blobIdFactory, cassandra.getConf()),
+            new CassandraBlobStore(defaultBucketDAO,
+                bucketDAO,
                 CassandraConfiguration.builder()
                     .blobPartSize(CHUNK_SIZE)
                     .build(),
@@ -76,7 +85,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
     }
 
     @Override
-    @Disabled("Concurrent read and delete can lead to partial reads (no transactions)")
+    @Disabled("JAMES-2838 Read inputStream relies on a pipedInputStream model that does not allow to propagate partial read exceptions")
     public void readShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() {
 
     }
@@ -92,6 +101,33 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
     }
 
     @Test
+    void readBytesShouldNotReturnInvalidResultsWhenPartialDataPresent() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = testee.save(testee.getDefaultBucketName(), longString).block();
+
+        when(defaultBucketDAO.readPart(blobId, 1)).thenReturn(Mono.empty());
+
+        assertThatThrownBy(() -> testee.readBytes(testee.getDefaultBucketName(), blobId).block())
+            .isInstanceOf(ObjectStoreException.class)
+            .hasMessageContaining("Missing blob part for blobId");
+    }
+
+    @Disabled("JAMES-2838 Read inputStream relies on a pipedInputStream model that does not allow to propagate partial read exceptions")
+    @Test
+    void readShouldNotReturnInvalidResultsWhenPartialDataPresent() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = testee.save(testee.getDefaultBucketName(), longString).block();
+
+        when(defaultBucketDAO.readPart(blobId, 1)).thenReturn(Mono.empty());
+
+        assertThatThrownBy(() -> IOUtils.toString(testee.read(testee.getDefaultBucketName(), blobId), StandardCharsets.UTF_8))
+            .isInstanceOf(ObjectStoreException.class)
+            .hasMessageContaining("Missing blob part for blobId");
+    }
+
+    @Test
     void deleteBucketShouldThrowWhenDeletingDefaultBucket() {
         assertThatThrownBy(() ->  testee.deleteBucket(testee.getDefaultBucketName()))
             .isInstanceOf(IllegalArgumentException.class)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org