You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/01 07:31:03 UTC
[james-project] 10/20: JAMES-2838 CassandraBlobStore deleteBucket
implementation
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 920e107c5b2639d19efac3503b8deba8ffc05b9f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 25 11:50:57 2019 +0200
JAMES-2838 CassandraBlobStore deleteBucket implementation
Due to the primary key schema we can not get in a single request all of the blobs part of a bucket.
Thus, we need to scroll all blobs in order to filter out those not part of the deleted bucket.
Note that large rows issues disallow the trivial (PK: bucketName, CK: blobId) design.
A complex timeserie could be imagined in order to get all blobs of a bucket, but iteration over the bucket will still be needed (part of the primary key). Thus, deleting a bucket in the CassandraBlobStore is a complex operation.
---
.../james/blob/cassandra/CassandraBlobStore.java | 11 +++++--
.../james/blob/cassandra/CassandraBucketDAO.java | 20 +++++++++++--
.../blob/cassandra/CassandraBlobStoreTest.java | 35 ++--------------------
.../blob/cassandra/CassandraBucketDAOTest.java | 27 ++++++++++++++---
4 files changed, 52 insertions(+), 41 deletions(-)
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index e9fa212..cfdc440 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -29,7 +29,6 @@ import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.blob.api.BlobId;
@@ -71,7 +70,7 @@ public class CassandraBlobStore implements BlobStore {
@VisibleForTesting
public CassandraBlobStore(Session session) {
this(new CassandraDefaultBucketDAO(session),
- new CassandraBucketDAO(session),
+ new CassandraBucketDAO(new HashBlobId.Factory(), session),
CassandraConfiguration.DEFAULT_CONFIGURATION,
new HashBlobId.Factory());
}
@@ -154,7 +153,13 @@ public class CassandraBlobStore implements BlobStore {
@Override
public Mono<Void> deleteBucket(BucketName bucketName) {
- throw new NotImplementedException("not implemented");
+ Preconditions.checkNotNull(bucketName);
+
+ return bucketDAO.listAll()
+ .filter(bucketNameBlobIdPair -> bucketNameBlobIdPair.getKey().equals(bucketName))
+ .map(Pair::getValue)
+ .flatMap(blobId -> delete(bucketName, blobId))
+ .then();
}
@Override
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index 713429f..0c6771a 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
import javax.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BucketName;
@@ -42,9 +43,11 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-public class CassandraBucketDAO {
+class CassandraBucketDAO {
+ private final BlobId.Factory blobIdFactory;
private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final PreparedStatement insert;
private final PreparedStatement insertPart;
@@ -52,10 +55,12 @@ public class CassandraBucketDAO {
private final PreparedStatement selectPart;
private final PreparedStatement delete;
private final PreparedStatement deleteParts;
+ private final PreparedStatement listAll;
@Inject
@VisibleForTesting
- public CassandraBucketDAO(Session session) {
+ CassandraBucketDAO(BlobId.Factory blobIdFactory, Session session) {
+ this.blobIdFactory = blobIdFactory;
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
this.insert = prepareInsert(session);
this.select = prepareSelect(session);
@@ -63,6 +68,7 @@ public class CassandraBucketDAO {
this.insertPart = prepareInsertPart(session);
this.selectPart = prepareSelectPart(session);
this.deleteParts = prepareDeleteParts(session);
+ this.listAll = prepareListAll(session);
}
private PreparedStatement prepareDeleteParts(Session session) {
@@ -79,6 +85,11 @@ public class CassandraBucketDAO {
.and(eq(ID, bindMarker(ID))));
}
+ private PreparedStatement prepareListAll(Session session) {
+ return session.prepare(select()
+ .from(BlobTables.BucketBlobTable.TABLE_NAME));
+ }
+
private PreparedStatement prepareSelect(Session session) {
return session.prepare(select()
.from(BlobTables.BucketBlobTable.TABLE_NAME)
@@ -157,6 +168,11 @@ public class CassandraBucketDAO {
.setString(BucketBlobParts.ID, blobId.asString()));
}
+ Flux<Pair<BucketName, BlobId>> listAll() {
+ return cassandraAsyncExecutor.executeRows(listAll.bind())
+ .map(row -> Pair.of(BucketName.of(row.getString(BUCKET)), blobIdFactory.from(row.getString(ID))));
+ }
+
private byte[] rowToData(Row row) {
byte[] data = new byte[row.getBytes(BucketBlobParts.DATA).remaining()];
row.getBytes(BucketBlobParts.DATA).get(data);
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index 3014a60..c1c6d8b 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -53,14 +53,15 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
@BeforeEach
void setUp(CassandraCluster cassandra) {
+ HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
testee = new MetricableBlobStore(
metricsTestExtension.getMetricFactory(),
new CassandraBlobStore(new CassandraDefaultBucketDAO(cassandra.getConf()),
- new CassandraBucketDAO(cassandra.getConf()),
+ new CassandraBucketDAO(blobIdFactory, cassandra.getConf()),
CassandraConfiguration.builder()
.blobPartSize(CHUNK_SIZE)
.build(),
- new HashBlobId.Factory()));
+ blobIdFactory));
}
@Override
@@ -74,36 +75,6 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract, Buck
}
@Override
- @Disabled("JAMES-2806: delete bucket not implemented yet for Cassandra")
- public void deleteBucketShouldPublishDeleteBucketTimerMetrics() {
-
- }
-
- @Override
- @Disabled("Not implemented yet")
- public void deleteBucketShouldBeIdempotent() {
-
- }
-
- @Override
- @Disabled("Not implemented yet")
- public void deleteBucketConcurrentlyShouldNotFail() {
-
- }
-
- @Override
- @Disabled("Not implemented yet")
- public void deleteBucketShouldDeleteExistingBucketWithItsData() {
-
- }
-
- @Override
- @Disabled("Not implemented yet")
- public void deleteBucketShouldThrowWhenNullBucketName() {
-
- }
-
- @Override
@Disabled("Concurrent read and delete can lead to partial reads (no transactions)")
public void readShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() {
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
index 8994ed4..66a7abc 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
@@ -34,8 +34,10 @@ import static org.assertj.core.api.Assertions.assertThatCode;
import java.nio.ByteBuffer;
import java.util.Optional;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.blob.api.HashBlobId;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -48,7 +50,7 @@ class CassandraBucketDAOTest {
@BeforeEach
void setUp(CassandraCluster cassandra) {
- testee = new CassandraBucketDAO(cassandraCluster.getCassandraCluster().getConf());
+ testee = new CassandraBucketDAO(new HashBlobId.Factory(), cassandraCluster.getCassandraCluster().getConf());
}
@Test
@@ -166,11 +168,28 @@ class CassandraBucketDAOTest {
}
@Test
- void selectRowCountShouldNotReturnOtherBucketValue() {
+ void listAllShouldReturnEmptyWhenNone() {
+ assertThat(testee.listAll().toStream()).isEmpty();
+ }
+
+ @Test
+ void listAllShouldReturnPreviouslyInsertedData() {
testee.saveBlobPartsReferences(BUCKET_NAME, BLOB_ID, NUMBER_OF_CHUNK).block();
+ testee.saveBlobPartsReferences(BUCKET_NAME_2, BLOB_ID, NUMBER_OF_CHUNK).block();
+ testee.saveBlobPartsReferences(BUCKET_NAME, BLOB_ID_2, NUMBER_OF_CHUNK).block();
- Optional<Integer> maybeRowCount = testee.selectRowCount(BUCKET_NAME_2, BLOB_ID).blockOptional();
+ assertThat(testee.listAll().toStream()).containsOnly(
+ Pair.of(BUCKET_NAME, BLOB_ID),
+ Pair.of(BUCKET_NAME_2, BLOB_ID),
+ Pair.of(BUCKET_NAME, BLOB_ID_2));
+ }
- assertThat(maybeRowCount).isEmpty();
+ @Test
+ void listAllShouldNotReturnDeletedData() {
+ testee.saveBlobPartsReferences(BUCKET_NAME, BLOB_ID, NUMBER_OF_CHUNK).block();
+
+ testee.deletePosition(BUCKET_NAME, BLOB_ID).block();
+
+ assertThat(testee.listAll().toStream()).isEmpty();
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org