You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/26 08:41:56 UTC
[james-project] branch master updated: JAMES-3623 MetaDataFixInconsistenciesService: account for concurrency (#1019)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 01d200817e JAMES-3623 MetaDataFixInconsistenciesService: account for concurrency (#1019)
01d200817e is described below
commit 01d200817e28c84a1bbc66d356d5f339e43e2cbe
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Thu May 26 15:41:52 2022 +0700
JAMES-3623 MetaDataFixInconsistenciesService: account for concurrency (#1019)
-> Confirm inconsistencies before fixing them
Otherwise, concurrent POP operations can be mistaken for inconsistencies.
-> Log errors
---
.../task/MetaDataFixInconsistenciesService.java | 76 +++++++++++++++++++---
1 file changed, 68 insertions(+), 8 deletions(-)
diff --git a/server/protocols/protocols-pop3-distributed/src/main/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService.java b/server/protocols/protocols-pop3-distributed/src/main/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService.java
index a2434524be..d7b815165f 100644
--- a/server/protocols/protocols-pop3-distributed/src/main/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService.java
+++ b/server/protocols/protocols-pop3-distributed/src/main/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService.java
@@ -54,13 +54,28 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class MetaDataFixInconsistenciesService {
+ private enum Presence {
+ PRESENT,
+ ABSENT
+ }
- @FunctionalInterface
interface Inconsistency {
Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore);
+
+ Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore);
}
- static final Inconsistency NO_INCONSISTENCY = (context, imapUidDAO, pop3MetadataStore) -> Mono.just(Task.Result.COMPLETED);
+ static final Inconsistency NO_INCONSISTENCY = new Inconsistency() {
+ @Override
+ public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+ return Mono.just(Task.Result.COMPLETED);
+ }
+
+ @Override
+ public Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+ return Mono.just(this);
+ }
+ };
private static class StalePOP3EntryConsistency implements Inconsistency {
private final MailboxId mailboxId;
@@ -79,16 +94,35 @@ public class MetaDataFixInconsistenciesService {
.doOnSuccess(any -> notifySuccess(context))
.thenReturn(Task.Result.COMPLETED)
.onErrorResume(error -> {
- notifyFailure(context);
+ notifyFailure(context, error);
return Mono.just(Task.Result.PARTIAL);
});
}
- private void notifyFailure(Context context) {
+ @Override
+ public Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+ Mono<Presence> imapView = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of((CassandraId) mailboxId))
+ .next()
+ .map(any -> Presence.PRESENT)
+ .switchIfEmpty(Mono.just(Presence.ABSENT));
+ Mono<Presence> pop3View = Mono.from(pop3MetadataStore.retrieve(mailboxId, messageId))
+ .map(any -> Presence.PRESENT)
+ .switchIfEmpty(Mono.just(Presence.ABSENT));
+
+ return imapView.zipWith(pop3View)
+ .map(t2 -> {
+ if (t2.getT1() == Presence.ABSENT && t2.getT2() == Presence.PRESENT) {
+ return this;
+ }
+ return NO_INCONSISTENCY;
+ });
+ }
+
+ private void notifyFailure(Context context, Throwable e) {
context.addErrors(MessageInconsistenciesEntry.builder()
.mailboxId(mailboxId.serialize())
.messageId(messageId.serialize()));
- LOGGER.error("Failed to fix inconsistency for stale POP3 entry: {}", messageId);
+ LOGGER.error("Failed to fix inconsistency for stale POP3 entry: {}", messageId, e);
}
private void notifySuccess(Context context) {
@@ -122,11 +156,30 @@ public class MetaDataFixInconsistenciesService {
.doOnSuccess(any -> notifySuccess(context))
.thenReturn(Task.Result.COMPLETED)
.onErrorResume(error -> {
- notifyFailure(context);
+ notifyFailure(context, error);
return Mono.just(Task.Result.PARTIAL);
});
}
+ @Override
+ public Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+ Mono<Presence> imapView = imapUidDAO.retrieve(messageId, Optional.of((CassandraId) mailboxId))
+ .next()
+ .map(any -> Presence.PRESENT)
+ .switchIfEmpty(Mono.just(Presence.ABSENT));
+ Mono<Presence> pop3View = Mono.from(pop3MetadataStore.retrieve(mailboxId, messageId))
+ .map(any -> Presence.PRESENT)
+ .switchIfEmpty(Mono.just(Presence.ABSENT));
+
+ return imapView.zipWith(pop3View)
+ .map(t2 -> {
+ if (t2.getT1() == Presence.PRESENT && t2.getT2() == Presence.ABSENT) {
+ return this;
+ }
+ return NO_INCONSISTENCY;
+ });
+ }
+
private Mono<Pop3MetadataStore.StatMetadata> buildStatMetadata() {
return cassandraMessageDAOV3.retrieveMessage(messageId, FetchType.METADATA)
.switchIfEmpty(Mono.error(new MailboxException("Message not found: " + messageId)))
@@ -134,11 +187,11 @@ public class MetaDataFixInconsistenciesService {
}
- private void notifyFailure(Context context) {
+ private void notifyFailure(Context context, Throwable e) {
context.addErrors(MessageInconsistenciesEntry.builder()
.mailboxId(mailboxId.serialize())
.messageId(messageId.serialize()));
- LOGGER.error("Failed to fix inconsistency for missing POP3 entry: {}", messageId);
+ LOGGER.error("Failed to fix inconsistency for missing POP3 entry: {}", messageId, e);
}
private void notifySuccess(Context context) {
@@ -169,6 +222,11 @@ public class MetaDataFixInconsistenciesService {
LOGGER.error("Failed to detect inconsistency: {}", messageId);
return Mono.just(Task.Result.PARTIAL);
}
+
+ @Override
+ public Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+ return Mono.just(this);
+ }
}
public static class Context {
@@ -422,6 +480,7 @@ public class MetaDataFixInconsistenciesService {
.per(PERIOD)
.forOperation(fullMetadata -> detectStaleEntriesInPop3MetaDataStore(fullMetadata)
.doOnNext(any -> context.incrementProcessedPop3MetaDataStoreEntries())
+ .flatMap(inconsistency -> inconsistency.confirm(imapUidDAO, pop3MetadataStore))
.flatMap(inconsistency -> inconsistency.fix(context, imapUidDAO, pop3MetadataStore))));
}
@@ -443,6 +502,7 @@ public class MetaDataFixInconsistenciesService {
.per(PERIOD)
.forOperation(metaData -> detectMissingEntriesInPop3MetaDataStore(metaData)
.doOnNext(any -> context.incrementProcessedImapUidEntries())
+ .flatMap(inconsistency -> inconsistency.confirm(imapUidDAO, pop3MetadataStore))
.flatMap(inconsistency -> inconsistency.fix(context, imapUidDAO, pop3MetadataStore))));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org