You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/01/29 03:09:02 UTC

[james-project] 02/13: JAMES-3495 MessageIdTable: ignore partially deleted rows

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit eee94f10671580ceda427c2a48ff5d90fa271b8c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jan 27 20:30:05 2021 +0700

    JAMES-3495 MessageIdTable: ignore partially deleted rows
---
 .../mailbox/cassandra/mail/CassandraMessageIdDAO.java | 19 +++++++++++++------
 .../cassandra/mail/CassandraMessageIdDAOTest.java     |  2 --
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 6d7f6bc..567ee53 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -41,6 +41,7 @@ import static org.apache.james.mailbox.cassandra.table.Flag.SEEN;
 import static org.apache.james.mailbox.cassandra.table.Flag.USER;
 import static org.apache.james.mailbox.cassandra.table.Flag.USER_FLAGS;
 import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.MOD_SEQ;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.util.Optional;
 
@@ -256,7 +257,6 @@ public class CassandraMessageIdDAO {
     private Mono<Optional<ComposedMessageIdWithMetaData>> asOptionalOfCassandraMessageId(Mono<Row> row) {
         return row
                 .map(this::fromRowToComposedMessageIdWithFlags)
-                .map(Optional::of)
                 .defaultIfEmpty(Optional.empty());
     }
 
@@ -268,7 +268,8 @@ public class CassandraMessageIdDAO {
 
     public Flux<ComposedMessageIdWithMetaData> retrieveMessages(CassandraId mailboxId, MessageRange set, Limit limit) {
         return retrieveRows(mailboxId, set, limit)
-            .map(this::fromRowToComposedMessageIdWithFlags);
+            .map(this::fromRowToComposedMessageIdWithFlags)
+            .handle(publishIfPresent());
     }
 
     public Flux<MessageUid> listUids(CassandraId mailboxId) {
@@ -279,7 +280,8 @@ public class CassandraMessageIdDAO {
 
     public Flux<ComposedMessageIdWithMetaData> retrieveAllMessages() {
         return cassandraAsyncExecutor.executeRows(listStatement.bind())
-            .map(this::fromRowToComposedMessageIdWithFlags);
+            .map(this::fromRowToComposedMessageIdWithFlags)
+            .handle(publishIfPresent());
     }
 
     private Flux<Row> retrieveRows(CassandraId mailboxId, MessageRange set, Limit limit) {
@@ -329,14 +331,19 @@ public class CassandraMessageIdDAO {
                 .setLong(IMAP_UID_LTE, to.asLong())));
     }
 
-    private ComposedMessageIdWithMetaData fromRowToComposedMessageIdWithFlags(Row row) {
-        return ComposedMessageIdWithMetaData.builder()
+    private Optional<ComposedMessageIdWithMetaData> fromRowToComposedMessageIdWithFlags(Row row) {
+        if (row.getUUID(MESSAGE_ID) == null) {
+            // Out of order updates with concurrent deletes can result in the row being partially deleted
+            // We filter out such records
+            return Optional.empty();
+        }
+        return Optional.of(ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(
                         CassandraId.of(row.getUUID(MAILBOX_ID)),
                         messageIdFactory.of(row.getUUID(MESSAGE_ID)),
                         MessageUid.of(row.getLong(IMAP_UID))))
                 .flags(FlagsExtractor.getFlags(row))
                 .modSeq(ModSeq.of(row.getLong(MOD_SEQ)))
-                .build();
+                .build());
     }
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
index d72d2c2..2ae675e 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
@@ -39,7 +39,6 @@ import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.util.streams.Limit;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -86,7 +85,6 @@ class CassandraMessageIdDAOTest {
         assertThat(message.isPresent()).isFalse();
     }
 
-    @Disabled("A record with a 'null' messageId is returned")
     @Test
     void outOfOrderUpdatesShouldBeIgnored() {
         CassandraId mailboxId = CassandraId.timeBased();


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org