You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/29 03:03:08 UTC

[james-project] 01/08: JAMES-3184 Throttle message process rate

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cd0783e6fe67f836d2ba866f6779bfef14c01d5c
Author: LanKhuat <kh...@gmail.com>
AuthorDate: Mon May 18 17:29:01 2020 +0700

    JAMES-3184 Throttle message process rate
---
 .../task/SolveMessageInconsistenciesService.java   | 57 +++++++++++++----
 .../SolveMessageInconsistenciesServiceTest.java    | 73 +++++++++++-----------
 2 files changed, 81 insertions(+), 49 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index a052681..6106458 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.cassandra.mail.task;
 
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
@@ -41,10 +42,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
 
 public class SolveMessageInconsistenciesService {
 
@@ -162,6 +165,23 @@ public class SolveMessageInconsistenciesService {
         }
     }
 
+    public static class RunningOptions {
+
+        public static final RunningOptions DEFAULT = new RunningOptions(100);
+
+        private final int messagesPerSecond;
+
+        public RunningOptions(int messagesPerSecond) {
+            Preconditions.checkArgument(messagesPerSecond > 0, "'messagesPerSecond' must be strictly positive");
+
+            this.messagesPerSecond = messagesPerSecond;
+        }
+
+        public int getMessagesPerSecond() {
+            return this.messagesPerSecond;
+        }
+    }
+
     static class Context {
         static class Snapshot {
             public static Builder builder() {
@@ -332,8 +352,8 @@ public class SolveMessageInconsistenciesService {
         }
 
         private Context(AtomicLong processedImapUidEntries, AtomicLong processedMessageIdEntries, AtomicLong addedMessageIdEntries,
-                        AtomicLong updatedMessageIdEntries, AtomicLong removedMessageIdEntries, Collection<ComposedMessageId> fixedInconsistencies,
-                        Collection<ComposedMessageId> errors) {
+                        AtomicLong updatedMessageIdEntries, AtomicLong removedMessageIdEntries,
+                        Collection<ComposedMessageId> fixedInconsistencies, Collection<ComposedMessageId> errors) {
             this.processedImapUidEntries = processedImapUidEntries;
             this.processedMessageIdEntries = processedMessageIdEntries;
             this.addedMessageIdEntries = addedMessageIdEntries;
@@ -383,7 +403,9 @@ public class SolveMessageInconsistenciesService {
         }
     }
 
-    public static final Logger LOGGER = LoggerFactory.getLogger(SolveMessageInconsistenciesService.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SolveMessageInconsistenciesService.class);
+    private static final Duration DELAY = Duration.ZERO;
+    private static final Duration PERIOD = Duration.ofSeconds(1);
 
     private final CassandraMessageIdToImapUidDAO messageIdToImapUidDAO;
     private final CassandraMessageIdDAO messageIdDAO;
@@ -394,18 +416,24 @@ public class SolveMessageInconsistenciesService {
         this.messageIdDAO = messageIdDAO;
     }
 
-    Mono<Task.Result> fixMessageInconsistencies(Context context) {
+    Mono<Task.Result> fixMessageInconsistencies(Context context, RunningOptions runningOptions) {
         return Flux.concat(
-                fixInconsistenciesInMessageId(context),
-                fixInconsistenciesInImapUid(context))
+                fixInconsistenciesInMessageId(context, runningOptions),
+                fixInconsistenciesInImapUid(context, runningOptions))
             .reduce(Task.Result.COMPLETED, Task::combine);
     }
 
-    private Flux<Task.Result> fixInconsistenciesInImapUid(Context context) {
-        return messageIdToImapUidDAO.retrieveAllMessages()
+    private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) {
+        return throttle(messageIdToImapUidDAO.retrieveAllMessages(), runningOptions)
             .doOnNext(any -> context.incrementProcessedImapUidEntries())
-            .concatMap(this::detectInconsistencyInImapUid)
-            .concatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+            .flatMap(this::detectInconsistencyInImapUid)
+            .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+    }
+
+    private Flux<ComposedMessageIdWithMetaData> throttle(Flux<ComposedMessageIdWithMetaData> messages, RunningOptions runningOptions) {
+        return messages.windowTimeout(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
+            .zipWith(Flux.interval(DELAY, PERIOD))
+            .flatMap(Tuple2::getT1);
     }
 
     private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) {
@@ -445,11 +473,14 @@ public class SolveMessageInconsistenciesService {
             .switchIfEmpty(Mono.just(NO_INCONSISTENCY));
     }
 
-    private Flux<Task.Result> fixInconsistenciesInMessageId(Context context) {
+    private Flux<Task.Result> fixInconsistenciesInMessageId(Context context, RunningOptions runningOptions) {
         return messageIdDAO.retrieveAllMessages()
             .doOnNext(any -> context.incrementMessageIdEntries())
-            .concatMap(this::detectInconsistencyInMessageId)
-            .concatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+            .windowTimeout(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
+            .zipWith(Flux.interval(Duration.ofSeconds(1)))
+            .flatMap(Tuple2::getT1)
+            .flatMap(this::detectInconsistencyInMessageId)
+            .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
     }
 
     private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java
index 9b60935..086d105 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java
@@ -39,6 +39,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.Context;
+import org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.RunningOptions;
 import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
@@ -105,7 +106,7 @@ public class SolveMessageInconsistenciesServiceTest {
 
     @Test
     void fixMessageInconsistenciesShouldReturnCompletedWhenNoData() {
-        assertThat(testee.fixMessageInconsistencies(new Context()).block())
+        assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
             .isEqualTo(Task.Result.COMPLETED);
     }
 
@@ -114,13 +115,13 @@ public class SolveMessageInconsistenciesServiceTest {
         imapUidDAO.insert(MESSAGE_1).block();
         messageIdDAO.insert(MESSAGE_1).block();
 
-        assertThat(testee.fixMessageInconsistencies(new Context()).block())
+        assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
             .isEqualTo(Task.Result.COMPLETED);
     }
 
     @Test
     void fixMailboxInconsistenciesShouldNotAlterStateWhenEmpty() {
-        testee.fixMessageInconsistencies(new Context()).block();
+        testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
 
         SoftAssertions.assertSoftly(softly -> {
             softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()).isEmpty();
@@ -133,7 +134,7 @@ public class SolveMessageInconsistenciesServiceTest {
         imapUidDAO.insert(MESSAGE_1).block();
         messageIdDAO.insert(MESSAGE_1).block();
 
-        testee.fixMessageInconsistencies(new Context()).block();
+        testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
 
         SoftAssertions.assertSoftly(softly -> {
             softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -150,7 +151,7 @@ public class SolveMessageInconsistenciesServiceTest {
         void fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() {
             imapUidDAO.insert(MESSAGE_1).block();
 
-            assertThat(testee.fixMessageInconsistencies(new Context()).block())
+            assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -167,7 +168,7 @@ public class SolveMessageInconsistenciesServiceTest {
                     .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
 
             Context context = new Context();
-            Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache();
+            Mono<Task.Result> task = testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).subscribeOn(Schedulers.elastic()).cache();
             task.subscribe();
 
             barrier.awaitCaller();
@@ -196,7 +197,7 @@ public class SolveMessageInconsistenciesServiceTest {
                     .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
 
             Context context = new Context();
-            Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache();
+            Mono<Task.Result> task = testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).subscribeOn(Schedulers.elastic()).cache();
             task.subscribe();
 
             barrier.awaitCaller();
@@ -217,7 +218,7 @@ public class SolveMessageInconsistenciesServiceTest {
         void fixMessageInconsistenciesShouldResolveInconsistentData() {
             imapUidDAO.insert(MESSAGE_1).block();
 
-            testee.fixMessageInconsistencies(new Context()).block();
+            testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block())
@@ -232,7 +233,7 @@ public class SolveMessageInconsistenciesServiceTest {
             imapUidDAO.insert(MESSAGE_1).block();
             messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block();
 
-            assertThat(testee.fixMessageInconsistencies(new Context()).block())
+            assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -241,7 +242,7 @@ public class SolveMessageInconsistenciesServiceTest {
             imapUidDAO.insert(MESSAGE_1).block();
             messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block();
 
-            testee.fixMessageInconsistencies(new Context()).block();
+            testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block())
@@ -256,7 +257,7 @@ public class SolveMessageInconsistenciesServiceTest {
             imapUidDAO.insert(MESSAGE_1).block();
             messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
 
-            assertThat(testee.fixMessageInconsistencies(new Context()).block())
+            assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -265,7 +266,7 @@ public class SolveMessageInconsistenciesServiceTest {
             imapUidDAO.insert(MESSAGE_1).block();
             messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
 
-            testee.fixMessageInconsistencies(new Context()).block();
+            testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block())
@@ -286,7 +287,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .forever()
                         .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)"));
 
-                assertThat(testee.fixMessageInconsistencies(new Context()).block())
+                assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
                     .isEqualTo(Task.Result.PARTIAL);
             }
 
@@ -300,7 +301,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)"));
 
-                assertThat(testee.fixMessageInconsistencies(new Context()).block())
+                assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
                     .isEqualTo(Task.Result.PARTIAL);
             }
 
@@ -314,7 +315,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,d2bee791-7e63-11ea-883c-95b84008f979,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)"));
 
-                testee.fixMessageInconsistencies(new Context()).block();
+                testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
 
                 SoftAssertions.assertSoftly(softly -> {
                     softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_2, Optional.of(MAILBOX_ID)).collectList().block())
@@ -325,7 +326,7 @@ public class SolveMessageInconsistenciesServiceTest {
             }
 
             @Test
-            void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) {
+            void fixMessageInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) {
                 Context context = new Context();
 
                 imapUidDAO.insert(MESSAGE_1).block();
@@ -335,7 +336,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId"));
 
-                testee.fixMessageInconsistencies(context).block();
+                testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
                 assertThat(context.snapshot())
                     .isEqualTo(Context.Snapshot.builder()
@@ -345,7 +346,7 @@ public class SolveMessageInconsistenciesServiceTest {
             }
 
             @Test
-            void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) {
+            void fixMessageInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) {
                 Context context = new Context();
 
                 imapUidDAO.insert(MESSAGE_1).block();
@@ -355,7 +356,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
 
-                testee.fixMessageInconsistencies(context).block();
+                testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
                 assertThat(context.snapshot())
                     .isEqualTo(Context.Snapshot.builder()
@@ -373,7 +374,7 @@ public class SolveMessageInconsistenciesServiceTest {
         void fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() {
             messageIdDAO.insert(MESSAGE_1).block();
 
-            assertThat(testee.fixMessageInconsistencies(new Context()).block())
+            assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -391,7 +392,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         "WHERE mailboxId=:mailboxId AND uid=:uid;"));
 
             Context context = new Context();
-            Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache();
+            Mono<Task.Result> task = testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).subscribeOn(Schedulers.elastic()).cache();
             task.subscribe();
 
             barrier.awaitCaller();
@@ -412,7 +413,7 @@ public class SolveMessageInconsistenciesServiceTest {
         void fixMessageInconsistenciesShouldResolveInconsistentData() {
             messageIdDAO.insert(MESSAGE_1).block();
 
-            testee.fixMessageInconsistencies(new Context()).block();
+            testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -429,7 +430,7 @@ public class SolveMessageInconsistenciesServiceTest {
 
             imapUidDAO.insert(MESSAGE_1).block();
 
-            assertThat(testee.fixMessageInconsistencies(new Context()).block())
+            assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
                 .isEqualTo(Task.Result.COMPLETED);
         }
 
@@ -440,7 +441,7 @@ public class SolveMessageInconsistenciesServiceTest {
 
             imapUidDAO.insert(MESSAGE_1).block();
 
-            testee.fixMessageInconsistencies(new Context()).block();
+            testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
 
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -461,7 +462,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .forever()
                         .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
 
-                assertThat(testee.fixMessageInconsistencies(new Context()).block())
+                assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
                     .isEqualTo(Task.Result.PARTIAL);
             }
 
@@ -475,7 +476,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
 
-                assertThat(testee.fixMessageInconsistencies(new Context()).block())
+                assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
                     .isEqualTo(Task.Result.PARTIAL);
             }
 
@@ -489,7 +490,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
 
-                testee.fixMessageInconsistencies(new Context()).block();
+                testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
 
                 SoftAssertions.assertSoftly(softly -> {
                     softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -510,7 +511,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
 
-                testee.fixMessageInconsistencies(context).block();
+                testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
                 assertThat(context.snapshot())
                     .isEqualTo(Context.Snapshot.builder()
@@ -530,7 +531,7 @@ public class SolveMessageInconsistenciesServiceTest {
                         .times(1)
                         .whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId"));
 
-                testee.fixMessageInconsistencies(context).block();
+                testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
                 assertThat(context.snapshot())
                     .isEqualTo(Context.Snapshot.builder()
@@ -545,7 +546,7 @@ public class SolveMessageInconsistenciesServiceTest {
     void fixMailboxInconsistenciesShouldNotUpdateContextWhenNoData() {
         Context context = new Context();
 
-        testee.fixMessageInconsistencies(context).block();
+        testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot()).isEqualToComparingFieldByFieldRecursively(new Context().snapshot());
     }
@@ -557,7 +558,7 @@ public class SolveMessageInconsistenciesServiceTest {
         imapUidDAO.insert(MESSAGE_1).block();
         messageIdDAO.insert(MESSAGE_1).block();
 
-        testee.fixMessageInconsistencies(context).block();
+        testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(Context.Snapshot.builder()
@@ -572,7 +573,7 @@ public class SolveMessageInconsistenciesServiceTest {
 
         imapUidDAO.insert(MESSAGE_1).block();
 
-        testee.fixMessageInconsistencies(context).block();
+        testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(Context.Snapshot.builder()
@@ -589,7 +590,7 @@ public class SolveMessageInconsistenciesServiceTest {
         imapUidDAO.insert(MESSAGE_1).block();
         messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
 
-        testee.fixMessageInconsistencies(context).block();
+        testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(Context.Snapshot.builder()
@@ -607,7 +608,7 @@ public class SolveMessageInconsistenciesServiceTest {
         imapUidDAO.insert(MESSAGE_1).block();
         messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block();
 
-        testee.fixMessageInconsistencies(context).block();
+        testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(Context.Snapshot.builder()
@@ -624,7 +625,7 @@ public class SolveMessageInconsistenciesServiceTest {
 
         messageIdDAO.insert(MESSAGE_1).block();
 
-        testee.fixMessageInconsistencies(context).block();
+        testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(Context.Snapshot.builder()
@@ -645,7 +646,7 @@ public class SolveMessageInconsistenciesServiceTest {
                 .times(1)
                 .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
 
-        testee.fixMessageInconsistencies(context).block();
+        testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(Context.Snapshot.builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org