You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/08/06 09:16:05 UTC
[james-project] 01/04: JAMES-3516 Implement Thread lookup table by messageId
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9b72bc6c4a15188094f39c6a5b37a0beb2f9d8d3
Author: quanth <hq...@linagora.com>
AuthorDate: Thu Jul 29 15:14:28 2021 +0700
JAMES-3516 Implement Thread lookup table by messageId
---
.../cassandra/mail/CassandraThreadLookupDAO.java | 101 +++++++++++++++++++
.../cassandra/modules/CassandraThreadModule.java | 9 ++
.../table/CassandraThreadLookupTable.java | 22 ++---
.../mail/CassandraThreadLookupDAOTest.java | 110 +++++++++++++++++++++
.../mailbox/CassandraThreadIdGuessingModule.java | 2 +
5 files changed, 229 insertions(+), 15 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
new file mode 100644
index 0000000..cde86ce
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
@@ -0,0 +1,101 @@
+/******************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ******************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.MIME_MESSAGE_IDS;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.TABLE_NAME;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.USERNAME;
+
+import java.util.Set;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.store.mail.model.MimeMessageId;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.google.common.collect.ImmutableSet;
+
+import reactor.core.publisher.Mono;
+
+public class CassandraThreadLookupDAO {
+ private final CassandraAsyncExecutor executor;
+ private final PreparedStatement insert;
+ private final PreparedStatement select;
+ private final PreparedStatement delete;
+
+ @Inject
+ public CassandraThreadLookupDAO(Session session) {
+ executor = new CassandraAsyncExecutor(session);
+
+ insert = session.prepare(insertInto(TABLE_NAME)
+ .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
+ .value(USERNAME, bindMarker(USERNAME))
+ .value(MIME_MESSAGE_IDS, bindMarker(MIME_MESSAGE_IDS)));
+
+ select = session.prepare(select(USERNAME, MIME_MESSAGE_IDS)
+ .from(TABLE_NAME)
+ .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+
+ delete = session.prepare(delete()
+ .from(TABLE_NAME)
+ .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+ }
+
+ public Mono<Void> insert(MessageId messageId, Username username, Set<MimeMessageId> mimeMessageIds) {
+ Set<String> mimeMessageIdsString = mimeMessageIds.stream().map(MimeMessageId::getValue).collect(ImmutableSet.toImmutableSet());
+ return executor.executeVoid(insert.bind()
+ .setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get())
+ .setString(USERNAME, username.asString())
+ .setSet(MIME_MESSAGE_IDS, mimeMessageIdsString));
+ }
+
+ public Mono<Pair<Username, Set<MimeMessageId>>> selectOneRow(MessageId messageId) {
+ return executor.executeSingleRow(
+ select.bind().setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get()))
+ .map(this::readRow);
+ }
+
+ public Mono<Void> deleteOneRow(MessageId messageId) {
+ return executor.executeVoid(delete.bind()
+ .setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get()));
+ }
+
+ private Pair<Username, Set<MimeMessageId>> readRow(Row row) {
+ Set<MimeMessageId> mimeMessageIds = row.getSet(MIME_MESSAGE_IDS, String.class)
+ .stream()
+ .map(MimeMessageId::new)
+ .collect(ImmutableSet.toImmutableSet());
+ return Pair.of(Username.of(row.getString(USERNAME)),
+ mimeMessageIds);
+ }
+}
\ No newline at end of file
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java
index f75bec3..8873f3a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java
@@ -19,16 +19,19 @@
package org.apache.james.mailbox.cassandra.modules;
+import static com.datastax.driver.core.DataType.frozenSet;
import static com.datastax.driver.core.DataType.text;
import static com.datastax.driver.core.DataType.timeuuid;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageIdTable.THREAD_ID;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.MIME_MESSAGE_IDS;
import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.BASE_SUBJECT;
import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.MIME_MESSAGE_ID;
import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.TABLE_NAME;
import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.USERNAME;
import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable;
public interface CassandraThreadModule {
CassandraModule MODULE = CassandraModule.builder()
@@ -40,6 +43,12 @@ public interface CassandraThreadModule {
.addClusteringColumn(MESSAGE_ID, timeuuid())
.addColumn(THREAD_ID, timeuuid())
.addColumn(BASE_SUBJECT, text()))
+ .table(CassandraThreadLookupTable.TABLE_NAME)
+ .comment("Thread table lookup by messageId, using for deletion thread data")
+ .statement(statement -> statement
+ .addPartitionKey(MESSAGE_ID, timeuuid())
+ .addColumn(USERNAME, text())
+ .addColumn(MIME_MESSAGE_IDS, frozenSet(text())))
.build();
}
diff --git a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
similarity index 63%
copy from server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
copy to mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
index 6461f06..7a8c1b3 100644
--- a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
@@ -17,23 +17,15 @@
* under the License. *
******************************************************************/
-package org.apache.james.modules.mailbox;
+package org.apache.james.mailbox.cassandra.table;
-import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.mailbox.cassandra.mail.CassandraThreadDAO;
-import org.apache.james.mailbox.cassandra.modules.CassandraThreadModule;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.USERNAME;
-import com.google.inject.AbstractModule;
-import com.google.inject.Scopes;
-import com.google.inject.multibindings.Multibinder;
+public interface CassandraThreadLookupTable {
+ String TABLE_NAME = "threadLookupTable";
-public class CassandraThreadIdGuessingModule extends AbstractModule {
- @Override
- protected void configure() {
- bind(CassandraThreadDAO.class).in(Scopes.SINGLETON);
+ String MIME_MESSAGE_IDS = "mimeMessageIds";
- Multibinder.newSetBinder(binder(), CassandraModule.class)
- .addBinding()
- .toInstance(CassandraThreadModule.MODULE);
- }
+ String[] FIELDS = {MESSAGE_ID, USERNAME, MIME_MESSAGE_IDS};
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
new file mode 100644
index 0000000..316e2fb
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
@@ -0,0 +1,110 @@
+/******************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ******************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.modules.CassandraThreadModule;
+import org.apache.james.mailbox.store.mail.model.MimeMessageId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class CassandraThreadLookupDAOTest {
+ private static final Username ALICE = Username.of("alice");
+ private static final Username BOB = Username.of("bob");
+
+ @RegisterExtension
+ static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraThreadModule.MODULE);
+
+ private CassandraThreadLookupDAO testee;
+ private CassandraMessageId messageId1;
+ private CassandraMessageId messageId2;
+ private MimeMessageId mimeMessageId1;
+ private MimeMessageId mimeMessageId2;
+ private MimeMessageId mimeMessageId3;
+ private MimeMessageId mimeMessageId4;
+
+ @BeforeEach
+ void setUp(CassandraCluster cassandra) {
+ testee = new CassandraThreadLookupDAO(cassandra.getConf());
+
+ CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory();
+ messageId1 = messageIdFactory.generate();
+ messageId2 = messageIdFactory.generate();
+
+ mimeMessageId1 = new MimeMessageId("MimeMessageID1");
+ mimeMessageId2 = new MimeMessageId("MimeMessageID2");
+ mimeMessageId3 = new MimeMessageId("MimeMessageID3");
+ mimeMessageId4 = new MimeMessageId("MimeMessageID4");
+ }
+
+ @Test
+ void insertShouldSuccess() {
+ testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block();
+
+ assertThat(testee.selectOneRow(messageId1).block())
+ .isEqualTo(Pair.of(ALICE, Set.of(mimeMessageId1, mimeMessageId2)));
+ }
+
+ @Test
+ void selectShouldReturnNullWhenMessageIdNonExist() {
+ assertThat(testee.selectOneRow(messageId1).block())
+ .isNull();
+ }
+
+ @Test
+ void selectShouldReturnOnlyRelatedDataByThatMessageId() {
+ testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block();
+ testee.insert(messageId2, BOB, Set.of(mimeMessageId3, mimeMessageId4)).block();
+
+ assertThat(testee.selectOneRow(messageId1).block())
+ .isEqualTo(Pair.of(ALICE, Set.of(mimeMessageId1, mimeMessageId2)));
+ }
+
+ @Test
+ void deletedEntriesShouldNotBeReturned() {
+ testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block();
+
+ testee.deleteOneRow(messageId1).block();
+
+ assertThat(testee.selectOneRow(messageId1).block())
+ .isNull();
+ }
+
+ @Test
+ void deleteByNonExistMessageIdShouldDeleteNothing() {
+ testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block();
+
+ testee.deleteOneRow(messageId2).block();
+
+ // message1's data should remain
+ assertThat(testee.selectOneRow(messageId1).block())
+ .isEqualTo(Pair.of(ALICE, Set.of(mimeMessageId1, mimeMessageId2)));
+ }
+
+}
diff --git a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
index 6461f06..d2699b4 100644
--- a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
+++ b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/mailbox/CassandraThreadIdGuessingModule.java
@@ -21,6 +21,7 @@ package org.apache.james.modules.mailbox;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.mailbox.cassandra.mail.CassandraThreadDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraThreadLookupDAO;
import org.apache.james.mailbox.cassandra.modules.CassandraThreadModule;
import com.google.inject.AbstractModule;
@@ -31,6 +32,7 @@ public class CassandraThreadIdGuessingModule extends AbstractModule {
@Override
protected void configure() {
bind(CassandraThreadDAO.class).in(Scopes.SINGLETON);
+ bind(CassandraThreadLookupDAO.class).in(Scopes.SINGLETON);
Multibinder.newSetBinder(binder(), CassandraModule.class)
.addBinding()
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org