You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/29 03:03:08 UTC
[james-project] 01/08: JAMES-3184 Throttle message process rate
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit cd0783e6fe67f836d2ba866f6779bfef14c01d5c
Author: LanKhuat <kh...@gmail.com>
AuthorDate: Mon May 18 17:29:01 2020 +0700
JAMES-3184 Throttle message process rate
---
.../task/SolveMessageInconsistenciesService.java | 57 +++++++++++++----
.../SolveMessageInconsistenciesServiceTest.java | 73 +++++++++++-----------
2 files changed, 81 insertions(+), 49 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index a052681..6106458 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.cassandra.mail.task;
import static org.apache.james.util.ReactorUtils.publishIfPresent;
+import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
@@ -41,10 +42,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
public class SolveMessageInconsistenciesService {
@@ -162,6 +165,23 @@ public class SolveMessageInconsistenciesService {
}
}
+ /**
+  * Tuning options for a run of the service: the number of messages that may be
+  * processed per second.
+  */
+ public static class RunningOptions {
+
+ private static final String INVALID_RATE_MESSAGE = "'messagesPerSecond' must be strictly positive";
+
+ /** Default options: 100 messages per second. */
+ public static final RunningOptions DEFAULT = new RunningOptions(100);
+
+ private final int messagesPerSecond;
+
+ /**
+  * @param messagesPerSecond requested processing rate; must be strictly positive
+  * @throws IllegalArgumentException if {@code messagesPerSecond} is zero or negative
+  */
+ public RunningOptions(int messagesPerSecond) {
+ boolean strictlyPositive = messagesPerSecond > 0;
+ Preconditions.checkArgument(strictlyPositive, INVALID_RATE_MESSAGE);
+
+ this.messagesPerSecond = messagesPerSecond;
+ }
+
+ /**
+  * @return the configured number of messages per second
+  */
+ public int getMessagesPerSecond() {
+ return messagesPerSecond;
+ }
+ }
+
static class Context {
static class Snapshot {
public static Builder builder() {
@@ -332,8 +352,8 @@ public class SolveMessageInconsistenciesService {
}
private Context(AtomicLong processedImapUidEntries, AtomicLong processedMessageIdEntries, AtomicLong addedMessageIdEntries,
- AtomicLong updatedMessageIdEntries, AtomicLong removedMessageIdEntries, Collection<ComposedMessageId> fixedInconsistencies,
- Collection<ComposedMessageId> errors) {
+ AtomicLong updatedMessageIdEntries, AtomicLong removedMessageIdEntries,
+ Collection<ComposedMessageId> fixedInconsistencies, Collection<ComposedMessageId> errors) {
this.processedImapUidEntries = processedImapUidEntries;
this.processedMessageIdEntries = processedMessageIdEntries;
this.addedMessageIdEntries = addedMessageIdEntries;
@@ -383,7 +403,9 @@ public class SolveMessageInconsistenciesService {
}
}
- public static final Logger LOGGER = LoggerFactory.getLogger(SolveMessageInconsistenciesService.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SolveMessageInconsistenciesService.class);
+ // Initial delay of the throttling interval: zero, so the first tick is not delayed.
+ private static final Duration DELAY = Duration.ZERO;
+ // Period between two ticks of the throttling interval (one second).
+ private static final Duration PERIOD = Duration.ofSeconds(1);
private final CassandraMessageIdToImapUidDAO messageIdToImapUidDAO;
private final CassandraMessageIdDAO messageIdDAO;
@@ -394,18 +416,24 @@ public class SolveMessageInconsistenciesService {
this.messageIdDAO = messageIdDAO;
}
- Mono<Task.Result> fixMessageInconsistencies(Context context) {
+ /**
+  * Runs the messageId pass followed by the imapUid pass and folds every result
+  * they emit into a single task result.
+  *
+  * The fold starts from Task.Result.COMPLETED and merges results with Task::combine.
+  *
+  * @param context passed to both passes, which update its counters
+  * @param runningOptions passed unchanged to both passes (processing rate)
+  * @return the combination of all results emitted by the two passes
+  */
+ Mono<Task.Result> fixMessageInconsistencies(Context context, RunningOptions runningOptions) {
return Flux.concat(
- fixInconsistenciesInMessageId(context),
- fixInconsistenciesInImapUid(context))
+ fixInconsistenciesInMessageId(context, runningOptions),
+ fixInconsistenciesInImapUid(context, runningOptions))
.reduce(Task.Result.COMPLETED, Task::combine);
}
- private Flux<Task.Result> fixInconsistenciesInImapUid(Context context) {
- return messageIdToImapUidDAO.retrieveAllMessages()
+ /**
+  * Reads every entry from messageIdToImapUidDAO through the throttle helper,
+  * counts each released entry in the context, detects whether it is inconsistent
+  * and applies the matching fix.
+  *
+  * Detection and fixing are chained with flatMap rather than concatMap, so the
+  * order of emitted results is not guaranteed to follow the order of the entries.
+  *
+  * @param context counters updated for each processed entry and by each fix
+  * @param runningOptions supplies the messages-per-second rate used for throttling
+  * @return one result per fix that was applied
+  */
+ private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) {
+ return throttle(messageIdToImapUidDAO.retrieveAllMessages(), runningOptions)
.doOnNext(any -> context.incrementProcessedImapUidEntries())
- .concatMap(this::detectInconsistencyInImapUid)
- .concatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+ .flatMap(this::detectInconsistencyInImapUid)
+ .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+ }
+
+ /**
+  * Limits the rate at which messages flow downstream.
+  *
+  * The source is cut into windows holding at most
+  * {@code runningOptions.getMessagesPerSecond()} elements, each window also being
+  * closed once PERIOD has elapsed. Every window is then paired with one tick of an
+  * interval that starts after DELAY and fires every PERIOD, so a window is only
+  * released when its tick is available. The released windows are flattened back
+  * into a single stream of messages.
+  *
+  * The window timeout and the tick period deliberately share the PERIOD constant:
+  * the "per second" rate only holds while both durations are equal.
+  *
+  * @param messages the messages to slow down
+  * @param runningOptions supplies the maximum number of messages per window
+  * @return the same messages, released window by window
+  */
+ private Flux<ComposedMessageIdWithMetaData> throttle(Flux<ComposedMessageIdWithMetaData> messages, RunningOptions runningOptions) {
+ return messages.windowTimeout(runningOptions.getMessagesPerSecond(), PERIOD)
+ .zipWith(Flux.interval(DELAY, PERIOD))
+ .flatMap(Tuple2::getT1);
}
private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) {
@@ -445,11 +473,14 @@ public class SolveMessageInconsistenciesService {
.switchIfEmpty(Mono.just(NO_INCONSISTENCY));
}
- private Flux<Task.Result> fixInconsistenciesInMessageId(Context context) {
+ /**
+  * Reads every entry from messageIdDAO, counts it in the context, throttles the
+  * stream, detects whether each entry is inconsistent and applies the matching fix.
+  *
+  * Throttling goes through the shared throttle helper so that this pass and the
+  * imapUid pass use the same window size, tick period and initial delay.
+  *
+  * Detection and fixing are chained with flatMap rather than concatMap, so the
+  * order of emitted results is not guaranteed to follow the order of the entries.
+  *
+  * @param context counters updated for each read entry and by each fix
+  * @param runningOptions supplies the messages-per-second rate used for throttling
+  * @return one result per fix that was applied
+  */
+ private Flux<Task.Result> fixInconsistenciesInMessageId(Context context, RunningOptions runningOptions) {
- return messageIdDAO.retrieveAllMessages()
- .doOnNext(any -> context.incrementMessageIdEntries())
- .concatMap(this::detectInconsistencyInMessageId)
- .concatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
+ // Entries are counted as they are read, upstream of the throttle.
+ Flux<ComposedMessageIdWithMetaData> countedMessages = messageIdDAO.retrieveAllMessages()
+ .doOnNext(any -> context.incrementMessageIdEntries());
+
+ return throttle(countedMessages, runningOptions)
+ .flatMap(this::detectInconsistencyInMessageId)
+ .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
}
private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java
index 9b60935..086d105 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesServiceTest.java
@@ -39,6 +39,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.Context;
+import org.apache.james.mailbox.cassandra.mail.task.SolveMessageInconsistenciesService.RunningOptions;
import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
@@ -105,7 +106,7 @@ public class SolveMessageInconsistenciesServiceTest {
@Test
void fixMessageInconsistenciesShouldReturnCompletedWhenNoData() {
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ // Nothing is inserted by this test before the run; the expected outcome is COMPLETED.
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.COMPLETED);
}
@@ -114,13 +115,13 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
messageIdDAO.insert(MESSAGE_1).block();
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.COMPLETED);
}
@Test
void fixMailboxInconsistenciesShouldNotAlterStateWhenEmpty() {
- testee.fixMessageInconsistencies(new Context()).block();
+ testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block()).isEmpty();
@@ -133,7 +134,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
messageIdDAO.insert(MESSAGE_1).block();
- testee.fixMessageInconsistencies(new Context()).block();
+ testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -150,7 +151,7 @@ public class SolveMessageInconsistenciesServiceTest {
void fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() {
imapUidDAO.insert(MESSAGE_1).block();
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ // MESSAGE_1 is inserted through imapUidDAO only (no messageIdDAO insert in this test);
+ // the run is still expected to end as COMPLETED.
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.COMPLETED);
}
@@ -167,7 +168,7 @@ public class SolveMessageInconsistenciesServiceTest {
.whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
Context context = new Context();
- Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache();
+ Mono<Task.Result> task = testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).subscribeOn(Schedulers.elastic()).cache();
task.subscribe();
barrier.awaitCaller();
@@ -196,7 +197,7 @@ public class SolveMessageInconsistenciesServiceTest {
.whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
Context context = new Context();
- Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache();
+ Mono<Task.Result> task = testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).subscribeOn(Schedulers.elastic()).cache();
task.subscribe();
barrier.awaitCaller();
@@ -217,7 +218,7 @@ public class SolveMessageInconsistenciesServiceTest {
void fixMessageInconsistenciesShouldResolveInconsistentData() {
imapUidDAO.insert(MESSAGE_1).block();
- testee.fixMessageInconsistencies(new Context()).block();
+ testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block())
@@ -232,7 +233,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block();
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.COMPLETED);
}
@@ -241,7 +242,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block();
- testee.fixMessageInconsistencies(new Context()).block();
+ testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block())
@@ -256,7 +257,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.COMPLETED);
}
@@ -265,7 +266,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
- testee.fixMessageInconsistencies(new Context()).block();
+ testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_1, Optional.of(MAILBOX_ID)).collectList().block())
@@ -286,7 +287,7 @@ public class SolveMessageInconsistenciesServiceTest {
.forever()
.whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)"));
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.PARTIAL);
}
@@ -300,7 +301,7 @@ public class SolveMessageInconsistenciesServiceTest {
.times(1)
.whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,:messageId,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)"));
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.PARTIAL);
}
@@ -314,7 +315,7 @@ public class SolveMessageInconsistenciesServiceTest {
.times(1)
.whenQueryStartsWith("INSERT INTO messageIdTable (mailboxId,uid,modSeq,messageId,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags) VALUES (:mailboxId,:uid,:modSeq,d2bee791-7e63-11ea-883c-95b84008f979,:flagAnswered,:flagDeleted,:flagDraft,:flagFlagged,:flagRecent,:flagSeen,:flagUser,:userFlags)"));
- testee.fixMessageInconsistencies(new Context()).block();
+ testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(imapUidDAO.retrieve(MESSAGE_ID_2, Optional.of(MAILBOX_ID)).collectList().block())
@@ -325,7 +326,7 @@ public class SolveMessageInconsistenciesServiceTest {
}
@Test
- void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) {
+ void fixMessageInconsistenciesShouldUpdateContextWhenFailedToRetrieveImapUidRecord(CassandraCluster cassandra) {
Context context = new Context();
imapUidDAO.insert(MESSAGE_1).block();
@@ -335,7 +336,7 @@ public class SolveMessageInconsistenciesServiceTest {
.times(1)
.whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId"));
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
@@ -345,7 +346,7 @@ public class SolveMessageInconsistenciesServiceTest {
}
@Test
- void fixMailboxInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) {
+ void fixMessageInconsistenciesShouldUpdateContextWhenFailedToRetrieveMessageIdRecord(CassandraCluster cassandra) {
Context context = new Context();
imapUidDAO.insert(MESSAGE_1).block();
@@ -355,7 +356,7 @@ public class SolveMessageInconsistenciesServiceTest {
.times(1)
.whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
@@ -373,7 +374,7 @@ public class SolveMessageInconsistenciesServiceTest {
void fixMessageInconsistenciesShouldReturnCompletedWhenInconsistentData() {
messageIdDAO.insert(MESSAGE_1).block();
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ // MESSAGE_1 is inserted through messageIdDAO only (no imapUidDAO insert in this test);
+ // the run is still expected to end as COMPLETED.
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.COMPLETED);
}
@@ -391,7 +392,7 @@ public class SolveMessageInconsistenciesServiceTest {
"WHERE mailboxId=:mailboxId AND uid=:uid;"));
Context context = new Context();
- Mono<Task.Result> task = testee.fixMessageInconsistencies(context).subscribeOn(Schedulers.elastic()).cache();
+ Mono<Task.Result> task = testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).subscribeOn(Schedulers.elastic()).cache();
task.subscribe();
barrier.awaitCaller();
@@ -412,7 +413,7 @@ public class SolveMessageInconsistenciesServiceTest {
void fixMessageInconsistenciesShouldResolveInconsistentData() {
messageIdDAO.insert(MESSAGE_1).block();
- testee.fixMessageInconsistencies(new Context()).block();
+ testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -429,7 +430,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.COMPLETED);
}
@@ -440,7 +441,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
- testee.fixMessageInconsistencies(new Context()).block();
+ testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -461,7 +462,7 @@ public class SolveMessageInconsistenciesServiceTest {
.forever()
.whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.PARTIAL);
}
@@ -475,7 +476,7 @@ public class SolveMessageInconsistenciesServiceTest {
.times(1)
.whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
- assertThat(testee.fixMessageInconsistencies(new Context()).block())
+ assertThat(testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block())
.isEqualTo(Task.Result.PARTIAL);
}
@@ -489,7 +490,7 @@ public class SolveMessageInconsistenciesServiceTest {
.times(1)
.whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
- testee.fixMessageInconsistencies(new Context()).block();
+ testee.fixMessageInconsistencies(new Context(), RunningOptions.DEFAULT).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(imapUidDAO.retrieveAllMessages().collectList().block())
@@ -510,7 +511,7 @@ public class SolveMessageInconsistenciesServiceTest {
.times(1)
.whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid"));
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
@@ -530,7 +531,7 @@ public class SolveMessageInconsistenciesServiceTest {
.times(1)
.whenQueryStartsWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable WHERE messageId=:messageId AND mailboxId=:mailboxId"));
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
@@ -545,7 +546,7 @@ public class SolveMessageInconsistenciesServiceTest {
void fixMailboxInconsistenciesShouldNotUpdateContextWhenNoData() {
Context context = new Context();
- testee.fixMessageInconsistencies(context).block();
+ // NOTE(review): the test name says "fixMailboxInconsistencies" but the method under test is
+ // fixMessageInconsistencies; sibling tests were renamed in this same change - consider renaming.
+ // Nothing is inserted before the run, so the snapshot must match that of a fresh Context.
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot()).isEqualToComparingFieldByFieldRecursively(new Context().snapshot());
}
@@ -557,7 +558,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
messageIdDAO.insert(MESSAGE_1).block();
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
@@ -572,7 +573,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
@@ -589,7 +590,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
messageIdDAO.insert(MESSAGE_1_WITH_MOD_SEQ_2).block();
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
@@ -607,7 +608,7 @@ public class SolveMessageInconsistenciesServiceTest {
imapUidDAO.insert(MESSAGE_1).block();
messageIdDAO.insert(MESSAGE_1_WITH_SEEN_FLAG).block();
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
@@ -624,7 +625,7 @@ public class SolveMessageInconsistenciesServiceTest {
messageIdDAO.insert(MESSAGE_1).block();
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
@@ -645,7 +646,7 @@ public class SolveMessageInconsistenciesServiceTest {
.times(1)
.whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
- testee.fixMessageInconsistencies(context).block();
+ testee.fixMessageInconsistencies(context, RunningOptions.DEFAULT).block();
assertThat(context.snapshot())
.isEqualTo(Context.Snapshot.builder()
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org