You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/08/06 09:16:05 UTC

[james-project] 01/04: JAMES-3516 Implement Thread lookup table by messageId

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9b72bc6c4a15188094f39c6a5b37a0beb2f9d8d3
Author: quanth <hq...@linagora.com>
AuthorDate: Thu Jul 29 15:14:28 2021 +0700

    JAMES-3516 Implement Thread lookup table by messageId
---
 .../cassandra/mail/CassandraThreadLookupDAO.java   | 101 +++++++++++++++++++
 .../cassandra/modules/CassandraThreadModule.java   |   9 ++
 .../table/CassandraThreadLookupTable.java          |  22 ++---
 .../mail/CassandraThreadLookupDAOTest.java         | 110 +++++++++++++++++++++
 .../mailbox/CassandraThreadIdGuessingModule.java   |   2 +
 5 files changed, 229 insertions(+), 15 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
new file mode 100644
index 0000000..cde86ce
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
@@ -0,0 +1,101 @@
+/******************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one     *
+ * or more contributor license agreements.  See the NOTICE file   *
+ * distributed with this work for additional information          *
+ * regarding copyright ownership.  The ASF licenses this file     *
+ * to you under the Apache License, Version 2.0 (the              *
+ * "License"); you may not use this file except in compliance     *
+ * with the License.  You may obtain a copy of the License at     *
+ *                                                                *
+ * http://www.apache.org/licenses/LICENSE-2.0                     *
+ *                                                                *
+ * Unless required by applicable law or agreed to in writing,     *
+ * software distributed under the License is distributed on an    *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY         *
+ * KIND, either express or implied.  See the License for the      *
+ * specific language governing permissions and limitations        *
+ * under the License.                                             *
+ ******************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.MIME_MESSAGE_IDS;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.TABLE_NAME;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.USERNAME;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.store.mail.model.MimeMessageId;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.collect.ImmutableSet;
+
+import reactor.core.publisher.Mono;
+
+public class CassandraThreadLookupDAO { // DAO for the TABLE_NAME lookup table: one row per MESSAGE_ID
+    private final CassandraAsyncExecutor executor;
+    private final PreparedStatement insert; // writes MESSAGE_ID, USERNAME and MIME_MESSAGE_IDS
+    private final PreparedStatement select; // reads USERNAME and MIME_MESSAGE_IDS for one MESSAGE_ID
+    private final PreparedStatement delete; // removes the row matching one MESSAGE_ID
+
+    @Inject
+    public CassandraThreadLookupDAO(Session session) { // prepares the three statements once, at construction
+        executor = new CassandraAsyncExecutor(session);
+
+        insert = session.prepare(insertInto(TABLE_NAME)
+            .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
+            .value(USERNAME, bindMarker(USERNAME))
+            .value(MIME_MESSAGE_IDS, bindMarker(MIME_MESSAGE_IDS)));
+
+        select = session.prepare(select(USERNAME, MIME_MESSAGE_IDS)
+            .from(TABLE_NAME)
+            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+
+        delete = session.prepare(delete()
+            .from(TABLE_NAME)
+            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+    }
+
+    public Mono<Void> insert(MessageId messageId, Username username, Set<MimeMessageId> mimeMessageIds) { // stores username + MIME ids under messageId
+        Set<String> mimeMessageIdsString = mimeMessageIds.stream().map(MimeMessageId::getValue).collect(ImmutableSet.toImmutableSet()); // persisted as plain strings
+        return executor.executeVoid(insert.bind()
+            .setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get()) // messageId must be a CassandraMessageId; the cast fails for other implementations
+            .setString(USERNAME, username.asString())
+            .setSet(MIME_MESSAGE_IDS, mimeMessageIdsString));
+    }
+
+    public Mono<Pair<Username, Set<MimeMessageId>>> selectOneRow(MessageId messageId) { // looks up the (username, MIME ids) pair stored for messageId
+        return executor.executeSingleRow(
+            select.bind().setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get())) // same CassandraMessageId cast requirement as insert
+            .map(this::readRow); // CassandraThreadLookupDAOTest expects block() to yield null when no row matches
+    }
+
+    public Mono<Void> deleteOneRow(MessageId messageId) { // deletes the row keyed by messageId
+        return executor.executeVoid(delete.bind()
+            .setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get())); // same CassandraMessageId cast requirement as insert
+    }
+
+    private Pair<Username, Set<MimeMessageId>> readRow(Row row) { // maps one result row back to domain types
+        Set<MimeMessageId> mimeMessageIds = row.getSet(MIME_MESSAGE_IDS, String.class)
+            .stream()
+            .map(MimeMessageId::new) // inverse of the getValue() mapping done in insert
+            .collect(ImmutableSet.toImmutableSet());
+        return Pair.of(Username.of(row.getString(USERNAME)),
+            mimeMessageIds);
+    }
+}
\ No newline at end of file
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java
index f75bec3..8873f3a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java
@@ -19,16 +19,19 @@
 
 package org.apache.james.mailbox.cassandra.modules;
 
+import static com.datastax.driver.core.DataType.frozenSet;
 import static com.datastax.driver.core.DataType.text;
 import static com.datastax.driver.core.DataType.timeuuid;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageIdTable.THREAD_ID;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.MIME_MESSAGE_IDS;
 import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.BASE_SUBJECT;
 import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.MIME_MESSAGE_ID;
 import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.USERNAME;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable;
 
 public interface CassandraThreadModule {
     CassandraModule MODULE = CassandraModule.builder()
@@ -40,6 +43,12 @@ public interface CassandraThreadModule {
             .addClusteringColumn(MESSAGE_ID, timeuuid())
             .addColumn(THREAD_ID, timeuuid())
             .addColumn(BASE_SUBJECT, text()))
+        .table(CassandraThreadLookupTable.TABLE_NAME)
+        .comment("Thread table lookup by messageId, using for deletion thread data")
+        .statement(statement -> statement
+            .addPartitionKey(MESSAGE_ID, timeuuid())
+            .addColumn(USERNAME, text())
+            .addColumn(MIME_MESSAGE_IDS, frozenSet(text())))
         .build();
 
 }
diff --git a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
similarity index 63%
copy from server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
copy to mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
index 6461f06..7a8c1b3 100644
--- a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
@@ -17,23 +17,15 @@
  * under the License.                                             *
  ******************************************************************/
 
-package org.apache.james.modules.mailbox;
+package org.apache.james.mailbox.cassandra.table;
 
-import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.mailbox.cassandra.mail.CassandraThreadDAO;
-import org.apache.james.mailbox.cassandra.modules.CassandraThreadModule;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.USERNAME;
 
-import com.google.inject.AbstractModule;
-import com.google.inject.Scopes;
-import com.google.inject.multibindings.Multibinder;
+public interface CassandraThreadLookupTable { // constants naming the thread lookup table and its columns
+    String TABLE_NAME = "threadLookupTable"; // table name used by CassandraThreadModule and CassandraThreadLookupDAO
 
-public class CassandraThreadIdGuessingModule extends AbstractModule {
-    @Override
-    protected void configure() {
-        bind(CassandraThreadDAO.class).in(Scopes.SINGLETON);
+    String MIME_MESSAGE_IDS = "mimeMessageIds"; // column declared as frozenSet(text()) in CassandraThreadModule
 
-        Multibinder.newSetBinder(binder(), CassandraModule.class)
-            .addBinding()
-            .toInstance(CassandraThreadModule.MODULE);
-    }
+    String[] FIELDS = {MESSAGE_ID, USERNAME, MIME_MESSAGE_IDS}; // all column names; MESSAGE_ID and USERNAME are reused from other table definitions
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
new file mode 100644
index 0000000..316e2fb
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
@@ -0,0 +1,110 @@
+/******************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one     *
+ * or more contributor license agreements.  See the NOTICE file   *
+ * distributed with this work for additional information          *
+ * regarding copyright ownership.  The ASF licenses this file     *
+ * to you under the Apache License, Version 2.0 (the              *
+ * "License"); you may not use this file except in compliance     *
+ * with the License.  You may obtain a copy of the License at     *
+ *                                                                *
+ * http://www.apache.org/licenses/LICENSE-2.0                     *
+ *                                                                *
+ * Unless required by applicable law or agreed to in writing,     *
+ * software distributed under the License is distributed on an    *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY         *
+ * KIND, either express or implied.  See the License for the      *
+ * specific language governing permissions and limitations        *
+ * under the License.                                             *
+ ******************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.modules.CassandraThreadModule;
+import org.apache.james.mailbox.store.mail.model.MimeMessageId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class CassandraThreadLookupDAOTest { // exercises insert / selectOneRow / deleteOneRow of CassandraThreadLookupDAO
+    private static final Username ALICE = Username.of("alice");
+    private static final Username BOB = Username.of("bob");
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraThreadModule.MODULE); // module that declares the lookup table
+
+    private CassandraThreadLookupDAO testee;
+    private CassandraMessageId messageId1;
+    private CassandraMessageId messageId2;
+    private MimeMessageId mimeMessageId1;
+    private MimeMessageId mimeMessageId2;
+    private MimeMessageId mimeMessageId3;
+    private MimeMessageId mimeMessageId4;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) { // rebuilds the DAO and all fixture ids before every test
+        testee = new CassandraThreadLookupDAO(cassandra.getConf());
+
+        CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory();
+        messageId1 = messageIdFactory.generate();
+        messageId2 = messageIdFactory.generate();
+
+        mimeMessageId1 = new MimeMessageId("MimeMessageID1");
+        mimeMessageId2 = new MimeMessageId("MimeMessageID2");
+        mimeMessageId3 = new MimeMessageId("MimeMessageID3");
+        mimeMessageId4 = new MimeMessageId("MimeMessageID4");
+    }
+
+    @Test
+    void insertShouldSuccess() { // an inserted row is read back with the same username and MIME ids
+        testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block();
+
+        assertThat(testee.selectOneRow(messageId1).block())
+            .isEqualTo(Pair.of(ALICE, Set.of(mimeMessageId1, mimeMessageId2)));
+    }
+
+    @Test
+    void selectShouldReturnNullWhenMessageIdNonExist() { // nothing inserted: block() on the lookup yields null
+        assertThat(testee.selectOneRow(messageId1).block())
+            .isNull();
+    }
+
+    @Test
+    void selectShouldReturnOnlyRelatedDataByThatMessageId() { // rows of other message ids must not leak into the result
+        testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block();
+        testee.insert(messageId2, BOB, Set.of(mimeMessageId3, mimeMessageId4)).block();
+
+        assertThat(testee.selectOneRow(messageId1).block())
+            .isEqualTo(Pair.of(ALICE, Set.of(mimeMessageId1, mimeMessageId2)));
+    }
+
+    @Test
+    void deletedEntriesShouldNotBeReturned() { // insert then delete the same id: lookup yields null again
+        testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block();
+
+        testee.deleteOneRow(messageId1).block();
+
+        assertThat(testee.selectOneRow(messageId1).block())
+            .isNull();
+    }
+
+    @Test
+    void deleteByNonExistMessageIdShouldDeleteNothing() { // deleting an id that was never inserted leaves other rows intact
+        testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block();
+
+        testee.deleteOneRow(messageId2).block();
+
+        // message1's data should remain
+        assertThat(testee.selectOneRow(messageId1).block())
+            .isEqualTo(Pair.of(ALICE, Set.of(mimeMessageId1, mimeMessageId2)));
+    }
+
+}
diff --git a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
index 6461f06..d2699b4 100644
--- a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
+++ b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
@@ -21,6 +21,7 @@ package org.apache.james.modules.mailbox;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.mailbox.cassandra.mail.CassandraThreadDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraThreadLookupDAO;
 import org.apache.james.mailbox.cassandra.modules.CassandraThreadModule;
 
 import com.google.inject.AbstractModule;
@@ -31,6 +32,7 @@ public class CassandraThreadIdGuessingModule extends AbstractModule {
     @Override
     protected void configure() {
         bind(CassandraThreadDAO.class).in(Scopes.SINGLETON);
+        bind(CassandraThreadLookupDAO.class).in(Scopes.SINGLETON);
 
         Multibinder.newSetBinder(binder(), CassandraModule.class)
             .addBinding()

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org