You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/01 07:31:03 UTC

[james-project] 10/20: JAMES-2838 CassandraBlobStore deleteBucket implementation

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 920e107c5b2639d19efac3503b8deba8ffc05b9f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 25 11:50:57 2019 +0200

    JAMES-2838 CassandraBlobStore deleteBucket implementation
    
    Due to the primary key schema, we cannot retrieve all of the blobs belonging to a bucket in a single request.
    
    Thus, we need to scroll through all blobs and keep only those that are part of the bucket being deleted.
    
    Note that large-row issues rule out the trivial (PK: bucketName, CK: blobId) design.
    
    A complex time series could be devised in order to get all blobs of a bucket, but iteration over the bucket would still be needed (part of the primary key). Thus, deleting a bucket in the CassandraBlobStore is a complex operation.
---
 .../james/blob/cassandra/CassandraBlobStore.java   | 11 +++++--
 .../james/blob/cassandra/CassandraBucketDAO.java   | 20 +++++++++++--
 .../blob/cassandra/CassandraBlobStoreTest.java     | 35 ++--------------------
 .../blob/cassandra/CassandraBucketDAOTest.java     | 27 ++++++++++++++---
 4 files changed, 52 insertions(+), 41 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index e9fa212..cfdc440 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -29,7 +29,6 @@ import java.util.stream.Stream;
 import javax.inject.Inject;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobId;
@@ -71,7 +70,7 @@ public class CassandraBlobStore implements BlobStore {
     @VisibleForTesting
     public CassandraBlobStore(Session session) {
         this(new CassandraDefaultBucketDAO(session),
-            new CassandraBucketDAO(session),
+            new CassandraBucketDAO(new HashBlobId.Factory(), session),
             CassandraConfiguration.DEFAULT_CONFIGURATION,
             new HashBlobId.Factory());
     }
@@ -154,7 +153,13 @@ public class CassandraBlobStore implements BlobStore {
 
     @Override
     public Mono<Void> deleteBucket(BucketName bucketName) {
-        throw new NotImplementedException("not implemented");
+        Preconditions.checkNotNull(bucketName);
+
+        return bucketDAO.listAll()
+            .filter(bucketNameBlobIdPair -> bucketNameBlobIdPair.getKey().equals(bucketName))
+            .map(Pair::getValue)
+            .flatMap(blobId -> delete(bucketName, blobId))
+            .then();
     }
 
     @Override
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index 713429f..0c6771a 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
 
 import javax.inject.Inject;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BucketName;
@@ -42,9 +43,11 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-public class CassandraBucketDAO {
+class CassandraBucketDAO {
+    private final BlobId.Factory blobIdFactory;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement insert;
     private final PreparedStatement insertPart;
@@ -52,10 +55,12 @@ public class CassandraBucketDAO {
     private final PreparedStatement selectPart;
     private final PreparedStatement delete;
     private final PreparedStatement deleteParts;
+    private final PreparedStatement listAll;
 
     @Inject
     @VisibleForTesting
-    public CassandraBucketDAO(Session session) {
+    CassandraBucketDAO(BlobId.Factory blobIdFactory, Session session) {
+        this.blobIdFactory = blobIdFactory;
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.insert = prepareInsert(session);
         this.select = prepareSelect(session);
@@ -63,6 +68,7 @@ public class CassandraBucketDAO {
         this.insertPart = prepareInsertPart(session);
         this.selectPart = prepareSelectPart(session);
         this.deleteParts = prepareDeleteParts(session);
+        this.listAll = prepareListAll(session);
     }
 
     private PreparedStatement prepareDeleteParts(Session session) {
@@ -79,6 +85,11 @@ public class CassandraBucketDAO {
                 .and(eq(ID, bindMarker(ID))));
     }
 
+    private PreparedStatement prepareListAll(Session session) {
+        return session.prepare(select()
+            .from(BlobTables.BucketBlobTable.TABLE_NAME));
+    }
+
     private PreparedStatement prepareSelect(Session session) {
         return session.prepare(select()
             .from(BlobTables.BucketBlobTable.TABLE_NAME)
@@ -157,6 +168,11 @@ public class CassandraBucketDAO {
                 .setString(BucketBlobParts.ID, blobId.asString()));
     }
 
+    Flux<Pair<BucketName, BlobId>> listAll() {
+        return cassandraAsyncExecutor.executeRows(listAll.bind())
+            .map(row -> Pair.of(BucketName.of(row.getString(BUCKET)), blobIdFactory.from(row.getString(ID))));
+    }
+
     private byte[] rowToData(Row row) {
         byte[] data = new byte[row.getBytes(BucketBlobParts.DATA).remaining()];
         row.getBytes(BucketBlobParts.DATA).get(data);
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index 3014a60..c1c6d8b 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -53,14 +53,15 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
         testee = new MetricableBlobStore(
             metricsTestExtension.getMetricFactory(),
             new CassandraBlobStore(new CassandraDefaultBucketDAO(cassandra.getConf()),
-                new CassandraBucketDAO(cassandra.getConf()),
+                new CassandraBucketDAO(blobIdFactory, cassandra.getConf()),
                 CassandraConfiguration.builder()
                     .blobPartSize(CHUNK_SIZE)
                     .build(),
-                new HashBlobId.Factory()));
+                blobIdFactory));
     }
 
     @Override
@@ -74,36 +75,6 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
     }
 
     @Override
-    @Disabled("JAMES-2806: delete bucket not implemented yet for Cassandra")
-    public void deleteBucketShouldPublishDeleteBucketTimerMetrics() {
-
-    }
-
-    @Override
-    @Disabled("Not implemented yet")
-    public void deleteBucketShouldBeIdempotent() {
-
-    }
-
-    @Override
-    @Disabled("Not implemented yet")
-    public void deleteBucketConcurrentlyShouldNotFail() {
-
-    }
-
-    @Override
-    @Disabled("Not implemented yet")
-    public void deleteBucketShouldDeleteExistingBucketWithItsData() {
-
-    }
-
-    @Override
-    @Disabled("Not implemented yet")
-    public void deleteBucketShouldThrowWhenNullBucketName() {
-
-    }
-
-    @Override
     @Disabled("Concurrent read and delete can lead to partial reads (no transactions)")
     public void readShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() {
 
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
index 8994ed4..66a7abc 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
@@ -34,8 +34,10 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 import java.nio.ByteBuffer;
 import java.util.Optional;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.blob.api.HashBlobId;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -48,7 +50,7 @@ class CassandraBucketDAOTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        testee = new CassandraBucketDAO(cassandraCluster.getCassandraCluster().getConf());
+        testee = new CassandraBucketDAO(new HashBlobId.Factory(), cassandraCluster.getCassandraCluster().getConf());
     }
 
     @Test
@@ -166,11 +168,28 @@ class CassandraBucketDAOTest {
     }
 
     @Test
-    void selectRowCountShouldNotReturnOtherBucketValue() {
+    void listAllShouldReturnEmptyWhenNone() {
+        assertThat(testee.listAll().toStream()).isEmpty();
+    }
+
+    @Test
+    void listAllShouldReturnPreviouslyInsertedData() {
         testee.saveBlobPartsReferences(BUCKET_NAME, BLOB_ID, NUMBER_OF_CHUNK).block();
+        testee.saveBlobPartsReferences(BUCKET_NAME_2, BLOB_ID, NUMBER_OF_CHUNK).block();
+        testee.saveBlobPartsReferences(BUCKET_NAME, BLOB_ID_2, NUMBER_OF_CHUNK).block();
 
-        Optional<Integer> maybeRowCount = testee.selectRowCount(BUCKET_NAME_2, BLOB_ID).blockOptional();
+        assertThat(testee.listAll().toStream()).containsOnly(
+            Pair.of(BUCKET_NAME, BLOB_ID),
+            Pair.of(BUCKET_NAME_2, BLOB_ID),
+            Pair.of(BUCKET_NAME, BLOB_ID_2));
+    }
 
-        assertThat(maybeRowCount).isEmpty();
+    @Test
+    void listAllShouldNotReturnDeletedData() {
+        testee.saveBlobPartsReferences(BUCKET_NAME, BLOB_ID, NUMBER_OF_CHUNK).block();
+
+        testee.deletePosition(BUCKET_NAME, BLOB_ID).block();
+
+        assertThat(testee.listAll().toStream()).isEmpty();
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org