You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/01 07:31:05 UTC
[james-project] 12/20: JAMES-2838 Root cause for partial reads upon
concurrent delete is pipedInputStream
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 6d57fd011ee7176450252448f95eaaac9577393f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Jul 28 17:42:54 2019 +0200
JAMES-2838 Root cause for partial reads upon concurrent delete is pipedInputStream
This model does not allow exceptions that occur during the read to be propagated.
---
.../james/blob/api/DeleteBlobStoreContract.java | 31 ++++++++++++++--
.../james/blob/cassandra/CassandraBlobStore.java | 6 ++--
.../blob/cassandra/CassandraBlobStoreTest.java | 42 ++++++++++++++++++++--
3 files changed, 71 insertions(+), 8 deletions(-)
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java
index 1548098..35f7df9 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java
@@ -158,8 +158,35 @@ public interface DeleteBlobStoreContract {
try {
InputStream read = store.read(defaultBucketName, blobId);
- if (!IOUtils.toString(read, StandardCharsets.UTF_8).equals(TWELVE_MEGABYTES_STRING)) {
- throw new RuntimeException("Should not read partial blob when an other thread is deleting it");
+ String string = IOUtils.toString(read, StandardCharsets.UTF_8);
+ if (!string.equals(TWELVE_MEGABYTES_STRING)) {
+ throw new RuntimeException("Should not read partial blob when an other thread is deleting it. Size : " + string.length());
+ }
+ } catch (ObjectStoreException exception) {
+ // normal behavior here
+ }
+
+ store.delete(defaultBucketName, blobId).block();
+ }))
+ .threadCount(10)
+ .operationCount(10)
+ .runSuccessfullyWithin(Duration.ofMinutes(3));
+ }
+
+ @Test
+ default void readBytesShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() throws Exception {
+ BlobStore store = testee();
+ BucketName defaultBucketName = store.getDefaultBucketName();
+
+ BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES).block();
+
+ ConcurrentTestRunner.builder()
+ .operation(((threadNumber, step) -> {
+ try {
+ byte[] read = store.readBytes(defaultBucketName, blobId).block();
+ String string = IOUtils.toString(read, StandardCharsets.UTF_8.displayName());
+ if (!string.equals(TWELVE_MEGABYTES_STRING)) {
+ throw new RuntimeException("Should not read partial blob when an other thread is deleting it. Size : " + string.length());
}
} catch (ObjectStoreException exception) {
// normal behavior here
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 831baf6..7163460 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -139,9 +139,9 @@ public class CassandraBlobStore implements BlobStore {
return Flux.range(0, rowCount)
.publishOn(Schedulers.elastic(), PREFETCH)
.flatMapSequential(partIndex -> readPart(bucketName, blobId, partIndex)
- .switchIfEmpty(Mono.error(new IllegalStateException(
- String.format("Missing blob part for blobId %s and position %d", blobId, partIndex))))
- , MAX_CONCURRENCY, PREFETCH);
+ .switchIfEmpty(Mono.error(new ObjectStoreException(
+ String.format("Missing blob part for blobId %s and position %d", blobId, partIndex)))),
+ MAX_CONCURRENCY, PREFETCH);
}
@Override
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index baf617c..dcb4dec 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -21,10 +21,13 @@ package org.apache.james.blob.cassandra;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import org.apache.commons.io.IOUtils;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
@@ -35,6 +38,7 @@ import org.apache.james.blob.api.DeleteBlobStoreContract;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.MetricableBlobStore;
import org.apache.james.blob.api.MetricableBlobStoreContract;
+import org.apache.james.blob.api.ObjectStoreException;
import org.apache.james.util.ZeroedInputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -43,6 +47,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import com.google.common.base.Strings;
+import reactor.core.publisher.Mono;
+
public class CassandraBlobStoreTest implements MetricableBlobStoreContract, BucketBlobStoreContract, DeleteBlobStoreContract {
private static final int CHUNK_SIZE = 10240;
private static final int MULTIPLE_CHUNK_SIZE = 3;
@@ -51,14 +57,17 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraBlobModule.MODULE);
private BlobStore testee;
+ private CassandraDefaultBucketDAO defaultBucketDAO;
@BeforeEach
void setUp(CassandraCluster cassandra) {
HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+ CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf());
+ defaultBucketDAO = spy(new CassandraDefaultBucketDAO(cassandra.getConf()));
testee = new MetricableBlobStore(
metricsTestExtension.getMetricFactory(),
- new CassandraBlobStore(new CassandraDefaultBucketDAO(cassandra.getConf()),
- new CassandraBucketDAO(blobIdFactory, cassandra.getConf()),
+ new CassandraBlobStore(defaultBucketDAO,
+ bucketDAO,
CassandraConfiguration.builder()
.blobPartSize(CHUNK_SIZE)
.build(),
@@ -76,7 +85,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
}
@Override
- @Disabled("Concurrent read and delete can lead to partial reads (no transactions)")
+ @Disabled("JAMES-2838 Read inputStream relies on a pipedInputStream model that does not allow to propagate partial read exceptions")
public void readShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() {
}
@@ -92,6 +101,33 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
}
@Test
+ void readBytesShouldNotReturnInvalidResultsWhenPartialDataPresent() {
+ int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+ String longString = Strings.repeat("0123456789\n", repeatCount);
+ BlobId blobId = testee.save(testee.getDefaultBucketName(), longString).block();
+
+ when(defaultBucketDAO.readPart(blobId, 1)).thenReturn(Mono.empty());
+
+ assertThatThrownBy(() -> testee.readBytes(testee.getDefaultBucketName(), blobId).block())
+ .isInstanceOf(ObjectStoreException.class)
+ .hasMessageContaining("Missing blob part for blobId");
+ }
+
+ @Disabled("JAMES-2838 Read inputStream relies on a pipedInputStream model that does not allow to propagate partial read exceptions")
+ @Test
+ void readShouldNotReturnInvalidResultsWhenPartialDataPresent() {
+ int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+ String longString = Strings.repeat("0123456789\n", repeatCount);
+ BlobId blobId = testee.save(testee.getDefaultBucketName(), longString).block();
+
+ when(defaultBucketDAO.readPart(blobId, 1)).thenReturn(Mono.empty());
+
+ assertThatThrownBy(() -> IOUtils.toString(testee.read(testee.getDefaultBucketName(), blobId), StandardCharsets.UTF_8))
+ .isInstanceOf(ObjectStoreException.class)
+ .hasMessageContaining("Missing blob part for blobId");
+ }
+
+ @Test
void deleteBucketShouldThrowWhenDeletingDefaultBucket() {
assertThatThrownBy(() -> testee.deleteBucket(testee.getDefaultBucketName()))
.isInstanceOf(IllegalArgumentException.class)
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org