You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/01/29 03:09:02 UTC
[james-project] 02/13: JAMES-3495 MessageIdTable: ignore partially
deleted rows
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit eee94f10671580ceda427c2a48ff5d90fa271b8c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jan 27 20:30:05 2021 +0700
JAMES-3495 MessageIdTable: ignore partially deleted rows
---
.../mailbox/cassandra/mail/CassandraMessageIdDAO.java | 19 +++++++++++++------
.../cassandra/mail/CassandraMessageIdDAOTest.java | 2 --
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 6d7f6bc..567ee53 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -41,6 +41,7 @@ import static org.apache.james.mailbox.cassandra.table.Flag.SEEN;
import static org.apache.james.mailbox.cassandra.table.Flag.USER;
import static org.apache.james.mailbox.cassandra.table.Flag.USER_FLAGS;
import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.MOD_SEQ;
+import static org.apache.james.util.ReactorUtils.publishIfPresent;
import java.util.Optional;
@@ -256,7 +257,6 @@ public class CassandraMessageIdDAO {
private Mono<Optional<ComposedMessageIdWithMetaData>> asOptionalOfCassandraMessageId(Mono<Row> row) {
return row
.map(this::fromRowToComposedMessageIdWithFlags)
- .map(Optional::of)
.defaultIfEmpty(Optional.empty());
}
@@ -268,7 +268,8 @@ public class CassandraMessageIdDAO {
public Flux<ComposedMessageIdWithMetaData> retrieveMessages(CassandraId mailboxId, MessageRange set, Limit limit) {
return retrieveRows(mailboxId, set, limit)
- .map(this::fromRowToComposedMessageIdWithFlags);
+ .map(this::fromRowToComposedMessageIdWithFlags)
+ .handle(publishIfPresent());
}
public Flux<MessageUid> listUids(CassandraId mailboxId) {
@@ -279,7 +280,8 @@ public class CassandraMessageIdDAO {
public Flux<ComposedMessageIdWithMetaData> retrieveAllMessages() {
return cassandraAsyncExecutor.executeRows(listStatement.bind())
- .map(this::fromRowToComposedMessageIdWithFlags);
+ .map(this::fromRowToComposedMessageIdWithFlags)
+ .handle(publishIfPresent());
}
private Flux<Row> retrieveRows(CassandraId mailboxId, MessageRange set, Limit limit) {
@@ -329,14 +331,19 @@ public class CassandraMessageIdDAO {
.setLong(IMAP_UID_LTE, to.asLong())));
}
- private ComposedMessageIdWithMetaData fromRowToComposedMessageIdWithFlags(Row row) {
- return ComposedMessageIdWithMetaData.builder()
+ private Optional<ComposedMessageIdWithMetaData> fromRowToComposedMessageIdWithFlags(Row row) {
+ if (row.getUUID(MESSAGE_ID) == null) {
+ // Out of order updates with concurrent deletes can result in the row being partially deleted
+ // We filter out such records
+ return Optional.empty();
+ }
+ return Optional.of(ComposedMessageIdWithMetaData.builder()
.composedMessageId(new ComposedMessageId(
CassandraId.of(row.getUUID(MAILBOX_ID)),
messageIdFactory.of(row.getUUID(MESSAGE_ID)),
MessageUid.of(row.getLong(IMAP_UID))))
.flags(FlagsExtractor.getFlags(row))
.modSeq(ModSeq.of(row.getLong(MOD_SEQ)))
- .build();
+ .build());
}
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
index d72d2c2..2ae675e 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
@@ -39,7 +39,6 @@ import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.util.streams.Limit;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -86,7 +85,6 @@ class CassandraMessageIdDAOTest {
assertThat(message.isPresent()).isFalse();
}
- @Disabled("A record with a 'null' messageId is returned")
@Test
void outOfOrderUpdatesShouldBeIgnored() {
CassandraId mailboxId = CassandraId.timeBased();
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org