You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/07/12 07:11:29 UTC

[james-project] 03/09: JAMES-2810 Implement UserPerBucket DAO

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 36a1d98079b0988cc045764691b99596274f0385
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jul 5 11:08:20 2019 +0200

    JAMES-2810 Implement UserPerBucket DAO
    
    This will be needed to iterate over the users of a bucket and will be used as a building block of the CassandraDeletedMessageMetadataVault cleanup.
    
    Note that a bucket is further partitioned by user to avoid wide rows; iterating through its users is thus required upon bucket deletion.
---
 .../metadata/DeletedMessageMetadataModule.java     |  15 ++++
 .../james/vault/metadata/UserPerBucketDAO.java     |  87 ++++++++++++++++++
 .../james/vault/metadata/UserPerBucketDAOTest.java | 100 +++++++++++++++++++++
 3 files changed, 202 insertions(+)

diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java
index 6fff8a9..eceb87c 100644
--- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java
+++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java
@@ -36,6 +36,13 @@ public interface DeletedMessageMetadataModule {
         String BLOB_ID = "blobId";
     }
 
+    interface UserPerBucketTable {
+        String TABLE = "userPerBucket";
+        // Column names; MODULE declares bucketName as the partition key and user as the clustering column.
+        String BUCKET_NAME = "bucketName";
+        String USER = "user";
+    }
+
     CassandraModule MODULE = CassandraModule
         .builder()
 
@@ -49,5 +56,13 @@ public interface DeletedMessageMetadataModule {
             .addColumn(StorageInformationTable.BUCKET_NAME, text())
             .addColumn(StorageInformationTable.BLOB_ID, text()))
 
+        .table(UserPerBucketTable.TABLE)
+        .comment("Holds user list having deletedMessages stored in a given bucket in the BlobStore based DeletedMessages vault")
+        .options(options -> options
+            .caching(SchemaBuilder.KeyCaching.ALL, SchemaBuilder.noRows()))
+        .statement(statement -> statement
+            .addPartitionKey(UserPerBucketTable.BUCKET_NAME, text())
+            .addClusteringColumn(UserPerBucketTable.USER, text()))
+
         .build();
 }
diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java
new file mode 100644
index 0000000..743b91d
--- /dev/null
+++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java
@@ -0,0 +1,87 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.vault.metadata;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.UserPerBucketTable.BUCKET_NAME;
+import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.UserPerBucketTable.TABLE;
+import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.UserPerBucketTable.USER;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.User;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class UserPerBucketDAO { // Cassandra DAO over the userPerBucket table: adds (bucket, user) pairs, lists a bucket's users, deletes a bucket.
+    private final CassandraAsyncExecutor cassandraAsyncExecutor;
+    private final PreparedStatement addStatement;
+    private final PreparedStatement removeStatement;
+    private final PreparedStatement listStatement;
+    // Wraps the session in an async executor and prepares the three statements once, at construction time.
+    UserPerBucketDAO(Session session) {
+        cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        addStatement = prepareAddUser(session);
+        removeStatement = prepareRemoveBucket(session);
+        listStatement = prepareListUser(session);
+    }
+    // INSERT of one (bucketName, user) pair; both values are named bind markers filled in by addUser.
+    private PreparedStatement prepareAddUser(Session session) {
+        return session.prepare(insertInto(TABLE)
+            .value(BUCKET_NAME, bindMarker(BUCKET_NAME))
+            .value(USER, bindMarker(USER)));
+    }
+    // DELETE restricted on bucketName only (no user restriction), so it targets every user row of that bucket.
+    private PreparedStatement prepareRemoveBucket(Session session) {
+        return session.prepare(delete().from(TABLE)
+            .where(eq(BUCKET_NAME, bindMarker(BUCKET_NAME))));
+    }
+    // SELECT of the user column only, for a single bucketName.
+    private PreparedStatement prepareListUser(Session session) {
+        return session.prepare(select(USER).from(TABLE)
+            .where(eq(BUCKET_NAME, bindMarker(BUCKET_NAME))));
+    }
+    // Emits the users stored for bucketName, each row's user string being converted with User.fromUsername.
+    Flux<User> retrieveUsers(BucketName bucketName) {
+        return cassandraAsyncExecutor.executeRows(listStatement.bind()
+            .setString(BUCKET_NAME, bucketName.asString()))
+            .map(row -> row.getString(USER))
+            .map(User::fromUsername);
+    }
+    // Stores the (bucketName, user) pair using the string forms of both arguments; the returned Mono carries no value.
+    Mono<Void> addUser(BucketName bucketName, User user) {
+        return cassandraAsyncExecutor.executeVoid(addStatement.bind()
+            .setString(BUCKET_NAME, bucketName.asString())
+            .setString(USER, user.asString()));
+    }
+    // Runs the bucket-wide DELETE for bucketName; the returned Mono carries no value.
+    Mono<Void> deleteBucket(BucketName bucketName) {
+        return cassandraAsyncExecutor.executeVoid(removeStatement.bind()
+            .setString(BUCKET_NAME, bucketName.asString()));
+    }
+}
diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java
new file mode 100644
index 0000000..94f655e
--- /dev/null
+++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java
@@ -0,0 +1,100 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.vault.metadata;
+
+import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.MODULE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.User;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class UserPerBucketDAOTest { // Exercises UserPerBucketDAO (add, list, delete bucket) through a CassandraClusterExtension.
+    private static final BucketName BUCKET_NAME = BucketName.of("deletedMessages-2019-06-01");
+    private static final BucketName BUCKET_NAME_2 = BucketName.of("deletedMessages-2019-07-01");
+    private static final User OWNER = User.fromUsername("owner");
+    private static final User OWNER_2 = User.fromUsername("owner2");
+    // Extension built from the vault metadata MODULE, which declares the userPerBucket table.
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
+
+    private UserPerBucketDAO testee;
+    // A new DAO is built before every test from the session returned by cassandra.getConf().
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        testee = new UserPerBucketDAO(cassandra.getConf());
+    }
+    // A bucket to which nobody was added lists no user.
+    @Test
+    void retrieveUsersShouldReturnEmptyWhenNone() {
+        assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).isEmpty();
+    }
+    // A single added user is listed back, and nothing else.
+    @Test
+    void retrieveUsersShouldReturnAddedUser() {
+        testee.addUser(BUCKET_NAME, OWNER).block();
+
+        assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).containsExactly(OWNER);
+    }
+    // Several users of one bucket are all listed; their order is not asserted.
+    @Test
+    void retrieveUsersShouldReturnAddedUsers() {
+        testee.addUser(BUCKET_NAME, OWNER).block();
+        testee.addUser(BUCKET_NAME, OWNER_2).block();
+
+        assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).containsExactlyInAnyOrder(OWNER, OWNER_2);
+    }
+    // Listing is scoped to the requested bucket: a user added to BUCKET_NAME_2 must not show up.
+    @Test
+    void retrieveUsersShouldNotReturnUsersOfOtherBuckets() {
+        testee.addUser(BUCKET_NAME, OWNER).block();
+        testee.addUser(BUCKET_NAME_2, OWNER_2).block();
+
+        assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).containsExactlyInAnyOrder(OWNER);
+    }
+    // Adding the same (bucket, user) pair twice still yields a single listed entry.
+    @Test
+    void addUserShouldBeIdempotent() {
+        testee.addUser(BUCKET_NAME, OWNER).block();
+        testee.addUser(BUCKET_NAME, OWNER).block();
+
+        assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).containsExactlyInAnyOrder(OWNER);
+    }
+    // After deleteBucket, the bucket lists no user any more.
+    @Test
+    void retrieveUsersShouldReturnEmptyWhenDeletedBucket() {
+        testee.addUser(BUCKET_NAME, OWNER).block();
+
+        testee.deleteBucket(BUCKET_NAME).block();
+
+        assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).isEmpty();
+    }
+    // Deleting a bucket that holds no row must complete without throwing.
+    @Test
+    void deleteBucketShouldNotThrowWhenNone() {
+        assertThatCode(() -> testee.deleteBucket(BUCKET_NAME).block())
+            .doesNotThrowAnyException();
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org