You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/26 08:41:56 UTC

[james-project] branch master updated: JAMES-3623 MetaDataFixInconsistenciesService: account for concurrency (#1019)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 01d200817e JAMES-3623 MetaDataFixInconsistenciesService: account for concurrency (#1019)
01d200817e is described below

commit 01d200817e28c84a1bbc66d356d5f339e43e2cbe
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Thu May 26 15:41:52 2022 +0700

    JAMES-3623 MetaDataFixInconsistenciesService: account for concurrency (#1019)
    
     -> Confirm inconsistencies before fixing them
    
     Otherwise the effects of concurrent POP operations can be mistaken for inconsistencies.
    
     -> Log errors
---
 .../task/MetaDataFixInconsistenciesService.java    | 76 +++++++++++++++++++---
 1 file changed, 68 insertions(+), 8 deletions(-)

diff --git a/server/protocols/protocols-pop3-distributed/src/main/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService.java b/server/protocols/protocols-pop3-distributed/src/main/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService.java
index a2434524be..d7b815165f 100644
--- a/server/protocols/protocols-pop3-distributed/src/main/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService.java
+++ b/server/protocols/protocols-pop3-distributed/src/main/java/org/apache/james/pop3server/mailbox/task/MetaDataFixInconsistenciesService.java
@@ -54,13 +54,28 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class MetaDataFixInconsistenciesService {
+    private enum Presence {
+        PRESENT,
+        ABSENT
+    }
 
-    @FunctionalInterface
     interface Inconsistency {
         Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore);
+
+        Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore);
     }
 
-    static final Inconsistency NO_INCONSISTENCY = (context, imapUidDAO, pop3MetadataStore) -> Mono.just(Task.Result.COMPLETED);
+    static final Inconsistency NO_INCONSISTENCY = new Inconsistency() {
+        @Override
+        public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+            return Mono.just(Task.Result.COMPLETED);
+        }
+
+        @Override
+        public Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+            return Mono.just(this);
+        }
+    };
 
     private static class StalePOP3EntryConsistency implements Inconsistency {
         private final MailboxId mailboxId;
@@ -79,16 +94,35 @@ public class MetaDataFixInconsistenciesService {
                 .doOnSuccess(any -> notifySuccess(context))
                 .thenReturn(Task.Result.COMPLETED)
                 .onErrorResume(error -> {
-                    notifyFailure(context);
+                    notifyFailure(context, error);
                     return Mono.just(Task.Result.PARTIAL);
                 });
         }
 
-        private void notifyFailure(Context context) {
+        @Override
+        public Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+            Mono<Presence> imapView = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of((CassandraId) mailboxId))
+                .next()
+                .map(any -> Presence.PRESENT)
+                .switchIfEmpty(Mono.just(Presence.ABSENT));
+            Mono<Presence> pop3View = Mono.from(pop3MetadataStore.retrieve(mailboxId, messageId))
+                .map(any -> Presence.PRESENT)
+                .switchIfEmpty(Mono.just(Presence.ABSENT));
+
+            return imapView.zipWith(pop3View)
+                .map(t2 -> {
+                    if (t2.getT1() == Presence.ABSENT && t2.getT2() == Presence.PRESENT) {
+                        return this;
+                    }
+                    return NO_INCONSISTENCY;
+                });
+        }
+
+        private void notifyFailure(Context context, Throwable e) {
             context.addErrors(MessageInconsistenciesEntry.builder()
                 .mailboxId(mailboxId.serialize())
                 .messageId(messageId.serialize()));
-            LOGGER.error("Failed to fix inconsistency for stale POP3 entry: {}", messageId);
+            LOGGER.error("Failed to fix inconsistency for stale POP3 entry: {}", messageId, e);
         }
 
         private void notifySuccess(Context context) {
@@ -122,11 +156,30 @@ public class MetaDataFixInconsistenciesService {
                 .doOnSuccess(any -> notifySuccess(context))
                 .thenReturn(Task.Result.COMPLETED)
                 .onErrorResume(error -> {
-                    notifyFailure(context);
+                    notifyFailure(context, error);
                     return Mono.just(Task.Result.PARTIAL);
                 });
         }
 
+        @Override
+        public Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+            Mono<Presence> imapView = imapUidDAO.retrieve(messageId, Optional.of((CassandraId) mailboxId))
+                .next()
+                .map(any -> Presence.PRESENT)
+                .switchIfEmpty(Mono.just(Presence.ABSENT));
+            Mono<Presence> pop3View = Mono.from(pop3MetadataStore.retrieve(mailboxId, messageId))
+                .map(any -> Presence.PRESENT)
+                .switchIfEmpty(Mono.just(Presence.ABSENT));
+
+            return imapView.zipWith(pop3View)
+                .map(t2 -> {
+                    if (t2.getT1() == Presence.PRESENT && t2.getT2() == Presence.ABSENT) {
+                        return this;
+                    }
+                    return NO_INCONSISTENCY;
+                });
+        }
+
         private Mono<Pop3MetadataStore.StatMetadata> buildStatMetadata() {
             return cassandraMessageDAOV3.retrieveMessage(messageId, FetchType.METADATA)
                 .switchIfEmpty(Mono.error(new MailboxException("Message not found: " + messageId)))
@@ -134,11 +187,11 @@ public class MetaDataFixInconsistenciesService {
         }
 
 
-        private void notifyFailure(Context context) {
+        private void notifyFailure(Context context, Throwable e) {
             context.addErrors(MessageInconsistenciesEntry.builder()
                 .mailboxId(mailboxId.serialize())
                 .messageId(messageId.serialize()));
-            LOGGER.error("Failed to fix inconsistency for missing POP3 entry: {}", messageId);
+            LOGGER.error("Failed to fix inconsistency for missing POP3 entry: {}", messageId, e);
         }
 
         private void notifySuccess(Context context) {
@@ -169,6 +222,11 @@ public class MetaDataFixInconsistenciesService {
             LOGGER.error("Failed to detect inconsistency: {}", messageId);
             return Mono.just(Task.Result.PARTIAL);
         }
+
+        @Override
+        public Mono<Inconsistency> confirm(CassandraMessageIdToImapUidDAO imapUidDAO, Pop3MetadataStore pop3MetadataStore) {
+            return Mono.just(this);
+        }
     }
 
     public static class Context {
@@ -422,6 +480,7 @@ public class MetaDataFixInconsistenciesService {
                 .per(PERIOD)
                 .forOperation(fullMetadata -> detectStaleEntriesInPop3MetaDataStore(fullMetadata)
                     .doOnNext(any -> context.incrementProcessedPop3MetaDataStoreEntries())
+                    .flatMap(inconsistency -> inconsistency.confirm(imapUidDAO, pop3MetadataStore))
                     .flatMap(inconsistency -> inconsistency.fix(context, imapUidDAO, pop3MetadataStore))));
     }
 
@@ -443,6 +502,7 @@ public class MetaDataFixInconsistenciesService {
                 .per(PERIOD)
                 .forOperation(metaData -> detectMissingEntriesInPop3MetaDataStore(metaData)
                     .doOnNext(any -> context.incrementProcessedImapUidEntries())
+                    .flatMap(inconsistency -> inconsistency.confirm(imapUidDAO, pop3MetadataStore))
                     .flatMap(inconsistency -> inconsistency.fix(context, imapUidDAO, pop3MetadataStore))));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org