You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/05/19 09:52:33 UTC

[james-project] 02/04: JAMES-3586 Do read with CL ONE in Cassandra Blobstore when optimistic CL is enabled

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ff8f4f8d620b45678d29343f704fd165fde1647f
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Tue May 18 14:52:56 2021 +0700

    JAMES-3586 Do read with CL ONE in Cassandra Blobstore when optimistic CL is enabled
    
    If the CL ONE read returns no data, this may be caused by a replica inconsistency, so we fall back to a read at the default consistency level.
---
 .../blob/cassandra/CassandraBlobStoreDAO.java      |  34 ++++
 .../james/blob/cassandra/CassandraBucketDAO.java   |  20 +++
 .../blob/cassandra/CassandraDefaultBucketDAO.java  |  18 +++
 .../cassandra/CassandraBlobStoreClOneTest.java     | 171 +++++++++++++++++++++
 4 files changed, 243 insertions(+)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
index 5c0b069..f22e096 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
@@ -166,6 +166,23 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
     }
 
     private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
+        if (configuration.isOptimisticConsistencyLevel()) {
+            return readPartClOne(bucketName, blobId, partIndex)
+                .switchIfEmpty(readPartClDefault(bucketName, blobId, partIndex));
+        } else {
+            return readPartClDefault(bucketName, blobId, partIndex);
+        }
+    }
+
+    private Mono<ByteBuffer> readPartClOne(BucketName bucketName, BlobId blobId, Integer partIndex) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.readPartClOne(blobId, partIndex);
+        } else {
+            return bucketDAO.readPartClOne(bucketName, blobId, partIndex);
+        }
+    }
+
+    private Mono<ByteBuffer> readPartClDefault(BucketName bucketName, BlobId blobId, Integer partIndex) {
         if (isDefaultBucket(bucketName)) {
             return defaultBucketDAO.readPart(blobId, partIndex);
         } else {
@@ -174,6 +191,23 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
     }
 
     private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
+        if (configuration.isOptimisticConsistencyLevel()) {
+            return selectRowCountClOne(bucketName, blobId)
+                .switchIfEmpty(selectRowCountClDefault(bucketName, blobId));
+        } else {
+            return selectRowCountClDefault(bucketName, blobId);
+        }
+    }
+
+    private Mono<Integer> selectRowCountClOne(BucketName bucketName, BlobId blobId) {
+        if (isDefaultBucket(bucketName)) {
+            return defaultBucketDAO.selectRowCountClOne(blobId);
+        } else {
+            return bucketDAO.selectRowCountClOne(bucketName, blobId);
+        }
+    }
+
+    private Mono<Integer> selectRowCountClDefault(BucketName bucketName, BlobId blobId) {
         if (isDefaultBucket(bucketName)) {
             return defaultBucketDAO.selectRowCount(blobId);
         } else {
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index 0287f37..809d4c3 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.cassandra;
 
+import static com.datastax.driver.core.ConsistencyLevel.ONE;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -145,6 +146,15 @@ public class CassandraBucketDAO {
             .map(row -> row.getInt(NUMBER_OF_CHUNK));
     }
 
+    Mono<Integer> selectRowCountClOne(BucketName bucketName, BlobId blobId) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            select.bind()
+                .setString(BUCKET, bucketName.asString())
+                .setString(ID, blobId.asString())
+                .setConsistencyLevel(ONE))
+            .map(row -> row.getInt(NUMBER_OF_CHUNK));
+    }
+
     Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeSingleRow(
             selectPart.bind()
@@ -154,6 +164,16 @@ public class CassandraBucketDAO {
             .map(this::rowToData);
     }
 
+    Mono<ByteBuffer> readPartClOne(BucketName bucketName, BlobId blobId, int position) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            selectPart.bind()
+                .setString(BucketBlobParts.BUCKET, bucketName.asString())
+                .setString(BucketBlobParts.ID, blobId.asString())
+                .setInt(BucketBlobParts.CHUNK_NUMBER, position)
+                .setConsistencyLevel(ONE))
+            .map(this::rowToData);
+    }
+
     Mono<Void> deletePosition(BucketName bucketName, BlobId blobId) {
         return cassandraAsyncExecutor.executeVoid(
             delete.bind()
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
index af99a4b..0866491 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.cassandra;
 
+import static com.datastax.driver.core.ConsistencyLevel.ONE;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -123,6 +124,14 @@ public class CassandraDefaultBucketDAO {
             .map(row -> row.getInt(NUMBER_OF_CHUNK));
     }
 
+    Mono<Integer> selectRowCountClOne(BlobId blobId) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            select.bind()
+                .setString(ID, blobId.asString())
+                .setConsistencyLevel(ONE))
+            .map(row -> row.getInt(NUMBER_OF_CHUNK));
+    }
+
     Mono<ByteBuffer> readPart(BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeSingleRow(
             selectPart.bind()
@@ -131,6 +140,15 @@ public class CassandraDefaultBucketDAO {
             .map(this::rowToData);
     }
 
+    Mono<ByteBuffer> readPartClOne(BlobId blobId, int position) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            selectPart.bind()
+                .setString(DefaultBucketBlobParts.ID, blobId.asString())
+                .setInt(DefaultBucketBlobParts.CHUNK_NUMBER, position)
+                .setConsistencyLevel(ONE))
+            .map(this::rowToData);
+    }
+
     Mono<Void> deletePosition(BlobId blobId) {
         return cassandraAsyncExecutor.executeVoid(
             delete.bind()
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
new file mode 100644
index 0000000..a9da043
--- /dev/null
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
@@ -0,0 +1,171 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.cassandra;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.MetricableBlobStore;
+import org.apache.james.blob.api.ObjectStoreException;
+import org.apache.james.server.blob.deduplication.BlobStoreFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.base.Strings;
+
+import reactor.core.publisher.Mono;
+
+class CassandraBlobStoreClOneTest implements CassandraBlobStoreContract {
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraBlobModule.MODULE);
+
+    private BlobStore testee;
+    private CassandraDefaultBucketDAO defaultBucketDAO;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+        CassandraBucketDAO bucketDAO = new CassandraBucketDAO(blobIdFactory, cassandra.getConf());
+        defaultBucketDAO = spy(new CassandraDefaultBucketDAO(cassandra.getConf()));
+        CassandraConfiguration cassandraConfiguration = CassandraConfiguration.builder()
+            .blobPartSize(CHUNK_SIZE)
+            .optimisticConsistencyLevel(true)
+            .build();
+        testee = new MetricableBlobStore(
+            metricsTestExtension.getMetricFactory(),
+            BlobStoreFactory.builder()
+                .blobStoreDAO(new CassandraBlobStoreDAO(defaultBucketDAO, bucketDAO, cassandraConfiguration, BucketName.DEFAULT))
+                .blobIdFactory(blobIdFactory)
+                .defaultBucketName()
+                .deduplication());
+    }
+
+    @Override
+    public BlobStore testee() {
+        return testee;
+    }
+
+    @Override
+    public BlobId.Factory blobIdFactory() {
+        return new HashBlobId.Factory();
+    }
+
+    @Override
+    public CassandraDefaultBucketDAO defaultBucketDAO() {
+        return defaultBucketDAO;
+    }
+
+    @Override
+    @Test
+    public void readShouldNotReturnInvalidResultsWhenPartialDataPresent() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+        when(defaultBucketDAO().readPart(blobId, 1)).thenReturn(Mono.empty());
+
+        assertThatThrownBy(() -> IOUtils.toString(testee().read(testee().getDefaultBucketName(), blobId), StandardCharsets.UTF_8))
+            .isInstanceOf(ObjectStoreException.class)
+            .hasMessageContaining("Missing blob part for blobId");
+    }
+
+    @Override
+    @Test
+    public void readBytesShouldNotReturnInvalidResultsWhenPartialDataPresent() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+        when(defaultBucketDAO().readPart(blobId, 1)).thenReturn(Mono.empty());
+
+        assertThatThrownBy(() -> Mono.from(testee().readBytes(testee().getDefaultBucketName(), blobId)).block())
+            .isInstanceOf(ObjectStoreException.class)
+            .hasMessageContaining("Missing blob part for blobId");
+    }
+
+    @Test
+    void readShouldReturnValidResultWhenDataMissingInOneNodeButPresentInOthers() throws IOException {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+
+        String data = IOUtils.toString(testee().read(testee().getDefaultBucketName(), blobId), StandardCharsets.UTF_8);
+
+        assertThat(data).isEqualTo(longString);
+    }
+
+    @Test
+    void readBytesShouldReturnValidResultWhenDataMissingInOneNodeButPresentInOthers() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+
+        byte[] bytes = Mono.from(testee().readBytes(testee().getDefaultBucketName(), blobId)).block();
+
+        assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
+    }
+
+    @Test
+    void readShouldReturnValidResultWhenPartialDataMissingInOneNodeButPresentInOthers() throws IOException {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+
+        String data = IOUtils.toString(testee().read(testee().getDefaultBucketName(), blobId), StandardCharsets.UTF_8);
+
+        assertThat(data).isEqualTo(longString);
+    }
+
+    @Test
+    void readBytesShouldReturnValidResultWhenPartialDataMissingInOneNodeButPresentInOthers() {
+        int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
+        String longString = Strings.repeat("0123456789\n", repeatCount);
+        BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block();
+
+        when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty());
+
+        byte[] bytes = Mono.from(testee().readBytes(testee().getDefaultBucketName(), blobId)).block();
+
+        assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org