Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 02:03:55 UTC

[james-project] branch master updated (f9f5514 -> ccc5e48)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from f9f5514  JAMES-3212 Removed jetbrains annotations with no dependency
     new 3728fae  JAMES-3150 Add debugging support for the garbage collection
     new 7ef2da7  JAMES-3150 unify play-json dependency version
     new 5223223  JAMES-3150 change wording of delete to dereference in json serialization format
     new d92d2df  [Enhancement] Glowroot per mailet/matcher transaction recording
     new deae28f  [Enhancement] Glowroot per JMAP method transaction recording
     new 0345c32  JAMES-3289 fix flaky tests
     new 9f514be  JAMES-3184 Throttling should survive errors
     new bddf322  JAMES-3184 Throttling should work for long running streams
     new f777476  JAMES-3184 Throttling should tolerate many empty windows
     new 6a8e0d4  JAMES-3184 Throttling should survive errors
     new 8c6bf81  JAMES-3201 Add a missing task::await to improve test stability
     new c6a3ecb  JAMES-3290 Reactify MailLoader to handle error propagation
     new ce9d7ba  JAMES-3184 Add serialized info for reindexing tasks RunningOptions
     new 72e489a  JAMES-3265 Fix CassandraMessageMapper::deleteMessages mailbox counters handling
     new ccc5e48  JAMES-3265 Leverage default method to offer a convenient single message flag update

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/james/backends/cassandra/Scenario.java  |   8 +
 .../destination/glowroot/plugins/jmap.json         |   2 +-
 .../destination/glowroot/plugins/spooler.json      |   4 +-
 .../apache/james/mailbox/indexer/ReIndexer.java    |   4 -
 .../cassandra/mail/CassandraIndexTableHandler.java |   6 +-
 mailbox/event/json/pom.xml                         |   1 -
 .../james/mailbox/store/mail/MessageMapper.java    |   5 +
 .../store/mail/model/MessageMapperTest.java        | 151 +++++++------
 server/blob/blob-deduplicating/pom.xml             |   4 +
 .../james/server/blob/deduplication/GC.scala       |  12 +-
 .../server/blob/deduplication/GCJsonReporter.scala | 170 ++++++++++++++
 .../src/test/resources/gcReport.json               |  63 ++++++
 .../blob/deduplication/GCJsonReporterTest.scala    | 190 ++++++++++++++++
 .../blob/deduplication/GCPropertiesTest.scala      |  38 +---
 .../james/server/blob/deduplication/State.scala    |  37 +++
 server/container/util/pom.xml                      |   5 +
 .../java/org/apache/james/util/ReactorUtils.java   |  19 +-
 .../org/apache/james/util/ReactorUtilsTest.java    | 248 ++++++++++++++++++++-
 .../WebAdminReprocessingContextInformationDTO.java |  21 +-
 ...lboxReindexingTaskAdditionalInformationDTO.java |   9 +-
 ...UserReindexingTaskAdditionalInformationDTO.java |   8 +
 ...AdminReprocessingContextInformationDTOTest.java |   2 +
 .../WebAdminSingleMailboxReprocessingDTOTest.java  |   1 +
 .../dto/WebAdminUserReprocessingDTOTest.java       |   1 +
 .../james/webadmin/routes/MailboxesRoutesTest.java | 140 ++++++++++++
 .../webadmin/routes/UserMailboxesRoutesTest.java   |  43 ++++
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  31 +--
 .../apache/james/queue/rabbitmq/MailLoader.java    |  39 ++--
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   |   6 +-
 .../james/queue/rabbitmq/MailLoaderTest.java       |  47 ++--
 30 files changed, 1120 insertions(+), 195 deletions(-)
 create mode 100644 server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GCJsonReporter.scala
 create mode 100644 server/blob/blob-deduplicating/src/test/resources/gcReport.json
 create mode 100644 server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCJsonReporterTest.scala
 copy mailbox/store/src/test/java/org/apache/james/mailbox/store/MailboxReactorUtilsTest.java => server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java (56%)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 06/15: JAMES-3289 fix flaky tests

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0345c32c2e7f56dcada0a378221a935a4c904ec7
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Tue Jun 30 10:23:41 2020 +0200

    JAMES-3289 fix flaky tests
    
    Add a delay in the throwing behavior to avoid the reactor bug described in
    https://github.com/reactor/reactor-core/issues/1941. It is the source of instability
    in our tests: when the first publisher of a merge operation fails, its error is not
    propagated and the result of the merge is a successful Publisher instead of the
    expected failing one.
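
A minimal, self-contained sketch of the workaround applied in the diff below, assuming only plain Reactor (it does not use the James Scenario API; the class and names are illustrative): the injected failure sleeps briefly before throwing, so the failing source no longer fails synchronously during subscription.

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class DelayedFailureSketch {
        public static void main(String[] args) {
            // Illustrative failure injection: sleep briefly before throwing, mirroring the
            // change to Scenario.Behavior.THROW below.
            Mono<String> injectedFailure = Mono.fromCallable(() -> {
                Thread.sleep(50);
                throw new RuntimeException("injected failure");
            });

            // Per the commit message above, with the delay in place the error of the failing
            // source is propagated through the merge instead of being dropped.
            Flux.merge(injectedFailure, Mono.just("other work"))
                .onErrorResume(e -> Mono.just("error propagated: " + e.getMessage()))
                .subscribe(System.out::println);
        }
    }
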
---
 .../test/java/org/apache/james/backends/cassandra/Scenario.java   | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java
index 9acabc2..f5ddbf7 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java
@@ -41,6 +41,14 @@ public class Scenario {
     @FunctionalInterface
     interface Behavior {
         Behavior THROW = (session, statement) -> {
+            // JAMES-3289 add a delay in the throwing behavior to avoid the reactor bug described
+            // in https://github.com/reactor/reactor-core/issues/1941, which causes flaky tests.
+            // Once this bug is solved, this delay can be removed.
+            try {
+                Thread.sleep(50);
+            } catch (InterruptedException e) {
+                //DO NOTHING
+            }
             throw new InjectedFailureException();
         };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 10/15: JAMES-3184 Throttling should survive errors

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6a8e0d402d3d91727bc34c2bd2719b3aeab0a87b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 30 15:29:44 2020 +0700

    JAMES-3184 Throttling should survive errors
    
    fixup! JAMES-3184 Throttling should survive errors
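
The diff below recovers from errors of the entry-generating flux with onErrorContinue before windowing, and handles failures of the throttled operation with an onErrorResume inside each window, instead of a single onErrorContinue placed after concatMap that overrode the operation's own error handling. A minimal sketch of the resulting shape, written with plain Reactor operators rather than the ReactorUtils.throttle() builder:

    import java.time.Duration;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class ThrottleSketch {
        public static void main(String[] args) {
            Flux.range(1, 9)
                // recover from errors raised while generating the throttled entries
                .onErrorContinue((e, o) -> System.err.println("error while generating entries: " + e))
                .window(3)                              // at most 3 elements...
                .delayElements(Duration.ofMillis(20))   // ...per 20 ms window
                .concatMap(window -> window
                    .flatMap(i -> Mono.fromCallable(() -> riskyOperation(i)))
                    .onErrorResume(e -> {
                        System.err.println("error while throttling: " + e);
                        return Mono.empty(); // drop the failed window, keep the stream alive
                    }))
                .doOnNext(result -> System.out.println("processed " + result))
                .blockLast();
        }

        private static int riskyOperation(int i) {
            if (i == 5) {
                throw new IllegalStateException("boom on " + i);
            }
            return i * 10;
        }
    }

Because no onErrorContinue is applied downstream of the operation anymore, an operation that supplies its own onErrorResume (as in the new throttleShouldNotOverwriteErrorHandling test) keeps that behaviour.
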
---
 .../main/java/org/apache/james/util/ReactorUtils.java |  8 ++++++--
 .../java/org/apache/james/util/ReactorUtilsTest.java  | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 5ec7b9d..b516b8e 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -52,10 +52,14 @@ public class ReactorUtils {
             Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive");
 
             return flux -> flux
+                .onErrorContinue((e, o) -> LOGGER.error("Error encountered while generating throttled entries", e))
                 .window(elements)
                 .delayElements(duration)
-                .concatMap(window -> window.flatMap(operation))
-                .onErrorContinue((e, o) -> LOGGER.error("Error encountered while throttling for {}", o.toString(), e));
+                .concatMap(window -> window.flatMap(operation)
+                    .onErrorResume(e -> {
+                        LOGGER.error("Error encountered while throttling", e);
+                        return Mono.empty();
+                    }));
         };
     }
 
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 0ffbc1a..f7fc589 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -209,6 +209,25 @@ class ReactorUtilsTest {
         }
 
         @Test
+        void throttleShouldNotOverwriteErrorHandling() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+
+            Flux<Long> originalFlux = Flux.just(0L);
+            ConcurrentLinkedDeque<Throwable> recordedExceptions = new ConcurrentLinkedDeque<>();
+
+            originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(any -> Mono.<Long>error(new RuntimeException())
+                        .onErrorResume(e -> Mono.fromRunnable(() -> recordedExceptions.add(e)).thenReturn(any))))
+                .blockLast();
+
+            assertThat(recordedExceptions).hasSize(1);
+        }
+
+        @Test
         void throttleShouldHandleLargeFluxes() {
             int windowMaxSize = 2;
             Duration windowDuration = Duration.ofMillis(1);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/15: [Enhancement] Glowroot per JMAP method transaction recording

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit deae28f489078e4c1c22af1edb11deba54ed0e1b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jun 29 17:55:21 2020 +0700

    [Enhancement] Glowroot per JMAP method transaction recording
    
    Today JMAP methods are recorded as part of the "/jmap" transaction, which does not
    allow extensive per-method diagnostics. Traces are recorded, but this limits the
    overall statistics.
    
    We will gain significant debugging insight from recording a new transaction...
---
 .../run/guice/cassandra-rabbitmq/destination/glowroot/plugins/jmap.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dockerfiles/run/guice/cassandra-rabbitmq/destination/glowroot/plugins/jmap.json b/dockerfiles/run/guice/cassandra-rabbitmq/destination/glowroot/plugins/jmap.json
index 9afce4b..b5a0ea1 100644
--- a/dockerfiles/run/guice/cassandra-rabbitmq/destination/glowroot/plugins/jmap.json
+++ b/dockerfiles/run/guice/cassandra-rabbitmq/destination/glowroot/plugins/jmap.json
@@ -11,7 +11,7 @@
       "captureKind": "transaction",
       "transactionType": "JMAP",
       "transactionNameTemplate": "JMAP method : {{this.class.name}}",
-      "alreadyInTransactionBehavior": "capture-trace-entry",
+      "alreadyInTransactionBehavior": "capture-new-transaction",
       "traceEntryMessageTemplate": "{{this.class.name}}.{{methodName}}",
       "timerName": "jmapMethod"
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/15: [Enhancement] Glowroot per mailet/matcher transaction recording

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d92d2df47ced365fc9116ba49d417ec492c6682b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jun 29 17:54:31 2020 +0700

    [Enhancement] Glowroot per mailet/matcher transaction recording
    
    Today mailets and matchers are recorded as part of the "mailet container" transaction,
    which does not allow extensive per-mailet/matcher diagnostics. Traces are recorded,
    but this limits the overall statistics.
    
    We will gain significant debugging insight from recording a new transaction...
---
 .../cassandra-rabbitmq/destination/glowroot/plugins/spooler.json      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dockerfiles/run/guice/cassandra-rabbitmq/destination/glowroot/plugins/spooler.json b/dockerfiles/run/guice/cassandra-rabbitmq/destination/glowroot/plugins/spooler.json
index fd7732d..5937ca5 100644
--- a/dockerfiles/run/guice/cassandra-rabbitmq/destination/glowroot/plugins/spooler.json
+++ b/dockerfiles/run/guice/cassandra-rabbitmq/destination/glowroot/plugins/spooler.json
@@ -24,7 +24,7 @@
       "captureKind": "transaction",
       "transactionType": "Mailet",
       "transactionNameTemplate": "Mailet : {{this.class.name}}",
-      "alreadyInTransactionBehavior": "capture-trace-entry",
+      "alreadyInTransactionBehavior": "capture-new-transaction",
       "traceEntryMessageTemplate": "{{this.class.name}}.{{methodName}}",
       "timerName": "mailet"
     },
@@ -37,7 +37,7 @@
       "captureKind": "transaction",
       "transactionType": "Matcher",
       "transactionNameTemplate": "Mailet processor : {{this.class.name}}",
-      "alreadyInTransactionBehavior": "capture-trace-entry",
+      "alreadyInTransactionBehavior": "capture-new-transaction",
       "traceEntryMessageTemplate": "{{this.class.name}}.{{methodName}}",
       "timerName": "matcher"
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 14/15: JAMES-3265 Fix CassandraMessageMapper::deleteMessages mailbox counters handling

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 72e489a56a899a26a84f8429ab373670dbf7612c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 2 13:27:31 2020 +0700

    JAMES-3265 Fix CassandraMessageMapper::deleteMessages mailbox counters handling
    
    This code path's side effect was untested.
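
The fix in the diff below: when decrementing counters on deletion, the handler previously counted the deleted messages carrying the SEEN flag and subtracted that from the unseen counter; it now counts the messages without the SEEN flag. A standalone illustration of the corrected arithmetic, using plain javax.mail Flags rather than the James mapper API:

    import java.util.List;

    import javax.mail.Flags;

    public class UnseenCounterSketch {
        public static void main(String[] args) {
            // Flags of three messages being deleted: one unseen, two seen.
            List<Flags> deletedMessageFlags = List.of(
                new Flags(),
                new Flags(Flags.Flag.SEEN),
                new Flags(Flags.Flag.SEEN));

            // Corrected: only messages WITHOUT the SEEN flag decrement the unseen counter.
            long unseenCount = deletedMessageFlags.stream()
                .filter(flags -> !flags.contains(Flags.Flag.SEEN))
                .count();

            System.out.println("count  -= " + deletedMessageFlags.size()); // count  -= 3
            System.out.println("unseen -= " + unseenCount);                // unseen -= 1
        }
    }
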
---
 .../cassandra/mail/CassandraIndexTableHandler.java     |  6 +++---
 .../mailbox/store/mail/model/MessageMapperTest.java    | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 47fd1eb..c98112e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -159,15 +159,15 @@ public class CassandraIndexTableHandler {
     }
 
     private Mono<Void> decrementCountersOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) {
-        long seenCount = metaData.stream()
+        long unseenCount = metaData.stream()
             .map(MessageMetaData::getFlags)
-            .filter(flags -> flags.contains(Flags.Flag.SEEN))
+            .filter(flags -> !flags.contains(Flags.Flag.SEEN))
             .count();
 
         return mailboxCounterDAO.remove(MailboxCounters.builder()
             .mailboxId(mailboxId)
             .count(metaData.size())
-            .unseen(seenCount)
+            .unseen(unseenCount)
             .build());
     }
 
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index 1d44644..0d396a2 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -43,6 +43,7 @@ import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.ModSeq;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxCounters;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageMetaData;
@@ -378,6 +379,23 @@ public abstract class MessageMapperTest {
     }
 
     @Test
+    void deleteMessagesShouldDecrementUnseenToOneWhenDeletingTwoUnseenMessagesOutOfThree() throws MailboxException {
+        saveMessages();
+        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message3.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
+
+        messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
+
+        assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox))
+            .isEqualTo(MailboxCounters.builder()
+                .mailboxId(benwaInboxMailbox.getMailboxId())
+                .count(2)
+                .unseen(1)
+                .build());
+    }
+
+    @Test
     void addShouldUpdateRecentWhenNeeded() throws MailboxException {
         message1.setFlags(new Flags(Flags.Flag.RECENT));
         messageMapper.add(benwaInboxMailbox, message1);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/15: JAMES-3150 change wording of delete to dereference in json serialization format

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 52232238f5ba060240d354342e1949df019d5ab9
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Jun 22 15:41:32 2020 +0200

    JAMES-3150 change wording of delete to dereference in json serialization format
---
 .../server/blob/deduplication/GCJsonReporter.scala | 20 +++----
 .../src/test/resources/gcReport.json               | 12 ++--
 .../blob/deduplication/GCJsonReporterTest.scala    | 66 +++++++++++-----------
 3 files changed, 49 insertions(+), 49 deletions(-)

diff --git a/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GCJsonReporter.scala b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GCJsonReporter.scala
index a37ec36..9f68ef8 100644
--- a/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GCJsonReporter.scala
+++ b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GCJsonReporter.scala
@@ -19,7 +19,7 @@
 package org.apache.james.server.blob.deduplication
 
 import org.apache.james.blob.api.BlobId
-import org.apache.james.server.blob.deduplication.RelatedAction.{Delete, GarbageCollect, Init, Save}
+import org.apache.james.server.blob.deduplication.RelatedAction.{GarbageCollect, Init, Save}
 import play.api.libs.json.{JsString, Json, Writes}
 
 import scala.collection.immutable.TreeSet
@@ -29,7 +29,7 @@ sealed trait RelatedAction
 object RelatedAction {
   case object Init extends RelatedAction
   case class Save(blobId: BlobId, reference: ExternalID) extends RelatedAction
-  case class Delete(reference: ExternalID) extends RelatedAction
+  case class Dereference(reference: ExternalID) extends RelatedAction
   case object GarbageCollect extends RelatedAction
 }
 
@@ -44,14 +44,14 @@ object JsonReport {
                    `garbage-collection-iterations`: TreeSet[String],
                    blobs: Seq[BlobId],
                    references: Seq[Reference],
-                   deletions: Seq[Dereference])
+                   dereferences: Seq[Dereference])
 
 
   //action
   implicit val relatedActionWrites: Writes[RelatedAction] = {
     case Init => JsString("init")
     case Save(blobId, reference) => JsString(s"save(blob = ${blobId.asString()}, reference = ${reference.id})")
-    case Delete(reference) => JsString(s"delete(reference = ${reference.id})")
+    case RelatedAction.Dereference(reference) => JsString(s"dereference(reference = ${reference.id})")
     case GarbageCollect => JsString(s"garbageCollect")
   }
   //generation
@@ -134,7 +134,7 @@ object GCJsonReporter {
       `garbage-collection-iterations` = lastState.`garbage-collection-iterations` + gcReports.iteration.asString,
       blobs = lastState.blobs.diff(gcReports.blobsToDelete.map { case (generation, blobId) => JsonReport.BlobId(blobId.asString, generation) }.toSeq),
       references = lastState.references.filterNot(reference => blobsToDeleteAsString.contains(reference.blob)),
-      deletions = lastState.deletions.filterNot(dereference => blobsToDeleteAsString.contains(dereference.blob)))
+      dereferences = lastState.dereferences.filterNot(dereference => blobsToDeleteAsString.contains(dereference.blob)))
   }
 
   private def stateForDereference(reportStates: JsonReport, dereference: Dereference) = {
@@ -143,14 +143,14 @@ object GCJsonReporter {
     val iterations = previousState.`garbage-collection-iterations`
     val references = previousState.references
     val lastIteration = previousState.`garbage-collection-iterations`.last
-    val dereferences = previousState.deletions :+ JsonReport.Dereference(dereference.blob.asString(), dereference.generation, lastIteration)
+    val dereferences = previousState.dereferences :+ JsonReport.Dereference(dereference.blob.asString(), dereference.generation, lastIteration)
 
-    JsonReport.State(Delete(dereference.externalId),
+    JsonReport.State(RelatedAction.Dereference(dereference.externalId),
       `reference-generations` = referenceGenerations,
       `garbage-collection-iterations` = iterations,
       blobs = previousState.blobs,
       references = references,
-      deletions = dereferences)
+      dereferences = dereferences)
   }
 
   private def stateForReference(reportStates: JsonReport, add: Reference) = {
@@ -163,8 +163,8 @@ object GCJsonReporter {
     else
       previousState.blobs :+ JsonReport.BlobId(add.blobId.asString(), add.generation)
     val references = previousState.references :+ JsonReport.Reference(add.externalId.id, add.blobId.asString(), add.generation)
-    val deletions = previousState.deletions
+    val dereferences = previousState.dereferences
 
-    JsonReport.State(Save(add.blobId, add.externalId), referenceGenerations, iterations, blobs, references, deletions)
+    JsonReport.State(Save(add.blobId, add.externalId), referenceGenerations, iterations, blobs, references, dereferences)
   }
 }
diff --git a/server/blob/blob-deduplicating/src/test/resources/gcReport.json b/server/blob/blob-deduplicating/src/test/resources/gcReport.json
index 8c87a57..7d4ae89 100644
--- a/server/blob/blob-deduplicating/src/test/resources/gcReport.json
+++ b/server/blob/blob-deduplicating/src/test/resources/gcReport.json
@@ -5,7 +5,7 @@
     "garbage-collection-iterations" : [ "0" ],
     "blobs" : [ ],
     "references" : [ ],
-    "deletions" : [ ]
+    "dereferences" : [ ]
   }, {
     "related-action" : "save(blob = 0_myHash, reference = message1)",
     "reference-generations" : [ "0" ],
@@ -19,7 +19,7 @@
       "blob" : "0_myHash",
       "reference-generation" : "0"
     } ],
-    "deletions" : [ ]
+    "dereferences" : [ ]
   }, {
     "related-action" : "garbageCollect",
     "reference-generations" : [ "0" ],
@@ -33,9 +33,9 @@
       "blob" : "0_myHash",
       "reference-generation" : "0"
     } ],
-    "deletions" : [ ]
+    "dereferences" : [ ]
   }, {
-    "related-action" : "delete(reference = message1)",
+    "related-action" : "dereference(reference = message1)",
     "reference-generations" : [ "0", "1" ],
     "garbage-collection-iterations" : [ "0", "1" ],
     "blobs" : [ {
@@ -47,7 +47,7 @@
       "blob" : "0_myHash",
       "reference-generation" : "0"
     } ],
-    "deletions" : [ {
+    "dereferences" : [ {
       "blob" : "0_myHash",
       "reference-generation" : "1",
       "garbage-collection-iterations" : "1"
@@ -58,6 +58,6 @@
     "garbage-collection-iterations" : [ "0", "1", "2" ],
     "blobs" : [ ],
     "references" : [ ],
-    "deletions" : [ ]
+    "dereferences" : [ ]
   } ]
 }
\ No newline at end of file
diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCJsonReporterTest.scala b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCJsonReporterTest.scala
index c5961c2..6988e00 100644
--- a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCJsonReporterTest.scala
+++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCJsonReporterTest.scala
@@ -19,9 +19,7 @@
 
 package org.apache.james.server.blob.deduplication
 
-import java.time.Instant
-
-import org.apache.james.server.blob.deduplication.RelatedAction.{Delete, GarbageCollect, Init, Save}
+import org.apache.james.server.blob.deduplication.RelatedAction.{GarbageCollect, Init, Save}
 import org.apache.james.util.ClassLoaderUtils
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec
@@ -42,19 +40,19 @@ class GCJsonReporterTest extends AnyWordSpec with Matchers {
       `garbage-collection-iterations` = TreeSet(initialIteration),
       blobs = Seq[JsonReport.BlobId](),
       references = Nil,
-      deletions = Nil)
+      dereferences = Nil)
     val firstSaveReport = JsonReport.State(Save(blobId, externalId),
       `reference-generations` = TreeSet(generation),
       `garbage-collection-iterations` = TreeSet(initialIteration),
       blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
       references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
-      deletions = Nil)
-    val firstDeleteReport = JsonReport.State(Delete(externalId),
+      dereferences = Nil)
+    val firstDeleteReport = JsonReport.State(RelatedAction.Dereference(externalId),
       `reference-generations` = TreeSet(generation),
       `garbage-collection-iterations` = TreeSet(initialIteration),
       blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
       references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
-      deletions = Seq(JsonReport.Dereference(blobId.asString(), generation, initialIteration)))
+      dereferences = Seq(JsonReport.Dereference(blobId.asString(), generation, initialIteration)))
 
     val iterationForImmediateGC = Iteration(1L, Set(), generation)
     val gcReportImmediate = GCIterationReport(iterationForImmediateGC, Set())
@@ -63,13 +61,16 @@ class GCJsonReporterTest extends AnyWordSpec with Matchers {
       "on initial state" in {
         GCJsonReporter
           .report(GCIterationEvent(gcReportImmediate) :: Nil)
-          .states should be (Seq(initialReport,
-          JsonReport.State(GarbageCollect,
-          `reference-generations` = TreeSet(Generation.first),
-          `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
-          blobs = Seq[JsonReport.BlobId](),
-          references = Nil,
-          deletions = Nil)))
+          .states should be(
+          Seq(
+            initialReport,
+            JsonReport.State(
+              GarbageCollect,
+              `reference-generations` = TreeSet(Generation.first),
+              `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
+              blobs = Seq[JsonReport.BlobId](),
+              references = Nil,
+              dereferences = Nil)))
       }
     }
 
@@ -77,21 +78,21 @@ class GCJsonReporterTest extends AnyWordSpec with Matchers {
       "one reference is added" in {
         GCJsonReporter
           .report(ReferenceEvent(Reference(externalId, blobId, generation)) :: GCIterationEvent(gcReportImmediate) :: Nil)
-          .states should be (Seq(
+          .states should be(Seq(
           initialReport,
           firstSaveReport,
           JsonReport.State(GarbageCollect,
-          `reference-generations` = TreeSet(generation),
-          `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
-          blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
-          references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
-          deletions = Nil )))
+            `reference-generations` = TreeSet(generation),
+            `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
+            blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
+            references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
+            dereferences = Nil)))
       }
 
       "one reference is added then removed" in {
         val reference = Reference(externalId, blobId, generation)
         GCJsonReporter.report(ReferenceEvent(reference) :: DereferenceEvent(Dereference(generation, reference)) :: GCIterationEvent(gcReportImmediate) :: Nil)
-          .states should be (Seq(
+          .states should be(Seq(
           initialReport,
           firstSaveReport,
           firstDeleteReport,
@@ -100,20 +101,20 @@ class GCJsonReporterTest extends AnyWordSpec with Matchers {
             `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
             blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
             references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
-            deletions = Seq(JsonReport.Dereference(blobId.asString(), generation, initialIteration)))))
+            dereferences = Seq(JsonReport.Dereference(blobId.asString(), generation, initialIteration)))))
       }
     }
 
     "GC has been ran" when {
       "report added and removed references" when {
-        "one reference is added then removed and the GC is ran 2 generation later" in {
+        "one reference is added then removed and the GC is run 2 generations later" in {
           val reference = Reference(externalId, blobId, generation)
           val gcReportGenNPlus2 = GC.plan(StabilizedState(Map(generation -> List(reference)), Map(generation -> List(Dereference(generation, reference)))),
             lastIteration = Iteration.initial,
             targetedGeneration = generation.next(2))
 
           GCJsonReporter.report(ReferenceEvent(reference) :: DereferenceEvent(Dereference(generation, reference)) :: GCIterationEvent(gcReportGenNPlus2) :: Nil)
-            .states should be (Seq(
+            .states should be(Seq(
             initialReport,
             firstSaveReport,
             firstDeleteReport,
@@ -122,10 +123,10 @@ class GCJsonReporterTest extends AnyWordSpec with Matchers {
               `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
               blobs = Nil,
               references = Nil,
-              deletions = Nil )))
+              dereferences = Nil)))
         }
 
-        "one reference is added, a gc run two generation later, then  it is removed and the GC is ran again" in {
+        "one reference is added, a gc run two generations later, then it is removed and the GC is run again" in {
           val reference = Reference(externalId, blobId, generation)
           val gcReportGenNPlus2 = GC.plan(StabilizedState(Map(generation -> List(reference)), Map.empty),
             lastIteration = Iteration.initial,
@@ -138,7 +139,7 @@ class GCJsonReporterTest extends AnyWordSpec with Matchers {
             targetedGeneration = generationPlusOne.next(2))
 
           GCJsonReporter.report(ReferenceEvent(reference) :: GCIterationEvent(gcReportGenNPlus2) :: DereferenceEvent(dereference) :: GCIterationEvent(gcReportGenNPlus3) :: Nil)
-            .states should be (Seq(
+            .states should be(Seq(
             initialReport,
             firstSaveReport,
             //first gc
@@ -147,32 +148,31 @@ class GCJsonReporterTest extends AnyWordSpec with Matchers {
               `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
               blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
               references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
-              deletions = Nil),
+              dereferences = Nil),
             //delete
-            JsonReport.State(Delete(externalId),
+            JsonReport.State(RelatedAction.Dereference(externalId),
               `reference-generations` = TreeSet(generation, generationPlusOne),
               `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
               blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
               references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
-              deletions = Seq(JsonReport.Dereference(blobId.asString(), generationPlusOne, gcReportGenNPlus2.iteration.asString))),
+              dereferences = Seq(JsonReport.Dereference(blobId.asString(), generationPlusOne, gcReportGenNPlus2.iteration.asString))),
             //second gc
             JsonReport.State(GarbageCollect,
               `reference-generations` = TreeSet(generation, generationPlusOne),
               `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration, gcReportGenNPlus3.iteration.asString),
               blobs = Nil,
               references = Nil,
-              deletions = Nil)))
+              dereferences = Nil)))
         }
 
 
-
         "json serialization" in {
           val reference = Reference(externalId, blobId, generation)
           val gcReportGenNPlus2 = GC.plan(StabilizedState(Map(generation -> List(reference)), Map.empty),
             lastIteration = Iteration.initial,
             targetedGeneration = generation.next(2))
 
-          val generationPlusOne= generation.next
+          val generationPlusOne = generation.next
           val dereference = Dereference(generation.next, reference)
           val gcReportGenNPlus3 = GC.plan(StabilizedState(Map(generation -> List(reference)), Map(generationPlusOne -> List(dereference))),
             lastIteration = gcReportGenNPlus2.iteration,


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 13/15: JAMES-3184 Add serialized info for reindexing tasks RunningOptions

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ce9d7ba6ab5b3af21205ddc20d84013c7c76adb9
Author: LanKhuat <kh...@gmail.com>
AuthorDate: Wed Jul 1 16:09:23 2020 +0700

    JAMES-3184 Add serialized info for reindexing tasks RunningOptions
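
A simplified, hypothetical Jackson sketch of the nested "runningOptions" object this patch adds to the serialized additional information of reindexing tasks (class and field names below are illustrative stand-ins, not the James DTOs; compare the expected JSON strings in the DTO tests of this patch):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class RunningOptionsJsonSketch {
        static class RunningOptionsDto {
            public int messagesPerSecond = 50;
            public String mode = "REBUILD_ALL";
        }

        static class ReindexingInformationDto {
            public String type = "full-reindexing";
            public RunningOptionsDto runningOptions = new RunningOptionsDto();
            public int successfullyReprocessedMailCount = 42;
            public int failedReprocessedMailCount = 2;
        }

        public static void main(String[] args) throws Exception {
            // Prints something like:
            // {"type":"full-reindexing","runningOptions":{"messagesPerSecond":50,"mode":"REBUILD_ALL"},
            //  "successfullyReprocessedMailCount":42,"failedReprocessedMailCount":2}
            System.out.println(new ObjectMapper().writeValueAsString(new ReindexingInformationDto()));
        }
    }
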
---
 .../apache/james/mailbox/indexer/ReIndexer.java    |   4 -
 .../WebAdminReprocessingContextInformationDTO.java |  21 ++--
 ...lboxReindexingTaskAdditionalInformationDTO.java |   9 +-
 ...UserReindexingTaskAdditionalInformationDTO.java |   8 ++
 ...AdminReprocessingContextInformationDTOTest.java |   2 +
 .../WebAdminSingleMailboxReprocessingDTOTest.java  |   1 +
 .../dto/WebAdminUserReprocessingDTOTest.java       |   1 +
 .../james/webadmin/routes/MailboxesRoutesTest.java | 136 +++++++++++++++++++++
 .../webadmin/routes/UserMailboxesRoutesTest.java   |  43 +++++++
 9 files changed, 213 insertions(+), 12 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java
index 60c08a4..e953059 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/indexer/ReIndexer.java
@@ -46,10 +46,6 @@ public interface ReIndexer {
                 return this;
             }
 
-            public Builder messagesPerSeconds(int messagesPerSecond) {
-                return messagesPerSeconds(Optional.of(messagesPerSecond));
-            }
-
             public Builder mode(Optional<Mode> mode) {
                 this.mode = mode;
                 return this;
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminReprocessingContextInformationDTO.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminReprocessingContextInformationDTO.java
index 124fec3..89efaeb 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminReprocessingContextInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminReprocessingContextInformationDTO.java
@@ -30,6 +30,7 @@ import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
 import org.apache.mailbox.tools.indexer.ErrorRecoveryIndexationTask;
 import org.apache.mailbox.tools.indexer.FullReindexingTask;
 import org.apache.mailbox.tools.indexer.ReprocessingContextInformationDTO;
+import org.apache.mailbox.tools.indexer.RunningOptionsDTO;
 
 import com.github.steveash.guavate.Guavate;
 
@@ -43,6 +44,7 @@ public class WebAdminReprocessingContextInformationDTO implements AdditionalInfo
                 })
                 .toDTOConverter((details, type) -> new WebAdminErrorRecoveryIndexationDTO(
                     type,
+                    RunningOptionsDTO.toDTO(details.getRunningOptions()),
                     details.getSuccessfullyReprocessedMailCount(),
                     details.getFailedReprocessedMailCount(),
                     details.failures(),
@@ -51,9 +53,9 @@ public class WebAdminReprocessingContextInformationDTO implements AdditionalInfo
                 .withFactory(AdditionalInformationDTOModule::new);
         }
 
-        WebAdminErrorRecoveryIndexationDTO(String type, int successfullyReprocessedMailCount, int failedReprocessedMailCount,
+        WebAdminErrorRecoveryIndexationDTO(String type, RunningOptionsDTO runningOptionsDTO, int successfullyReprocessedMailCount, int failedReprocessedMailCount,
                                            ReIndexingExecutionFailures failures, Instant timestamp) {
-            super(type, successfullyReprocessedMailCount, failedReprocessedMailCount, failures, timestamp);
+            super(type, runningOptionsDTO, successfullyReprocessedMailCount, failedReprocessedMailCount, failures, timestamp);
         }
     }
 
@@ -66,6 +68,7 @@ public class WebAdminReprocessingContextInformationDTO implements AdditionalInfo
                 })
                 .toDTOConverter((details, type) -> new WebAdminFullIndexationDTO(
                     type,
+                    RunningOptionsDTO.toDTO(details.getRunningOptions()),
                     details.getSuccessfullyReprocessedMailCount(),
                     details.getFailedReprocessedMailCount(),
                     details.failures(),
@@ -74,25 +77,25 @@ public class WebAdminReprocessingContextInformationDTO implements AdditionalInfo
                 .withFactory(AdditionalInformationDTOModule::new);
         }
 
-        WebAdminFullIndexationDTO(String type, int successfullyReprocessedMailCount, int failedReprocessedMailCount,
+        WebAdminFullIndexationDTO(String type, RunningOptionsDTO runningOptions, int successfullyReprocessedMailCount, int failedReprocessedMailCount,
                                   ReIndexingExecutionFailures failures, Instant timestamp) {
-            super(type, successfullyReprocessedMailCount, failedReprocessedMailCount, failures, timestamp);
+            super(type, runningOptions, successfullyReprocessedMailCount, failedReprocessedMailCount, failures, timestamp);
         }
-
     }
 
     protected final String type;
+    protected final RunningOptionsDTO runningOptions;
     protected final int successfullyReprocessedMailCount;
     protected final int failedReprocessedMailCount;
     protected final SerializableReIndexingExecutionFailures messageFailures;
     private final List<String> mailboxFailures;
     protected final Instant timestamp;
 
-
-    WebAdminReprocessingContextInformationDTO(String type, int successfullyReprocessedMailCount, int failedReprocessedMailCount,
+    WebAdminReprocessingContextInformationDTO(String type, RunningOptionsDTO runningOptions, int successfullyReprocessedMailCount, int failedReprocessedMailCount,
                                               ReIndexingExecutionFailures failures,
                                               Instant timestamp) {
         this.type = type;
+        this.runningOptions = runningOptions;
         this.successfullyReprocessedMailCount = successfullyReprocessedMailCount;
         this.failedReprocessedMailCount = failedReprocessedMailCount;
         this.messageFailures = SerializableReIndexingExecutionFailures.from(failures);
@@ -102,6 +105,10 @@ public class WebAdminReprocessingContextInformationDTO implements AdditionalInfo
         this.timestamp = timestamp;
     }
 
+    public RunningOptionsDTO getRunningOptions() {
+        return runningOptions;
+    }
+
     public int getSuccessfullyReprocessedMailCount() {
         return successfullyReprocessedMailCount;
     }
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO.java
index 19733ae..d6348e1 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO.java
@@ -27,6 +27,7 @@ import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+import org.apache.mailbox.tools.indexer.RunningOptionsDTO;
 import org.apache.mailbox.tools.indexer.SingleMailboxReindexingTask;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -42,6 +43,7 @@ public class WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO impleme
             .toDTOConverter((details, type) -> new WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO(
                 type,
                 details.getMailboxId(),
+                RunningOptionsDTO.toDTO(details.getRunningOptions()),
                 details.getSuccessfullyReprocessedMailCount(),
                 details.getFailedReprocessedMailCount(),
                 details.failures(),
@@ -56,13 +58,14 @@ public class WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO impleme
     @JsonCreator
     private WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO(String type,
                                                                         String mailboxId,
+                                                                        RunningOptionsDTO runningOptions,
                                                                         int successfullyReprocessedMailCount,
                                                                         int failedReprocessedMailCount,
                                                                         ReIndexingExecutionFailures failures,
                                                                         Instant timestamp) {
         this.mailboxId = mailboxId;
         this.reprocessingContextInformationDTO = new WebAdminReprocessingContextInformationDTO(
-            type, successfullyReprocessedMailCount, failedReprocessedMailCount, failures, timestamp);
+            type, runningOptions, successfullyReprocessedMailCount, failedReprocessedMailCount, failures, timestamp);
     }
 
     @Override
@@ -78,6 +81,10 @@ public class WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO impleme
         return mailboxId;
     }
 
+    public RunningOptionsDTO getRunningOptions() {
+        return reprocessingContextInformationDTO.getRunningOptions();
+    }
+
     public int getSuccessfullyReprocessedMailCount() {
         return reprocessingContextInformationDTO.getSuccessfullyReprocessedMailCount();
     }
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminUserReindexingTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminUserReindexingTaskAdditionalInformationDTO.java
index 2b00d4f..ff555b1 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminUserReindexingTaskAdditionalInformationDTO.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/WebAdminUserReindexingTaskAdditionalInformationDTO.java
@@ -27,6 +27,7 @@ import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+import org.apache.mailbox.tools.indexer.RunningOptionsDTO;
 import org.apache.mailbox.tools.indexer.UserReindexingTask;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -42,6 +43,7 @@ public class WebAdminUserReindexingTaskAdditionalInformationDTO implements Addit
             .toDTOConverter((details, type) -> new WebAdminUserReindexingTaskAdditionalInformationDTO(
                 type,
                 details.getUsername(),
+                RunningOptionsDTO.toDTO(details.getRunningOptions()),
                 details.getSuccessfullyReprocessedMailCount(),
                 details.getFailedReprocessedMailCount(),
                 details.failures(),
@@ -56,6 +58,7 @@ public class WebAdminUserReindexingTaskAdditionalInformationDTO implements Addit
     @JsonCreator
     private WebAdminUserReindexingTaskAdditionalInformationDTO(String type,
                                                                String username,
+                                                               RunningOptionsDTO runningOptions,
                                                                int successfullyReprocessedMailCount,
                                                                int failedReprocessedMailCount,
                                                                ReIndexingExecutionFailures failures,
@@ -63,6 +66,7 @@ public class WebAdminUserReindexingTaskAdditionalInformationDTO implements Addit
         this.username = username;
         this.reprocessingContextInformationDTO = new WebAdminReprocessingContextInformationDTO(
             type,
+            runningOptions,
             successfullyReprocessedMailCount,
             failedReprocessedMailCount, failures, timestamp);
     }
@@ -80,6 +84,10 @@ public class WebAdminUserReindexingTaskAdditionalInformationDTO implements Addit
         return username;
     }
 
+    public RunningOptionsDTO getRunningOptions() {
+        return reprocessingContextInformationDTO.getRunningOptions();
+    }
+
     public int getSuccessfullyReprocessedMailCount() {
         return reprocessingContextInformationDTO.getSuccessfullyReprocessedMailCount();
     }
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminReprocessingContextInformationDTOTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminReprocessingContextInformationDTOTest.java
index 48ffab4..6bcfa5f 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminReprocessingContextInformationDTOTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminReprocessingContextInformationDTOTest.java
@@ -41,6 +41,7 @@ class WebAdminReprocessingContextInformationDTOTest {
 
     private final String serializedErrorRecoveryAdditionalInformation = "{" +
         "  \"type\":\"error-recovery-indexation\"," +
+        "  \"runningOptions\":{\"messagesPerSecond\":50,\"mode\":\"REBUILD_ALL\"}," +
         "  \"successfullyReprocessedMailCount\":42," +
         "  \"failedReprocessedMailCount\":2," +
         "  \"messageFailures\":{\"1\":[{\"uid\":10}],\"2\":[{\"uid\":20}]}," +
@@ -48,6 +49,7 @@ class WebAdminReprocessingContextInformationDTOTest {
         "  \"timestamp\":\"2018-11-13T12:00:55Z\"}";
     private final String serializedFullAdditionalInformation = "{" +
         "  \"type\":\"full-reindexing\"," +
+        "  \"runningOptions\":{\"messagesPerSecond\":50,\"mode\":\"REBUILD_ALL\"}," +
         "  \"successfullyReprocessedMailCount\":42," +
         "  \"failedReprocessedMailCount\":2," +
         "  \"messageFailures\":{\"1\":[{\"uid\":10}],\"2\":[{\"uid\":20}]}," +
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminSingleMailboxReprocessingDTOTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminSingleMailboxReprocessingDTOTest.java
index 6409881..c112ea5 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminSingleMailboxReprocessingDTOTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminSingleMailboxReprocessingDTOTest.java
@@ -42,6 +42,7 @@ class WebAdminSingleMailboxReprocessingDTOTest {
     private final String serializedAdditionalInformation = "{" +
         "  \"type\":\"mailbox-reindexing\"," +
         "  \"mailboxId\":\"1\"," +
+        "  \"runningOptions\":{\"messagesPerSecond\":50,\"mode\":\"REBUILD_ALL\"}," +
         "  \"successfullyReprocessedMailCount\":42," +
         "  \"failedReprocessedMailCount\":2," +
         "  \"messageFailures\":{\"1\":[{\"uid\":10}],\"2\":[{\"uid\":20}]}," +
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminUserReprocessingDTOTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminUserReprocessingDTOTest.java
index 05987d4..09f49f3 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminUserReprocessingDTOTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/WebAdminUserReprocessingDTOTest.java
@@ -42,6 +42,7 @@ class WebAdminUserReprocessingDTOTest {
 
     private final String serializedAdditionalInformation = "{" +
         "  \"type\":\"user-reindexing\",\"username\":\"bob\"," +
+        "  \"runningOptions\":{\"messagesPerSecond\":50,\"mode\":\"REBUILD_ALL\"}," +
         "  \"successfullyReprocessedMailCount\":42," +
         "  \"failedReprocessedMailCount\":2," +
         "  \"messageFailures\":{\"1\":[{\"uid\":10}],\"2\":[{\"uid\":20}]}," +
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index c537a72..b64e4ab 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -250,6 +250,8 @@ class MailboxesRoutesTest {
                     .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(0))
                     .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()))
                     .body("completedDate", is(notNullValue()));
@@ -279,6 +281,40 @@ class MailboxesRoutesTest {
                     .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(1))
                     .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
+                    .body("startedDate", is(notNullValue()))
+                    .body("submitDate", is(notNullValue()))
+                    .body("completedDate", is(notNullValue()));
+            }
+
+            @Test
+            void fullReprocessingWithMessagesPerSecondShouldReturnTaskDetailsWhenMail() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                mailboxManager.createMailbox(INBOX, systemSession).get();
+                mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession);
+
+                String taskId = with()
+                    .queryParam("messagesPerSecond", 1)
+                    .post("/mailboxes?task=reIndex")
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await")
+                .then()
+                    .body("status", is("completed"))
+                    .body("taskId", is(notNullValue()))
+                    .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
+                    .body("additionalInformation.successfullyReprocessedMailCount", is(1))
+                    .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(1))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()))
                     .body("completedDate", is(notNullValue()));
@@ -313,6 +349,8 @@ class MailboxesRoutesTest {
                     .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(0))
                     .body("additionalInformation.failedReprocessedMailCount", is(1))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("additionalInformation.messageFailures.\"" + mailboxId.serialize() + "\"[0].uid", is(Long.valueOf(uidAsLong).intValue()))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()));
@@ -386,6 +424,8 @@ class MailboxesRoutesTest {
                     .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(2))
                     .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("FIX_OUTDATED"))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()))
                     .body("completedDate", is(notNullValue()));
@@ -619,6 +659,8 @@ class MailboxesRoutesTest {
                     .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(0))
                     .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()))
                     .body("completedDate", is(notNullValue()));
@@ -649,6 +691,41 @@ class MailboxesRoutesTest {
                     .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(1))
                     .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
+                    .body("startedDate", is(notNullValue()))
+                    .body("submitDate", is(notNullValue()));
+            }
+
+            @Test
+            void mailboxReprocessingWithMessagesPerSecondShouldReturnTaskDetailsWhenMail() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
+                mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession);
+
+                String taskId = with()
+                    .queryParam("task", "reIndex")
+                    .queryParam("messagesPerSecond", 1)
+                    .post("/mailboxes/" + mailboxId.serialize())
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await")
+                .then()
+                    .body("status", is("completed"))
+                    .body("taskId", is(notNullValue()))
+                    .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
+                    .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
+                    .body("additionalInformation.successfullyReprocessedMailCount", is(1))
+                    .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(1))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()));
             }
@@ -683,6 +760,8 @@ class MailboxesRoutesTest {
                     .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(0))
                     .body("additionalInformation.failedReprocessedMailCount", is(1))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("additionalInformation.messageFailures.\"" + mailboxId.serialize() + "\"[0].uid", is(Long.valueOf(uidAsLong).intValue()))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()));
@@ -760,6 +839,8 @@ class MailboxesRoutesTest {
                     .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(2))
                     .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("FIX_OUTDATED"))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()))
                     .body("completedDate", is(notNullValue()));
@@ -1181,6 +1262,8 @@ class MailboxesRoutesTest {
                     .body("type", is("error-recovery-indexation"))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(0))
                     .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()))
                     .body("completedDate", is(notNullValue()));
@@ -1225,6 +1308,55 @@ class MailboxesRoutesTest {
                     .body("type", is("error-recovery-indexation"))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(1))
                     .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
+                    .body("startedDate", is(notNullValue()))
+                    .body("submitDate", is(notNullValue()))
+                    .body("completedDate", is(notNullValue()));
+            }
+
+            @Test
+            void fixingReIndexingWithMessagePerSecondShouldReturnTaskDetailsWhenMail() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                mailboxManager.createMailbox(INBOX, systemSession).get();
+                mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession);
+
+                doReturn(Mono.error(new RuntimeException())).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
+
+                String taskId = with()
+                    .post("/mailboxes?task=reIndex")
+                    .jsonPath()
+                    .get("taskId");
+
+                with()
+                    .basePath(TasksRoutes.BASE)
+                    .get(taskId + "/await");
+
+                doReturn(Mono.empty()).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
+
+                String fixingTaskId = with()
+                    .queryParam("reIndexFailedMessagesOf", taskId)
+                    .queryParam("task", "reIndex")
+                    .queryParam("messagesPerSecond", 1)
+                    .post("/mailboxes")
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(fixingTaskId + "/await")
+                .then()
+                    .body("status", is("completed"))
+                    .body("taskId", is(notNullValue()))
+                    .body("type", is("error-recovery-indexation"))
+                    .body("additionalInformation.successfullyReprocessedMailCount", is(1))
+                    .body("additionalInformation.failedReprocessedMailCount", is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(1))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()))
                     .body("completedDate", is(notNullValue()));
@@ -1270,6 +1402,8 @@ class MailboxesRoutesTest {
                     .body("type", is("error-recovery-indexation"))
                     .body("additionalInformation.successfullyReprocessedMailCount", is(0))
                     .body("additionalInformation.failedReprocessedMailCount", is(1))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("additionalInformation.messageFailures.\"" + mailboxId.serialize() + "\"[0].uid", is(Long.valueOf(uidAsLong).intValue()))
                     .body("startedDate", is(notNullValue()))
                     .body("submitDate", is(notNullValue()));
@@ -1307,6 +1441,8 @@ class MailboxesRoutesTest {
                 .then()
                     .body("status", Matchers.is("failed"))
                     .body("taskId", Matchers.is(notNullValue()))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
+                    .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
                     .body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
             }
         }
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
index 768913f..f6db1b5 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
@@ -1265,6 +1265,8 @@ class UserMailboxesRoutesTest {
                     .body("additionalInformation.username", Matchers.is("username"))
                     .body("additionalInformation.successfullyReprocessedMailCount", Matchers.is(0))
                     .body("additionalInformation.failedReprocessedMailCount", Matchers.is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", Matchers.is(50))
+                    .body("additionalInformation.runningOptions.mode", Matchers.is("REBUILD_ALL"))
                     .body("startedDate", Matchers.is(notNullValue()))
                     .body("submitDate", Matchers.is(notNullValue()))
                     .body("completedDate", Matchers.is(notNullValue()));
@@ -1297,6 +1299,43 @@ class UserMailboxesRoutesTest {
                     .body("additionalInformation.username", Matchers.is("username"))
                     .body("additionalInformation.successfullyReprocessedMailCount", Matchers.is(1))
                     .body("additionalInformation.failedReprocessedMailCount", Matchers.is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", Matchers.is(50))
+                    .body("additionalInformation.runningOptions.mode", Matchers.is("REBUILD_ALL"))
+                    .body("startedDate", Matchers.is(notNullValue()))
+                    .body("submitDate", Matchers.is(notNullValue()))
+                    .body("completedDate", Matchers.is(notNullValue()));
+            }
+
+            @Test
+            void userReprocessingWithMessagesPerSecondShouldReturnTaskDetailsWhenMail() throws Exception {
+                MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
+                mailboxManager.createMailbox(INBOX, systemSession).get();
+                mailboxManager.getMailbox(INBOX, systemSession)
+                    .appendMessage(
+                        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
+                        systemSession);
+
+                String taskId = given()
+                    .queryParam("task", "reIndex")
+                    .queryParam("messagesPerSecond", 1)
+                .when()
+                    .post()
+                    .jsonPath()
+                    .get("taskId");
+
+                given()
+                    .basePath(TasksRoutes.BASE)
+                .when()
+                    .get(taskId + "/await")
+                .then()
+                    .body("status", Matchers.is("completed"))
+                    .body("taskId", Matchers.is(notNullValue()))
+                    .body("type", Matchers.is(UserReindexingTask.USER_RE_INDEXING.asString()))
+                    .body("additionalInformation.username", Matchers.is("username"))
+                    .body("additionalInformation.successfullyReprocessedMailCount", Matchers.is(1))
+                    .body("additionalInformation.failedReprocessedMailCount", Matchers.is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", Matchers.is(1))
+                    .body("additionalInformation.runningOptions.mode", Matchers.is("REBUILD_ALL"))
                     .body("startedDate", Matchers.is(notNullValue()))
                     .body("submitDate", Matchers.is(notNullValue()))
                     .body("completedDate", Matchers.is(notNullValue()));
@@ -1332,6 +1371,8 @@ class UserMailboxesRoutesTest {
                     .body("type", Matchers.is(UserReindexingTask.USER_RE_INDEXING.asString()))
                     .body("additionalInformation.successfullyReprocessedMailCount", Matchers.is(0))
                     .body("additionalInformation.failedReprocessedMailCount", Matchers.is(1))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", Matchers.is(50))
+                    .body("additionalInformation.runningOptions.mode", Matchers.is("REBUILD_ALL"))
                     .body("additionalInformation.messageFailures.\"" + mailboxId.serialize() + "\"[0].uid", Matchers.is(Long.valueOf(uidAsLong).intValue()))
                     .body("startedDate", Matchers.is(notNullValue()))
                     .body("submitDate", Matchers.is(notNullValue()));
@@ -1408,6 +1449,8 @@ class UserMailboxesRoutesTest {
                     .body("type", Matchers.is(UserReindexingTask.USER_RE_INDEXING.asString()))
                     .body("additionalInformation.successfullyReprocessedMailCount", Matchers.is(2))
                     .body("additionalInformation.failedReprocessedMailCount", Matchers.is(0))
+                    .body("additionalInformation.runningOptions.messagesPerSecond", Matchers.is(50))
+                    .body("additionalInformation.runningOptions.mode", Matchers.is("FIX_OUTDATED"))
                     .body("startedDate", Matchers.is(notNullValue()))
                     .body("submitDate", Matchers.is(notNullValue()))
                     .body("completedDate", Matchers.is(notNullValue()));




[james-project] 02/15: JAMES-3150 unify play-json dependency version

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7ef2da79ea94953bdc2e22d67e72c68ae41398c2
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Fri Jun 19 10:23:30 2020 +0200

    JAMES-3150 unify play-json dependency version
---
 mailbox/event/json/pom.xml             | 1 -
 server/blob/blob-deduplicating/pom.xml | 1 -
 2 files changed, 2 deletions(-)

diff --git a/mailbox/event/json/pom.xml b/mailbox/event/json/pom.xml
index a6a98d5..161b77d 100644
--- a/mailbox/event/json/pom.xml
+++ b/mailbox/event/json/pom.xml
@@ -60,7 +60,6 @@
         <dependency>
             <groupId>com.typesafe.play</groupId>
             <artifactId>play-json_${scala.base}</artifactId>
-            <version>2.8.1</version>
         </dependency>
         <dependency>
             <groupId>net.javacrumbs.json-unit</groupId>
diff --git a/server/blob/blob-deduplicating/pom.xml b/server/blob/blob-deduplicating/pom.xml
index 0bf6147..7f531e9 100644
--- a/server/blob/blob-deduplicating/pom.xml
+++ b/server/blob/blob-deduplicating/pom.xml
@@ -69,7 +69,6 @@
         <dependency>
             <groupId>com.typesafe.play</groupId>
             <artifactId>play-json_${scala.base}</artifactId>
-            <version>2.8.1</version>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>




[james-project] 15/15: JAMES-3265 Leverage default method to offer a convenient single message flag update

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ccc5e48964a057b8970563e1936380d5062834ef
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 2 16:05:13 2020 +0700

    JAMES-3265 Leverage default method to offer a convenient single message flag update
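    
    A minimal sketch of the call shape this enables; mapper, mailbox and
    uid below are illustrative placeholders for a MessageMapper
    implementation, a Mailbox and a MessageUid, and the usual
    javax.mail.Flags and mailbox-store imports are assumed:
    
        // Before: build a single-element range and consume an Iterator.
        Iterator<UpdatedFlags> updates = mapper.updateFlags(mailbox,
            new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.ADD),
            MessageRange.one(uid));
    
        // After: target a single uid and get an Optional back.
        Optional<UpdatedFlags> update = mapper.updateFlags(mailbox, uid,
            new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.ADD));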
---
 .../james/mailbox/store/mail/MessageMapper.java    |   5 +
 .../store/mail/model/MessageMapperTest.java        | 139 ++++++++++-----------
 2 files changed, 74 insertions(+), 70 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
index ece0c7d..d92be02 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
@@ -127,6 +127,11 @@ public interface MessageMapper extends Mapper {
     Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator,
             final MessageRange set) throws MailboxException;
 
+    default Optional<UpdatedFlags> updateFlags(Mailbox mailbox, MessageUid uid, FlagsUpdateCalculator flagsUpdateCalculator) throws MailboxException {
+        return Iterators.toStream(updateFlags(mailbox, flagsUpdateCalculator, MessageRange.one(uid)))
+            .findFirst();
+    }
+
     default List<UpdatedFlags> resetRecent(Mailbox mailbox) throws MailboxException {
         final List<MessageUid> members = findRecentMessageUidsInMailbox(mailbox);
         ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder();
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index 0d396a2..9907c5e 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -150,7 +150,7 @@ public abstract class MessageMapperTest {
         saveMessages();
         FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE);
 
-        messageMapper.updateFlags(benwaInboxMailbox, newFlags, message1.getUid().toRange());
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
 
         assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(4);
     }
@@ -158,8 +158,8 @@ public abstract class MessageMapperTest {
     @Test
     void mailboxUnSeenCountShouldBeDecrementedAfterAMessageIsMarkedUnSeen() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(), FlagsUpdateMode.REPLACE));
         assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(5);
     }
 
@@ -317,9 +317,10 @@ public abstract class MessageMapperTest {
     @Test
     void findRecentUidsInMailboxShouldReturnListOfMessagesHoldingFlagsRecent() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
-        messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
+        FlagsUpdateCalculator setRecent = new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REPLACE);
+        messageMapper.updateFlags(benwaInboxMailbox, message2.getUid(), setRecent);
+        messageMapper.updateFlags(benwaInboxMailbox, message4.getUid(), setRecent);
+        messageMapper.updateFlags(benwaWorkMailbox, message6.getUid(), setRecent);
         assertThat(messageMapper.findRecentMessageUidsInMailbox(benwaInboxMailbox))
             .containsOnly(message2.getUid(), message4.getUid());
     }
@@ -332,9 +333,10 @@ public abstract class MessageMapperTest {
     @Test
     void resetRecentsShouldRemoveAllRecentFlags() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
-        messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
+        FlagsUpdateCalculator setRecent = new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REPLACE);
+        messageMapper.updateFlags(benwaInboxMailbox, message2.getUid(), setRecent);
+        messageMapper.updateFlags(benwaInboxMailbox, message4.getUid(), setRecent);
+        messageMapper.updateFlags(benwaWorkMailbox, message6.getUid(), setRecent);
 
         messageMapper.resetRecent(benwaInboxMailbox);
 
@@ -345,9 +347,10 @@ public abstract class MessageMapperTest {
     @Test
     void resetRecentsShouldReturnUpdatedFlags() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
-        messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
+        FlagsUpdateCalculator setRecent = new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REPLACE);
+        messageMapper.updateFlags(benwaInboxMailbox, message2.getUid(), setRecent);
+        messageMapper.updateFlags(benwaInboxMailbox, message4.getUid(), setRecent);
+        messageMapper.updateFlags(benwaWorkMailbox, message6.getUid(), setRecent);
 
         assertThat(messageMapper.resetRecent(benwaInboxMailbox))
             .extracting(UpdatedFlags::getUid)
@@ -357,9 +360,10 @@ public abstract class MessageMapperTest {
     @Test
     void deleteShouldUpdateRecentWhenNeeded() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
-        messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
+        FlagsUpdateCalculator setRecent = new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REPLACE);
+        messageMapper.updateFlags(benwaInboxMailbox, message2.getUid(), setRecent);
+        messageMapper.updateFlags(benwaInboxMailbox, message4.getUid(), setRecent);
+        messageMapper.updateFlags(benwaWorkMailbox, message6.getUid(), setRecent);
 
         messageMapper.delete(benwaInboxMailbox, message2);
 
@@ -369,9 +373,10 @@ public abstract class MessageMapperTest {
     @Test
     void deleteShouldNotUpdateRecentWhenNotNeeded() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
-        messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
+        FlagsUpdateCalculator setRecent = new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REPLACE);
+        messageMapper.updateFlags(benwaInboxMailbox, message2.getUid(), setRecent);
+        messageMapper.updateFlags(benwaInboxMailbox, message4.getUid(), setRecent);
+        messageMapper.updateFlags(benwaWorkMailbox, message6.getUid(), setRecent);
 
         messageMapper.delete(benwaInboxMailbox, message1);
 
@@ -381,9 +386,10 @@ public abstract class MessageMapperTest {
     @Test
     void deleteMessagesShouldDecrementUnseenToOneWhenDeletingTwoUnseenMessagesOutOfThree() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message3.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
+        FlagsUpdateCalculator setSeen = new FlagsUpdateCalculator(new Flags(Flag.SEEN), FlagsUpdateMode.REPLACE);
+        messageMapper.updateFlags(benwaInboxMailbox, message2.getUid(), setSeen);
+        messageMapper.updateFlags(benwaInboxMailbox, message3.getUid(), setSeen);
+        messageMapper.updateFlags(benwaInboxMailbox, message4.getUid(), setSeen);
 
         messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
 
@@ -426,9 +432,10 @@ public abstract class MessageMapperTest {
     @Test
     void findFirstUnseenMessageUidShouldReturnUid2WhenUid2isSeen() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message3.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE), MessageRange.one(message5.getUid()));
+        FlagsUpdateCalculator setSeen = new FlagsUpdateCalculator(new Flags(Flag.SEEN), FlagsUpdateMode.REPLACE);
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), setSeen);
+        messageMapper.updateFlags(benwaInboxMailbox, message3.getUid(), setSeen);
+        messageMapper.updateFlags(benwaWorkMailbox, message5.getUid(), setSeen);
         assertThat(messageMapper.findFirstUnseenMessageUid(benwaInboxMailbox)).isEqualTo(message2.getUid());
     }
 
@@ -689,7 +696,7 @@ public abstract class MessageMapperTest {
     @Test
     void flagsReplacementShouldReplaceStoredMessageFlags() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE));
         assertThat(retrieveMessageFromStorage(message1)).hasFlags(new Flags(Flags.Flag.FLAGGED));
     }
 
@@ -697,10 +704,10 @@ public abstract class MessageMapperTest {
     void flagsReplacementShouldReturnAnUpdatedFlagHighlightingTheReplacement() throws MailboxException {
         saveMessages();
         ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox);
-        Iterator<UpdatedFlags> updatedFlags = messageMapper.updateFlags(benwaInboxMailbox,
-                new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
-        assertThat(Lists.newArrayList(updatedFlags))
-            .containsOnly(UpdatedFlags.builder()
+        Optional<UpdatedFlags> updatedFlags = messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(),
+                new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE));
+        assertThat(updatedFlags)
+            .contains(UpdatedFlags.builder()
                 .uid(message1.getUid())
                 .modSeq(modSeq.next())
                 .oldFlags(new Flags())
@@ -711,11 +718,10 @@ public abstract class MessageMapperTest {
     @Test
     void flagsAdditionShouldReturnAnUpdatedFlagHighlightingTheAddition() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE));
         ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox);
-        assertThat(messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.ADD), MessageRange.one(message1.getUid())))
-            .toIterable()
-            .containsOnly(UpdatedFlags.builder()
+        assertThat(messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.ADD)))
+            .contains(UpdatedFlags.builder()
                     .uid(message1.getUid())
                     .modSeq(modSeq.next())
                     .oldFlags(new Flags(Flags.Flag.FLAGGED))
@@ -726,17 +732,17 @@ public abstract class MessageMapperTest {
     @Test
     void flagsAdditionShouldUpdateStoredMessageFlags() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.ADD), MessageRange.one(message1.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.ADD));
         assertThat(retrieveMessageFromStorage(message1)).hasFlags(new FlagsBuilder().add(Flags.Flag.SEEN, Flags.Flag.FLAGGED).build());
     }
 
     @Test
     void flagsAdditionShouldHaveNoEffectOnStoredFlagsWhenNoop() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), FlagsUpdateMode.REPLACE));
 
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flag.FLAGGED), FlagsUpdateMode.ADD), MessageRange.one(message1.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flag.FLAGGED), FlagsUpdateMode.ADD));
         assertThat(retrieveMessageFromStorage(message1))
             .hasFlags(new FlagsBuilder().add(Flags.Flag.FLAGGED).build());
     }
@@ -744,11 +750,10 @@ public abstract class MessageMapperTest {
     @Test
     void flagsRemovalShouldReturnAnUpdatedFlagHighlightingTheRemoval() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new FlagsBuilder().add(Flags.Flag.FLAGGED, Flags.Flag.SEEN).build(), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new FlagsBuilder().add(Flags.Flag.FLAGGED, Flags.Flag.SEEN).build(), FlagsUpdateMode.REPLACE));
         ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox);
-        assertThat(messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REMOVE), MessageRange.one(message1.getUid())))
-            .toIterable()
-            .containsOnly(
+        assertThat(messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REMOVE)))
+            .contains(
                 UpdatedFlags.builder()
                     .uid(message1.getUid())
                     .modSeq(modSeq.next())
@@ -760,8 +765,8 @@ public abstract class MessageMapperTest {
     @Test
     void flagsRemovalShouldUpdateStoredMessageFlags() throws MailboxException {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new FlagsBuilder().add(Flags.Flag.FLAGGED, Flags.Flag.SEEN).build(), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REMOVE), MessageRange.one(message1.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new FlagsBuilder().add(Flags.Flag.FLAGGED, Flags.Flag.SEEN).build(), FlagsUpdateMode.REPLACE));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REMOVE));
         assertThat(retrieveMessageFromStorage(message1)).hasFlags(new Flags(Flags.Flag.FLAGGED));
     }
 
@@ -865,7 +870,7 @@ public abstract class MessageMapperTest {
     @Test
     void userFlagsShouldBeSupported() throws Exception {
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(USER_FLAG), FlagsUpdateMode.ADD), MessageRange.one(message1.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(USER_FLAG), FlagsUpdateMode.ADD));
         assertThat(retrieveMessageFromStorage(message1)).hasFlags(new Flags(USER_FLAG));
     }
 
@@ -873,9 +878,8 @@ public abstract class MessageMapperTest {
     void userFlagsUpdateShouldReturnCorrectUpdatedFlags() throws Exception {
         saveMessages();
         ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox);
-        assertThat(messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(USER_FLAG), FlagsUpdateMode.ADD), MessageRange.one(message1.getUid())))
-            .toIterable()
-            .containsOnly(
+        assertThat(messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(USER_FLAG), FlagsUpdateMode.ADD)))
+            .contains(
                 UpdatedFlags.builder()
                     .uid(message1.getUid())
                     .modSeq(modSeq.next())
@@ -889,11 +893,9 @@ public abstract class MessageMapperTest {
         saveMessages();
 
         assertThat(
-            messageMapper.updateFlags(benwaInboxMailbox,
-                new FlagsUpdateCalculator(new Flags(USER_FLAG), FlagsUpdateMode.REMOVE),
-                MessageRange.one(message1.getUid())))
-            .toIterable()
-            .containsOnly(
+            messageMapper.updateFlags(benwaInboxMailbox,message1.getUid(),
+                new FlagsUpdateCalculator(new Flags(USER_FLAG), FlagsUpdateMode.REMOVE)))
+            .contains(
                 UpdatedFlags.builder()
                     .uid(message1.getUid())
                     .modSeq(message1.getModSeq())
@@ -911,9 +913,8 @@ public abstract class MessageMapperTest {
         int threadCount = 2;
         int updateCount = 10;
         ConcurrentTestRunner.builder()
-            .operation((threadNumber, step) -> messageMapper.updateFlags(benwaInboxMailbox,
-                new FlagsUpdateCalculator(new Flags("custom-" + threadNumber + "-" + step), FlagsUpdateMode.ADD),
-                MessageRange.one(message1.getUid())))
+            .operation((threadNumber, step) -> messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(),
+                new FlagsUpdateCalculator(new Flags("custom-" + threadNumber + "-" + step), FlagsUpdateMode.ADD)))
             .threadCount(threadCount)
             .operationCount(updateCount)
             .runSuccessfullyWithin(Duration.ofMinutes(1));
@@ -934,14 +935,12 @@ public abstract class MessageMapperTest {
         ConcurrentTestRunner.builder()
             .operation((threadNumber, step) -> {
                 if (step  < updateCount / 2) {
-                    messageMapper.updateFlags(benwaInboxMailbox,
-                        new FlagsUpdateCalculator(new Flags("custom-" + threadNumber + "-" + step), FlagsUpdateMode.ADD),
-                        MessageRange.one(message1.getUid()));
+                    messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(),
+                        new FlagsUpdateCalculator(new Flags("custom-" + threadNumber + "-" + step), FlagsUpdateMode.ADD));
                 } else {
-                    messageMapper.updateFlags(benwaInboxMailbox,
+                    messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(),
                         new FlagsUpdateCalculator(new Flags("custom-" + threadNumber + "-" + (updateCount - step - 1)),
-                            FlagsUpdateMode.REMOVE),
-                        MessageRange.one(message1.getUid()));
+                            FlagsUpdateMode.REMOVE));
                 }
             })
             .threadCount(threadCount)
@@ -1014,7 +1013,7 @@ public abstract class MessageMapperTest {
         FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(customFlag), FlagsUpdateMode.ADD);
 
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, newFlags, message1.getUid().toRange());
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
 
         assertThat(messageMapper.getApplicableFlag(benwaInboxMailbox))
             .isEqualTo(FlagsBuilder.builder()
@@ -1032,7 +1031,7 @@ public abstract class MessageMapperTest {
         FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE);
 
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, newFlags, message1.getUid().toRange());
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
 
         assertThat(messageMapper.getApplicableFlag(benwaInboxMailbox))
             .isEqualTo(new FlagsBuilder()
@@ -1050,7 +1049,7 @@ public abstract class MessageMapperTest {
         FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), FlagsUpdateMode.REPLACE);
 
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, newFlags, message1.getUid().toRange());
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
 
         assertThat(messageMapper.getApplicableFlag(benwaInboxMailbox))
             .isEqualTo(FlagsBuilder.builder()
@@ -1067,7 +1066,7 @@ public abstract class MessageMapperTest {
         FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(customFlags), FlagsUpdateMode.REMOVE);
 
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, newFlags, message1.getUid().toRange());
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
 
         assertThat(messageMapper.getApplicableFlag(benwaInboxMailbox))
             .isEqualTo(new FlagsBuilder()
@@ -1085,7 +1084,7 @@ public abstract class MessageMapperTest {
         FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(customFlags), FlagsUpdateMode.REMOVE);
 
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, newFlags, message1.getUid().toRange());
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
 
         assertThat(messageMapper.getApplicableFlag(benwaInboxMailbox))
             .isEqualTo(FlagsBuilder.builder()
@@ -1104,7 +1103,7 @@ public abstract class MessageMapperTest {
         FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(), FlagsUpdateMode.REPLACE);
 
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, newFlags, message1.getUid().toRange());
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
 
         assertThat(messageMapper.getApplicableFlag(benwaInboxMailbox))
             .isEqualTo(FlagsBuilder.builder()
@@ -1123,7 +1122,7 @@ public abstract class MessageMapperTest {
         FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(), FlagsUpdateMode.REPLACE);
 
         saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, newFlags, message1.getUid().toRange());
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
 
         assertThat(messageMapper.getApplicableFlag(benwaInboxMailbox))
             .isEqualTo(new FlagsBuilder()
@@ -1218,8 +1217,8 @@ public abstract class MessageMapperTest {
     }
 
     private List<MessageUid> markThenPerformRetrieveMessagesMarkedForDeletion(MessageRange range) throws MailboxException {
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.DELETED), FlagsUpdateMode.REPLACE), MessageRange.one(message1.getUid()));
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.DELETED), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.DELETED), FlagsUpdateMode.REPLACE));
+        messageMapper.updateFlags(benwaInboxMailbox, message4.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.DELETED), FlagsUpdateMode.REPLACE));
         return messageMapper.retrieveMessagesMarkedForDeletion(benwaInboxMailbox, range);
     }
 




[james-project] 11/15: JAMES-3201 Add a missing task::await to improve test stability

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8c6bf81abac08d795c40efa9eb212c7821b9b704
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 1 12:14:25 2020 +0700

    JAMES-3201 Add a missing task::await to improve test stability
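    
    The await closes a race: the follow-up reIndexFailedMessagesOf task can
    only pick up failures once the first task has finished recording them.
    A condensed sketch of the ordering the test now enforces (REST-assured
    calls as in the hunk below):
    
        String taskId = with().post("/mailboxes?task=reIndex").jsonPath().get("taskId");
    
        // Wait for the failing indexation to complete and persist its failures...
        with().basePath(TasksRoutes.BASE).get(taskId + "/await");
    
        // ...before asking for those failures to be re-indexed.
        with().queryParam("reIndexFailedMessagesOf", taskId)
            .queryParam("task", "reIndex")
            .post("/mailboxes");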
---
 .../java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java    | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index a15fd6c..c537a72 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -1289,6 +1289,10 @@ class MailboxesRoutesTest {
                     .jsonPath()
                     .get("taskId");
 
+                with()
+                    .basePath(TasksRoutes.BASE)
+                    .get(taskId + "/await");
+
                 String fixingTaskId = with()
                     .queryParam("reIndexFailedMessagesOf", taskId)
                     .queryParam("task", "reIndex")




[james-project] 07/15: JAMES-3184 Throttling should survive errors

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9f514be644512549f08300f344769499bd1ede82
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 30 10:50:00 2020 +0700

    JAMES-3184 Throttling should survive errors
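    
    In practice a failing per-element operation is now logged and skipped
    instead of cancelling the whole throttled flux. A standalone sketch of
    the onErrorContinue behaviour the patch relies on (the values and the
    failing condition are illustrative; usual Reactor imports assumed):
    
        // One element fails; onErrorContinue drops it and the rest go through.
        List<Integer> result = Flux.range(0, 5)
            .flatMap(i -> i == 2
                ? Mono.<Integer>error(new RuntimeException("boom"))
                : Mono.just(i))
            .onErrorContinue((error, value) -> System.err.println("skipped " + value))
            .collectList()
            .block();
        // result contains 0, 1, 3, 4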
---
 .../java/org/apache/james/util/ReactorUtils.java   |  8 ++--
 .../org/apache/james/util/ReactorUtilsTest.java    | 52 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 1548d1d..38e9908 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -30,6 +30,8 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -41,9 +43,8 @@ import reactor.util.context.Context;
 import reactor.util.function.Tuple2;
 
 public class ReactorUtils {
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorUtils.class);
     public static final String MDC_KEY_PREFIX = "MDC-";
-
     private static final Duration DELAY = Duration.ZERO;
 
     public static <T, U> RequiresQuantity<T, U> throttle() {
@@ -56,7 +57,8 @@ public class ReactorUtils {
                 .windowTimeout(elements, duration)
                 .zipWith(Flux.interval(DELAY, duration))
                 .flatMap(Tuple2::getT1, elements, elements)
-                .flatMap(operation, elements);
+                .flatMap(operation, elements)
+                .onErrorContinue((e, o) -> LOGGER.error("Error encountered while throttling for {}", o.toString(), e));
         };
     }
 
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index dc286ad..58960c3 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -46,6 +46,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Hooks;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -151,6 +152,57 @@ class ReactorUtilsTest {
             assertThat(ongoingProcessingUponComputationStart)
                 .allSatisfy(processingCount -> assertThat(processingCount).isLessThanOrEqualTo(windowMaxSize));
         }
+
+        @Test
+        void throttleShouldNotAbortProcessingUponError() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(100);
+
+            Flux<Integer> originalFlux = Flux.range(0, 10);
+            Function<Integer, Publisher<Integer>> operation =
+                i -> {
+                    if (i == 5) {
+                        return Mono.error(new RuntimeException());
+                    }
+                    return Mono.just(i);
+                };
+
+            List<Integer> results = originalFlux
+                .transform(ReactorUtils.<Integer, Integer>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(operation))
+                .collectList()
+                .block();
+
+            assertThat(results)
+                .containsExactly(0, 1, 2, 3, 4, 6, 7, 8, 9);
+        }
+
+        @Test
+        void throttleShouldNotAbortProcessingUponUpstreamError() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(100);
+
+            Flux<Integer> originalFlux = Flux.range(0, 10)
+                .flatMap(i -> {
+                    if (i == 5) {
+                        return Mono.error(new RuntimeException());
+                    }
+                    return Mono.just(i);
+                });
+
+            List<Integer> results = originalFlux
+                .transform(ReactorUtils.<Integer, Integer>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .collectList()
+                .block();
+
+            assertThat(results)
+                .containsExactly(0, 1, 2, 3, 4, 6, 7, 8, 9);
+        }
     }
 
     @Nested




[james-project] 08/15: JAMES-3184 Throttling should work for long running streams

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bddf322649f609ec7063121517d93eb684a8492a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 30 11:00:17 2020 +0700

    JAMES-3184 Throttling should work for long running streams
    
    As described in https://github.com/reactor/reactor-core/issues/1099,
    windowTimeout does not play well with backpressure: when the number of
    windows exceeds the internal buffer size (32), the throttling crashes.
    
    We replaced "windowTimeout" with "window" and provide additional tests
    covering the case where the upstream cannot keep up with the
    throttling (to guard against regressions).
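    
    For reference, a condensed sketch of how the new throttle is exercised
    (it mirrors the shape of the tests below; the element count and window
    parameters are arbitrary and purely illustrative):
    
        // Requires reactor.core.publisher.{Flux, Mono}, java.time.Duration
        // and org.apache.james.util.ReactorUtils.
        // With window() instead of windowTimeout(), a stream of 10 000
        // elements no longer overflows the 32-window internal buffer.
        Flux.range(0, 10_000)
            .transform(ReactorUtils.<Integer, Integer>throttle()
                .elements(2)
                .per(Duration.ofMillis(1))
                .forOperation(Mono::just))
            .blockLast();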
---
 server/container/util/pom.xml                      |   5 +
 .../java/org/apache/james/util/ReactorUtils.java   |   2 +-
 .../org/apache/james/util/ReactorUtilsTest.java    | 176 +++++++++++++++++++++
 3 files changed, 182 insertions(+), 1 deletion(-)

diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml
index 4b650c6..ab545bb 100644
--- a/server/container/util/pom.xml
+++ b/server/container/util/pom.xml
@@ -80,6 +80,11 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 38e9908..92877ff 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -54,7 +54,7 @@ public class ReactorUtils {
             Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive");
 
             return flux -> flux
-                .windowTimeout(elements, duration)
+                .window(elements)
                 .zipWith(Flux.interval(DELAY, duration))
                 .flatMap(Tuple2::getT1, elements, elements)
                 .flatMap(operation, elements)
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 58960c3..20f33c5 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.james.util;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Duration.ONE_SECOND;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -29,12 +30,15 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.reactivestreams.Publisher;
@@ -203,6 +207,178 @@ class ReactorUtilsTest {
             assertThat(results)
                 .containsExactly(0, 1, 2, 3, 4, 6, 7, 8, 9);
         }
+
+        @Test
+        void throttleShouldHandleLargeFluxes() {
+            int windowMaxSize = 2;
+            Duration windowDuration = Duration.ofMillis(1);
+
+            Flux<Integer> originalFlux = Flux.range(0, 10000);
+
+            assertThatCode(() -> originalFlux
+                .transform(ReactorUtils.<Integer, Integer>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .blockLast()).doesNotThrowAnyException();
+        }
+
+        @Test
+        void throttleShouldGenerateSmallerWindowsWhenUpstreamIsSlow() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+            Stopwatch stopwatch = Stopwatch.createUnstarted();
+
+            Flux<Long> originalFlux = Flux.interval(Duration.ofMillis(10));
+
+            ImmutableList<Long> perWindowCount = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS))))
+                .map(i -> i / 20)
+                .doOnSubscribe(signal -> stopwatch.start())
+                .take(10)
+                .groupBy(Function.identity())
+                .flatMap(Flux::count)
+                .collect(Guavate.toImmutableList())
+                .block();
+
+            // We verify that we generate 2 elements by slice and not 3
+            // (as the upstream cannot generate more than 2 element per window)
+            assertThat(perWindowCount)
+                .allSatisfy(count -> assertThat(count).isLessThanOrEqualTo(2));
+        }
+
+        @Test
+        void throttleShouldNotDropEntriesWhenUpstreamIsSlow() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+
+            Flux<Long> originalFlux = Flux.interval(Duration.ofMillis(10));
+
+            ImmutableList<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .take(10)
+                .collect(Guavate.toImmutableList())
+                .block();
+
+            assertThat(results).containsExactly(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
+        }
+
+        @Test
+        void throttleShouldCompleteWhenOriginalFluxDoNotFillAWindow() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+
+            Flux<Long> originalFlux = Flux.just(0L, 1L);
+
+            ImmutableList<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .take(10)
+                .collect(Guavate.toImmutableList())
+                .block();
+
+            assertThat(results).containsExactly(0L, 1L);
+        }
+
+        @Test
+        void throttleShouldSupportEmittingPartiallyCompleteWindowImmediately() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(20);
+
+            ConcurrentLinkedDeque<Long> results = new ConcurrentLinkedDeque<>();
+            Flux<Long> originalFlux = Flux.concat(Flux.just(0L, 1L),
+                Flux.never());
+
+            originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(i -> {
+                        results.add(i);
+                        return Mono.just(i);
+                    }))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+
+            Awaitility.await().atMost(ONE_SECOND)
+                .untilAsserted(() -> assertThat(results).containsExactly(0L, 1L));
+        }
+
+        @Test
+        void throttleShouldTolerateSeveralEmptySlices() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(5);
+
+            // 150 ms = 30 * window duration (which is smaller than reactor small buffers)
+            Flux<Long> originalFlux = Flux.concat(Flux.just(0L, 1L),
+                Mono.delay(Duration.ofMillis(150)).thenReturn(2L));
+
+            List<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .collectList()
+                .block();
+
+            System.out.println(results);
+            assertThat(results).containsExactly(0L, 1L, 2L);
+        }
+
+        @Test
+        void throttleShouldTolerateManyEmptySuccessiveWindows() {
+            Hooks.onOperatorDebug();
+
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(5);
+
+            // 150 ms = 33 * window duration (which is greater than reactor small buffers)
+            Flux<Long> originalFlux = Flux.concat(Flux.just(0L, 1L),
+                Mono.delay(Duration.ofMillis(165)).thenReturn(2L));
+
+            List<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .collectList()
+                .block();
+
+            System.out.println(results);
+            assertThat(results).containsExactly(0L, 1L, 2L);
+        }
+
+        @Disabled("reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval " +
+            "doesn't support small downstream requests that replenish slower than the ticks)")
+        @Test
+        void throttleShouldTolerateManyEmptyWindows() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(5);
+
+            // 150 ms = 30 * window duration (which is smaller than reactor small buffers)
+            Flux<Long> originalFlux = Flux.concat(Flux.just(0L, 1L),
+                Mono.delay(Duration.ofMillis(150)).thenReturn(2L),
+                Mono.delay(Duration.ofMillis(150)).thenReturn(3L));
+
+            List<Long> results = originalFlux
+                .transform(ReactorUtils.<Long, Long>throttle()
+                    .elements(windowMaxSize)
+                    .per(windowDuration)
+                    .forOperation(Mono::just))
+                .collectList()
+                .block();
+
+            System.out.println(results);
+            assertThat(results).containsExactly(0L, 1L, 2L, 3L);
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 12/15: JAMES-3290 Reactify MailLoader to handle error propagation

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c6a3ecbcaf0fe115dddd94b35bd10634cf212e1f
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Tue Jun 30 17:44:08 2020 +0200

    JAMES-3290 Reactify MailLoader to handle error propagation
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  | 31 +++++--------
 .../apache/james/queue/rabbitmq/MailLoader.java    | 39 +++++++++++------
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   |  6 +--
 .../james/queue/rabbitmq/MailLoaderTest.java       | 51 ++++++++++++++++++++++
 4 files changed, 91 insertions(+), 36 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index b22d850..b13b613 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -24,7 +24,6 @@ import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.function.Consumer;
-import java.util.function.Function;
 
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.metrics.api.Metric;
@@ -35,6 +34,7 @@ import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.mailet.Mail;
 
+import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
 import com.rabbitmq.client.Delivery;
 
@@ -76,14 +76,14 @@ class Dequeuer implements Closeable {
 
     }
 
-    private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
+    private final MailLoader mailLoader;
     private final Metric dequeueMetric;
     private final MailReferenceSerializer mailReferenceSerializer;
     private final MailQueueView mailQueueView;
     private final Receiver receiver;
     private final Flux<AcknowledgableDelivery> flux;
 
-    Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
+    Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, MailLoader mailLoader,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
              MailQueueView mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) {
         this.mailLoader = mailLoader;
@@ -120,13 +120,8 @@ class Dequeuer implements Closeable {
     }
 
     private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {
-        try {
-            MailWithEnqueueId mailWithEnqueueId = loadMail(response);
-            ThrowingConsumer<Boolean> ack = ack(response, mailWithEnqueueId);
-            return Mono.just(new RabbitMQMailQueueItem(ack, mailWithEnqueueId));
-        } catch (MailQueue.MailQueueException e) {
-            return Mono.error(e);
-        }
+        return loadMail(response)
+            .map(mailWithEnqueueId -> new RabbitMQMailQueueItem(ack(response, mailWithEnqueueId), mailWithEnqueueId));
     }
 
     private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery response, MailWithEnqueueId mailWithEnqueueId) {
@@ -141,17 +136,15 @@ class Dequeuer implements Closeable {
         };
     }
 
-    private MailWithEnqueueId loadMail(Delivery response) throws MailQueue.MailQueueException {
-        MailReferenceDTO mailDTO = toMailReference(response);
-        return mailLoader.apply(mailDTO);
+    private Mono<MailWithEnqueueId> loadMail(Delivery response) {
+        return toMailReference(response)
+            .flatMap(mailLoader::load);
     }
 
-    private MailReferenceDTO toMailReference(Delivery getResponse) throws MailQueue.MailQueueException {
-        try {
-            return mailReferenceSerializer.read(getResponse.getBody());
-        } catch (IOException e) {
-            throw new MailQueue.MailQueueException("Failed to parse DTO", e);
-        }
+    private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
+        return Mono.fromCallable(getResponse::getBody)
+            .map(Throwing.function(mailReferenceSerializer::read).sneakyThrow())
+            .onErrorResume(IOException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to parse DTO", e)));
     }
 
 }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
index a616909..b724e60 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.queue.rabbitmq;
 
+import java.util.function.Function;
+
 import javax.mail.MessagingException;
 import javax.mail.internet.AddressException;
 import javax.mail.internet.MimeMessage;
@@ -29,6 +31,10 @@ import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.mailet.Mail;
 
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Mono;
+
 class MailLoader {
     private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
     private final BlobId.Factory blobIdFactory;
@@ -38,18 +44,25 @@ class MailLoader {
         this.blobIdFactory = blobIdFactory;
     }
 
-    MailWithEnqueueId load(MailReferenceDTO dto) throws MailQueue.MailQueueException {
-        try {
-            MailReference mailReference = dto.toMailReference(blobIdFactory);
-
-            Mail mail = mailReference.getMail();
-            MimeMessage mimeMessage = mimeMessageStore.read(mailReference.getPartsId()).block();
-            mail.setMessage(mimeMessage);
-            return new MailWithEnqueueId(mailReference.getEnqueueId(), mail);
-        } catch (AddressException e) {
-            throw new MailQueue.MailQueueException("Failed to parse mail address", e);
-        } catch (MessagingException e) {
-            throw new MailQueue.MailQueueException("Failed to generate mime message", e);
-        }
+    Mono<MailWithEnqueueId> load(MailReferenceDTO dto) {
+        return Mono.fromCallable(() -> dto.toMailReference(blobIdFactory))
+            .flatMap(mailReference -> buildMail(mailReference)
+                .map(mail -> new MailWithEnqueueId(mailReference.getEnqueueId(), mail)));
+    }
+
+    private Mono<Mail> buildMail(MailReference mailReference) {
+        return mimeMessageStore.read(mailReference.getPartsId())
+            .flatMap(mimeMessage -> buildMailWithMessageReference(mailReference, mimeMessage));
+    }
+
+    private Mono<Mail> buildMailWithMessageReference(MailReference mailReference, MimeMessage mimeMessage) {
+        Function<Mail, Mono<Object>> setMessage = mail ->
+            Mono.fromRunnable(Throwing.runnable(() -> mail.setMessage(mimeMessage)).sneakyThrow())
+                .onErrorResume(AddressException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to parse mail address", e)))
+                .onErrorResume(MessagingException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to generate mime message", e)));
+
+        return Mono.just(mailReference.getMail())
+            .flatMap(mail -> setMessage.apply(mail)
+                .thenReturn(mail));
     }
 }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index fc2f784..e9b3c32 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -29,7 +29,6 @@ import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX
 import java.time.Clock;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
 
 import javax.inject.Inject;
 import javax.mail.internet.MimeMessage;
@@ -46,7 +45,6 @@ import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
 import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
@@ -65,7 +63,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
         private final Sender sender;
         private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
         private final MailReferenceSerializer mailReferenceSerializer;
-        private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
+        private final MailLoader mailLoader;
         private final MailQueueView.Factory mailQueueViewFactory;
         private final Clock clock;
         private final MailQueueItemDecoratorFactory decoratorFactory;
@@ -89,7 +87,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             this.clock = clock;
             this.decoratorFactory = decoratorFactory;
             this.mailReferenceSerializer = new MailReferenceSerializer();
-            this.mailLoader = Throwing.function(new MailLoader(mimeMessageStore, blobIdFactory)::load).sneakyThrow();
+            this.mailLoader = new MailLoader(mimeMessageStore, blobIdFactory);
             this.configuration = configuration;
         }
 
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java
new file mode 100644
index 0000000..4b406fb
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java
@@ -0,0 +1,51 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Mono;
+
+class MailLoaderTest {
+    @Test
+    void storeExceptionShouldBePropagated() {
+        Store<MimeMessage, MimeMessagePartsId> store = mock(Store.class);
+        when(store.read(any())).thenReturn(Mono.error(new RuntimeException("Cassandra problem")));
+        MailReferenceDTO dto = mock(MailReferenceDTO.class);
+        when(dto.toMailReference(any())).thenReturn(mock(MailReference.class));
+        MailLoader loader = new MailLoader(store, new HashBlobId.Factory());
+
+        String result = loader.load(dto)
+            .thenReturn("continued")
+            .onErrorResume(RuntimeException.class, e -> Mono.just("caught"))
+            .block();
+        assertThat(result).isEqualTo("caught");
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/15: JAMES-3150 Add debugging support for the garbage collection

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3728fae8a8a978959a849257c2a5fce99e7defdd
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Wed Apr 15 15:56:47 2020 +0200

    JAMES-3150 Add debugging support for the garbage collection
---
 server/blob/blob-deduplicating/pom.xml             |   5 +
 .../james/server/blob/deduplication/GC.scala       |  12 +-
 .../server/blob/deduplication/GCJsonReporter.scala | 170 ++++++++++++++++++
 .../src/test/resources/gcReport.json               |  63 +++++++
 .../blob/deduplication/GCJsonReporterTest.scala    | 190 +++++++++++++++++++++
 .../blob/deduplication/GCPropertiesTest.scala      |  38 +----
 .../james/server/blob/deduplication/State.scala    |  37 ++++
 7 files changed, 476 insertions(+), 39 deletions(-)

diff --git a/server/blob/blob-deduplicating/pom.xml b/server/blob/blob-deduplicating/pom.xml
index ada4371..0bf6147 100644
--- a/server/blob/blob-deduplicating/pom.xml
+++ b/server/blob/blob-deduplicating/pom.xml
@@ -67,6 +67,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.typesafe.play</groupId>
+            <artifactId>play-json_${scala.base}</artifactId>
+            <version>2.8.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
         </dependency>
diff --git a/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala
index 0fa4ea8..1450e5f 100644
--- a/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala
+++ b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala
@@ -42,6 +42,8 @@ sealed abstract class Generation extends Comparable[Generation] {
   def <=(that: Generation): Boolean = compareTo(that) <= 0
   def >(that: Generation): Boolean = compareTo(that) > 0
   def >=(that: Generation): Boolean = compareTo(that) >= 0
+
+  def asString: String
 }
 
 object Generation {
@@ -76,6 +78,7 @@ case class ValidGeneration(id: Long) extends Generation {
     case that: ValidGeneration => id.compareTo(that.id)
   }
 
+  override def asString: String = id.toString
 }
 
 /**
@@ -90,6 +93,8 @@ case object NonExistingGeneration extends Generation {
     case NonExistingGeneration => 0
     case _: ValidGeneration => -1
   }
+
+  override def asString: String = "non_existing"
 }
 
 /**
@@ -97,6 +102,7 @@ case object NonExistingGeneration extends Generation {
  */
 case class Iteration(id: Long, processedGenerations: Set[Generation], lastGeneration: Generation) {
   def next(generations: Set[Generation], lastGeneration: Generation): Iteration = Iteration(id + 1, generations, lastGeneration)
+  def asString = id.toString
 }
 
 object Iteration {
@@ -129,7 +135,7 @@ object Events {
 
 }
 
-case class Report(iteration: Iteration, blobsToDelete: Set[(Generation, BlobId)])
+case class GCIterationReport(iteration: Iteration, blobsToDelete: Set[(Generation, BlobId)])
 
 /**
  * Accessors to the References/Dereferences made by generations
@@ -173,7 +179,7 @@ case class StabilizedState(references: Map[Generation, Seq[Reference]], derefere
 
 object GC {
   val temporization: Long = 2
-  def plan(state: StabilizedState, lastIteration: Iteration, targetedGeneration: Generation): Report = {
+  def plan(state: StabilizedState, lastIteration: Iteration, targetedGeneration: Generation): GCIterationReport = {
     val processedGenerations = lastIteration.lastGeneration.collectibles(targetedGeneration)
     val blobsToDelete = state.dereferences
       .filter { case (generation, _) => processedGenerations.contains(generation) }
@@ -182,6 +188,6 @@ object GC {
       .filter(dereference => state.referencesAt(processedGenerations.max).isNotReferenced(dereference.reference.blobId))
       .map(dereference => (dereference.reference.generation, dereference.reference.blobId))
 
-    Report(lastIteration.next(processedGenerations, targetedGeneration.previous(temporization)), blobsToDelete)
+    GCIterationReport(lastIteration.next(processedGenerations, targetedGeneration.previous(temporization)), blobsToDelete)
   }
 }
diff --git a/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GCJsonReporter.scala b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GCJsonReporter.scala
new file mode 100644
index 0000000..a37ec36
--- /dev/null
+++ b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GCJsonReporter.scala
@@ -0,0 +1,170 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.server.blob.deduplication
+
+import org.apache.james.blob.api.BlobId
+import org.apache.james.server.blob.deduplication.RelatedAction.{Delete, GarbageCollect, Init, Save}
+import play.api.libs.json.{JsString, Json, Writes}
+
+import scala.collection.immutable.TreeSet
+
+
+sealed trait RelatedAction
+object RelatedAction {
+  case object Init extends RelatedAction
+  case class Save(blobId: BlobId, reference: ExternalID) extends RelatedAction
+  case class Delete(reference: ExternalID) extends RelatedAction
+  case object GarbageCollect extends RelatedAction
+}
+
+object JsonReport {
+  case class BlobId(id : String, `reference-generation`: Generation)
+
+  case class Reference(id : String, blob: String, `reference-generation`: Generation)
+  case class Dereference(blob: String, `reference-generation`: Generation, `garbage-collection-iterations`: String)
+
+  case class State(`related-action` : RelatedAction,
+                   `reference-generations`: TreeSet[Generation],
+                   `garbage-collection-iterations`: TreeSet[String],
+                   blobs: Seq[BlobId],
+                   references: Seq[Reference],
+                   deletions: Seq[Dereference])
+
+
+  //action
+  implicit val relatedActionWrites: Writes[RelatedAction] = {
+    case Init => JsString("init")
+    case Save(blobId, reference) => JsString(s"save(blob = ${blobId.asString()}, reference = ${reference.id})")
+    case Delete(reference) => JsString(s"delete(reference = ${reference.id})")
+    case GarbageCollect => JsString(s"garbageCollect")
+  }
+  //generation
+  implicit val generationWrites: Writes[Generation] = {
+    case ValidGeneration(id) => JsString(s"$id")
+    case NonExistingGeneration => JsString(s"nonExistingGen")
+  }
+  //blobid
+  implicit val blobIdWrites: Writes[BlobId] = Json.writes[BlobId]
+  //reference
+  implicit val referenceWrites: Writes[Reference] = Json.writes[Reference]
+  //dereference
+  implicit val dereferenceWrites: Writes[Dereference] = Json.writes[Dereference]
+  //JsonReport.State
+  implicit val stateWrites: Writes[State] = Json.writes[State]
+  //JsonReport
+  implicit val reportWrites: Writes[JsonReport] = Json.writes[JsonReport]
+
+}
+
+case class JsonReport(states: Seq[JsonReport.State])
+
+
+sealed trait EventToReport
+
+case class ReferenceEvent(event : Reference) extends EventToReport
+case class DereferenceEvent(event : Dereference) extends EventToReport
+case class GCIterationEvent(event : GCIterationReport) extends EventToReport
+
+object EventToReport {
+  def extractReferencingEvents(events: Seq[EventToReport]): Seq[Event] =
+    events.flatMap {
+      case ReferenceEvent(reference) => Some(reference)
+      case DereferenceEvent(dereference) => Some(dereference)
+      case GCIterationEvent(_) => None
+    }
+  def toReportEvents(events: Seq[Event]): Seq[EventToReport] =
+    events.map {
+      case reference: Reference => ReferenceEvent(reference)
+      case dereference: Dereference => DereferenceEvent(dereference)
+    }
+}
+
+object GCJsonReporter {
+
+  def report(events: Seq[EventToReport]) : JsonReport = {
+
+   events.foldLeft((Seq[EventToReport](), JsonReport(Seq(JsonReport.State(Init,
+      TreeSet(Generation.first),
+      TreeSet(Iteration.initial.asString),
+      Seq[JsonReport.BlobId](),
+      Nil,
+      Nil)))))((acc, event) => {
+      val (events, reportStates) = acc
+      val currentEvents = events :+ event
+
+      val state : JsonReport.State = event match {
+        case ReferenceEvent(reference) =>
+          stateForReference(reportStates, reference)
+        case DereferenceEvent(dereference) =>
+          stateForDereference(reportStates, dereference)
+        case GCIterationEvent(gcReports) =>
+          val curatedAcc = (EventToReport.extractReferencingEvents(acc._1), acc._2)
+          stateForGCIteration(curatedAcc, EventToReport.extractReferencingEvents(events), gcReports)
+      }
+
+      (currentEvents, JsonReport(reportStates.states :+ state))
+    })._2
+
+
+  }
+
+  private def stateForGCIteration(acc: (Seq[Event], JsonReport), events: Seq[Event], gcReports: GCIterationReport) = {
+    val lastState = acc._2.states.last
+
+    val blobsToDeleteAsString = gcReports.blobsToDelete.map(_._2).map(_.asString())
+
+    JsonReport.State(GarbageCollect,
+      `reference-generations` = lastState.`reference-generations`,
+      `garbage-collection-iterations` = lastState.`garbage-collection-iterations` + gcReports.iteration.asString,
+      blobs = lastState.blobs.diff(gcReports.blobsToDelete.map { case (generation, blobId) => JsonReport.BlobId(blobId.asString, generation) }.toSeq),
+      references = lastState.references.filterNot(reference => blobsToDeleteAsString.contains(reference.blob)),
+      deletions = lastState.deletions.filterNot(dereference => blobsToDeleteAsString.contains(dereference.blob)))
+  }
+
+  private def stateForDereference(reportStates: JsonReport, dereference: Dereference) = {
+    val previousState = reportStates.states.last
+    val referenceGenerations = previousState.`reference-generations` + dereference.generation
+    val iterations = previousState.`garbage-collection-iterations`
+    val references = previousState.references
+    val lastIteration = previousState.`garbage-collection-iterations`.last
+    val dereferences = previousState.deletions :+ JsonReport.Dereference(dereference.blob.asString(), dereference.generation, lastIteration)
+
+    JsonReport.State(Delete(dereference.externalId),
+      `reference-generations` = referenceGenerations,
+      `garbage-collection-iterations` = iterations,
+      blobs = previousState.blobs,
+      references = references,
+      deletions = dereferences)
+  }
+
+  private def stateForReference(reportStates: JsonReport, add: Reference) = {
+    val previousState = reportStates.states.last
+    val referenceGenerations = previousState.`reference-generations` + add.generation
+    val iterations = previousState.`garbage-collection-iterations`
+    val blobId = JsonReport.BlobId(add.blobId.asString(), add.generation)
+    val blobs = if (previousState.blobs.contains(blobId))
+      previousState.blobs
+    else
+      previousState.blobs :+ JsonReport.BlobId(add.blobId.asString(), add.generation)
+    val references = previousState.references :+ JsonReport.Reference(add.externalId.id, add.blobId.asString(), add.generation)
+    val deletions = previousState.deletions
+
+    JsonReport.State(Save(add.blobId, add.externalId), referenceGenerations, iterations, blobs, references, deletions)
+  }
+}
diff --git a/server/blob/blob-deduplicating/src/test/resources/gcReport.json b/server/blob/blob-deduplicating/src/test/resources/gcReport.json
new file mode 100644
index 0000000..8c87a57
--- /dev/null
+++ b/server/blob/blob-deduplicating/src/test/resources/gcReport.json
@@ -0,0 +1,63 @@
+{
+  "states" : [ {
+    "related-action" : "init",
+    "reference-generations" : [ "0" ],
+    "garbage-collection-iterations" : [ "0" ],
+    "blobs" : [ ],
+    "references" : [ ],
+    "deletions" : [ ]
+  }, {
+    "related-action" : "save(blob = 0_myHash, reference = message1)",
+    "reference-generations" : [ "0" ],
+    "garbage-collection-iterations" : [ "0" ],
+    "blobs" : [ {
+      "id" : "0_myHash",
+      "reference-generation" : "0"
+    } ],
+    "references" : [ {
+      "id" : "message1",
+      "blob" : "0_myHash",
+      "reference-generation" : "0"
+    } ],
+    "deletions" : [ ]
+  }, {
+    "related-action" : "garbageCollect",
+    "reference-generations" : [ "0" ],
+    "garbage-collection-iterations" : [ "0", "1" ],
+    "blobs" : [ {
+      "id" : "0_myHash",
+      "reference-generation" : "0"
+    } ],
+    "references" : [ {
+      "id" : "message1",
+      "blob" : "0_myHash",
+      "reference-generation" : "0"
+    } ],
+    "deletions" : [ ]
+  }, {
+    "related-action" : "delete(reference = message1)",
+    "reference-generations" : [ "0", "1" ],
+    "garbage-collection-iterations" : [ "0", "1" ],
+    "blobs" : [ {
+      "id" : "0_myHash",
+      "reference-generation" : "0"
+    } ],
+    "references" : [ {
+      "id" : "message1",
+      "blob" : "0_myHash",
+      "reference-generation" : "0"
+    } ],
+    "deletions" : [ {
+      "blob" : "0_myHash",
+      "reference-generation" : "1",
+      "garbage-collection-iterations" : "1"
+    } ]
+  }, {
+    "related-action" : "garbageCollect",
+    "reference-generations" : [ "0", "1" ],
+    "garbage-collection-iterations" : [ "0", "1", "2" ],
+    "blobs" : [ ],
+    "references" : [ ],
+    "deletions" : [ ]
+  } ]
+}
\ No newline at end of file
diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCJsonReporterTest.scala b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCJsonReporterTest.scala
new file mode 100644
index 0000000..c5961c2
--- /dev/null
+++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCJsonReporterTest.scala
@@ -0,0 +1,190 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication
+
+import java.time.Instant
+
+import org.apache.james.server.blob.deduplication.RelatedAction.{Delete, GarbageCollect, Init, Save}
+import org.apache.james.util.ClassLoaderUtils
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+import play.api.libs.json.Json
+
+import scala.collection.immutable.TreeSet
+
+class GCJsonReporterTest extends AnyWordSpec with Matchers {
+  "Report" should {
+    val generation = Generation.first
+    val blobId = GenerationAwareBlobId(generation, "myHash")
+    val externalId = ExternalID("message1")
+
+    val initialIteration = "0"
+    val firstIteration = "1"
+    val initialReport = JsonReport.State(Init,
+      `reference-generations` = TreeSet(Generation.first),
+      `garbage-collection-iterations` = TreeSet(initialIteration),
+      blobs = Seq[JsonReport.BlobId](),
+      references = Nil,
+      deletions = Nil)
+    val firstSaveReport = JsonReport.State(Save(blobId, externalId),
+      `reference-generations` = TreeSet(generation),
+      `garbage-collection-iterations` = TreeSet(initialIteration),
+      blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
+      references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
+      deletions = Nil)
+    val firstDeleteReport = JsonReport.State(Delete(externalId),
+      `reference-generations` = TreeSet(generation),
+      `garbage-collection-iterations` = TreeSet(initialIteration),
+      blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
+      references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
+      deletions = Seq(JsonReport.Dereference(blobId.asString(), generation, initialIteration)))
+
+    val iterationForImmediateGC = Iteration(1L, Set(), generation)
+    val gcReportImmediate = GCIterationReport(iterationForImmediateGC, Set())
+
+    "be minimal" when {
+      "on initial state" in {
+        GCJsonReporter
+          .report(GCIterationEvent(gcReportImmediate) :: Nil)
+          .states should be (Seq(initialReport,
+          JsonReport.State(GarbageCollect,
+          `reference-generations` = TreeSet(Generation.first),
+          `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
+          blobs = Seq[JsonReport.BlobId](),
+          references = Nil,
+          deletions = Nil)))
+      }
+    }
+
+    "report with added references" when {
+      "one reference is added" in {
+        GCJsonReporter
+          .report(ReferenceEvent(Reference(externalId, blobId, generation)) :: GCIterationEvent(gcReportImmediate) :: Nil)
+          .states should be (Seq(
+          initialReport,
+          firstSaveReport,
+          JsonReport.State(GarbageCollect,
+          `reference-generations` = TreeSet(generation),
+          `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
+          blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
+          references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
+          deletions = Nil )))
+      }
+
+      "one reference is added then removed" in {
+        val reference = Reference(externalId, blobId, generation)
+        GCJsonReporter.report(ReferenceEvent(reference) :: DereferenceEvent(Dereference(generation, reference)) :: GCIterationEvent(gcReportImmediate) :: Nil)
+          .states should be (Seq(
+          initialReport,
+          firstSaveReport,
+          firstDeleteReport,
+          JsonReport.State(GarbageCollect,
+            `reference-generations` = TreeSet(generation),
+            `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
+            blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
+            references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
+            deletions = Seq(JsonReport.Dereference(blobId.asString(), generation, initialIteration)))))
+      }
+    }
+
+    "GC has been ran" when {
+      "report added and removed references" when {
+        "one reference is added then removed and the GC is ran 2 generation later" in {
+          val reference = Reference(externalId, blobId, generation)
+          val gcReportGenNPlus2 = GC.plan(StabilizedState(Map(generation -> List(reference)), Map(generation -> List(Dereference(generation, reference)))),
+            lastIteration = Iteration.initial,
+            targetedGeneration = generation.next(2))
+
+          GCJsonReporter.report(ReferenceEvent(reference) :: DereferenceEvent(Dereference(generation, reference)) :: GCIterationEvent(gcReportGenNPlus2) :: Nil)
+            .states should be (Seq(
+            initialReport,
+            firstSaveReport,
+            firstDeleteReport,
+            JsonReport.State(GarbageCollect,
+              `reference-generations` = TreeSet(generation),
+              `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
+              blobs = Nil,
+              references = Nil,
+              deletions = Nil )))
+        }
+
+        "one reference is added, a gc run two generation later, then  it is removed and the GC is ran again" in {
+          val reference = Reference(externalId, blobId, generation)
+          val gcReportGenNPlus2 = GC.plan(StabilizedState(Map(generation -> List(reference)), Map.empty),
+            lastIteration = Iteration.initial,
+            targetedGeneration = generation.next(2))
+
+          val generationPlusOne= generation.next
+          val dereference = Dereference(generation.next, reference)
+          val gcReportGenNPlus3 = GC.plan(StabilizedState(Map(generation -> List(reference)), Map(generationPlusOne -> List(dereference))),
+            lastIteration = gcReportGenNPlus2.iteration,
+            targetedGeneration = generationPlusOne.next(2))
+
+          GCJsonReporter.report(ReferenceEvent(reference) :: GCIterationEvent(gcReportGenNPlus2) :: DereferenceEvent(dereference) :: GCIterationEvent(gcReportGenNPlus3) :: Nil)
+            .states should be (Seq(
+            initialReport,
+            firstSaveReport,
+            //first gc
+            JsonReport.State(GarbageCollect,
+              `reference-generations` = TreeSet(generation),
+              `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
+              blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
+              references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
+              deletions = Nil),
+            //delete
+            JsonReport.State(Delete(externalId),
+              `reference-generations` = TreeSet(generation, generationPlusOne),
+              `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration),
+              blobs = Seq[JsonReport.BlobId](JsonReport.BlobId(blobId.asString, blobId.generation)),
+              references = Seq(JsonReport.Reference(externalId.id, blobId.asString, generation)),
+              deletions = Seq(JsonReport.Dereference(blobId.asString(), generationPlusOne, gcReportGenNPlus2.iteration.asString))),
+            //second gc
+            JsonReport.State(GarbageCollect,
+              `reference-generations` = TreeSet(generation, generationPlusOne),
+              `garbage-collection-iterations` = TreeSet(initialIteration, firstIteration, gcReportGenNPlus3.iteration.asString),
+              blobs = Nil,
+              references = Nil,
+              deletions = Nil)))
+        }
+
+
+
+        "json serialization" in {
+          val reference = Reference(externalId, blobId, generation)
+          val gcReportGenNPlus2 = GC.plan(StabilizedState(Map(generation -> List(reference)), Map.empty),
+            lastIteration = Iteration.initial,
+            targetedGeneration = generation.next(2))
+
+          val generationPlusOne= generation.next
+          val dereference = Dereference(generation.next, reference)
+          val gcReportGenNPlus3 = GC.plan(StabilizedState(Map(generation -> List(reference)), Map(generationPlusOne -> List(dereference))),
+            lastIteration = gcReportGenNPlus2.iteration,
+            targetedGeneration = generationPlusOne.next(2))
+
+          import JsonReport._
+
+          val actualJson = Json.toJson(GCJsonReporter.report(ReferenceEvent(reference) :: GCIterationEvent(gcReportGenNPlus2) :: DereferenceEvent(dereference) :: GCIterationEvent(gcReportGenNPlus3) :: Nil))
+
+          actualJson should equal(Json.parse(ClassLoaderUtils.getSystemResourceAsString("gcReport.json")))
+        }
+      }
+    }
+  }
+}
diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala
index ad90f1c..d2ca107 100644
--- a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala
+++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala
@@ -21,16 +21,11 @@ package org.apache.james.server.blob.deduplication
 import java.nio.charset.StandardCharsets
 
 import com.google.common.hash
-import org.apache.james.blob.api.BlobId
 import org.apache.james.server.blob.deduplication.Generators.{OnePassGCTestParameters, TestParameters}
 import org.scalacheck.Prop.forAll
 import org.scalacheck.Test.Parameters
 import org.scalacheck.{Arbitrary, Gen, Properties, Shrink}
 
-case class GenerationAwareBlobId(generation: Generation, hash: String) extends BlobId {
-  override def asString(): String = s"${generation}_$hash"
-}
-
 object Generators {
 
   // generate a sequence of Generations with monotonic numeric ids
@@ -171,7 +166,7 @@ object GCPropertiesTest extends Properties("GC") {
   property("2.1. GC should not delete data being referenced by a pending process or still referenced") = forAll {
     testParameters: Generators.TestParameters => {
 
-      val partitionedBlobsId = partitionBlobs(testParameters.events)
+      val partitionedBlobsId =  Oracle.partitionBlobs(testParameters.events)
       testParameters.generationsToCollect.foldLeft(true)((acc, e) => {
         val plannedDeletions = GC.plan(Interpreter(testParameters.events).stabilize(), Iteration.initial, e).blobsToDelete.map(_._2)
         acc && partitionedBlobsId.stillReferencedBlobIds.intersect(plannedDeletions).isEmpty
@@ -189,40 +184,11 @@ object GCPropertiesTest extends Properties("GC") {
         val relevantEvents: Event => Boolean = event => event.generation <= testParameters.generationToCollect.previous(GC.temporization)
         val plannedDeletions = plan.blobsToDelete.map(_._2)
 
-        val partitionedBlobsId = partitionBlobs(testParameters.events.filter(relevantEvents))
+        val partitionedBlobsId = Oracle.partitionBlobs(testParameters.events.filter(relevantEvents))
         plannedDeletions.size >= partitionedBlobsId.notReferencedBlobIds.size * 0.9
       }
     }
   }
-
-  /*
-  Implement an oracle that implements BlobStore with a Ref Count reference tracking
-   */
-  def partitionBlobs(events: Seq[Event]): PartitionedEvents = {
-    val (referencingEvents, dereferencingEvents) = events.partition {
-      case _: Reference => true
-      case _: Dereference => false
-    }
-
-    val referencedBlobsCount = referencingEvents.groupBy(_.blob).view.mapValues(_.size).toMap
-    val dereferencedBlobsCount = dereferencingEvents.groupBy(_.blob).view.mapValues(_.size).toMap
-
-    val stillReferencedBlobIds = referencedBlobsCount.foldLeft(Set[BlobId]())((acc, kv) => {
-      val (blobId, referencesCount) = kv
-      val dereferencesCount  = dereferencedBlobsCount.getOrElse(blobId, 0)
-
-      if(referencesCount > dereferencesCount)
-        acc + blobId
-      else
-        acc
-    })
-
-    lazy val notReferencedBlobIds = dereferencedBlobsCount.keySet -- stillReferencedBlobIds
-    PartitionedEvents(stillReferencedBlobIds, notReferencedBlobIds)
-  }
-
-  case class PartitionedEvents(stillReferencedBlobIds: Set[BlobId], notReferencedBlobIds: Set[BlobId])
-
 }
 
 
diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala
index 119d0cf..bcd9111 100644
--- a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala
+++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala
@@ -19,6 +19,14 @@
 
 package org.apache.james.server.blob.deduplication
 
+import org.apache.james.blob.api.BlobId
+
+case class GenerationAwareBlobId(generation: Generation, hash: String) extends BlobId {
+  override def asString(): String = s"${generation.asString}_$hash"
+}
+
+case class PartitionedEvents(stillReferencedBlobIds: Set[BlobId], notReferencedBlobIds: Set[BlobId])
+
 /**
  * Used to iteratively build a StabilizedState
  */
@@ -44,3 +52,32 @@ object Interpreter {
   def apply(events: Seq[Event]): State =
     events.foldLeft(State.initial)((state, event) => state(event))
 }
+
+object Oracle {
+  /*
+  Implement an oracle that implements BlobStore with a Ref Count reference tracking
+   */
+  def partitionBlobs(events: Seq[Event]): PartitionedEvents = {
+    val (referencingEvents, dereferencingEvents) = events.partition {
+      case _: Reference => true
+      case _: Dereference => false
+    }
+
+    val referencedBlobsCount = referencingEvents.groupBy(_.blob).view.mapValues(_.size).toMap
+    val dereferencedBlobsCount = dereferencingEvents.groupBy(_.blob).view.mapValues(_.size).toMap
+
+    val stillReferencedBlobIds = referencedBlobsCount.foldLeft(Set[BlobId]())((acc, kv) => {
+      val (blobId, referencesCount) = kv
+      val dereferencesCount  = dereferencedBlobsCount.getOrElse(blobId, 0)
+
+      if(referencesCount > dereferencesCount)
+        acc + blobId
+      else
+        acc
+    })
+
+    lazy val notReferencedBlobIds = dereferencedBlobsCount.keySet -- stillReferencedBlobIds
+    PartitionedEvents(stillReferencedBlobIds, notReferencedBlobIds)
+  }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 09/15: JAMES-3184 Throttling should tolerate many empty windows

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f7774766844301d89e3e31ba469c2a4af4899a8d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 30 15:29:18 2020 +0700

    JAMES-3184 Throttling should tolerate many empty windows
    
    Previous version failed in zipWith with this error:
    
    ```
    reactor.core.Exceptions$OverflowException: Could not emit tick 32 due
    to lack of requests (interval doesn't support small downstream requests
    that replenish slower than the ticks)
    ```
    
    We avoid "hard" failure, but we no longer have a timeout on the original
    timeout. Which sounds like an acceptable tradeoff.
---
 .../src/main/java/org/apache/james/util/ReactorUtils.java   |  7 ++-----
 .../test/java/org/apache/james/util/ReactorUtilsTest.java   | 13 ++++++-------
 2 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 92877ff..5ec7b9d 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -40,12 +40,10 @@ import reactor.core.publisher.Mono;
 import reactor.core.publisher.Signal;
 import reactor.core.publisher.SynchronousSink;
 import reactor.util.context.Context;
-import reactor.util.function.Tuple2;
 
 public class ReactorUtils {
     private static final Logger LOGGER = LoggerFactory.getLogger(ReactorUtils.class);
     public static final String MDC_KEY_PREFIX = "MDC-";
-    private static final Duration DELAY = Duration.ZERO;
 
     public static <T, U> RequiresQuantity<T, U> throttle() {
         return elements -> duration -> operation -> {
@@ -55,9 +53,8 @@ public class ReactorUtils {
 
             return flux -> flux
                 .window(elements)
-                .zipWith(Flux.interval(DELAY, duration))
-                .flatMap(Tuple2::getT1, elements, elements)
-                .flatMap(operation, elements)
+                .delayElements(duration)
+                .concatMap(window -> window.flatMap(operation))
                 .onErrorContinue((e, o) -> LOGGER.error("Error encountered while throttling for {}", o.toString(), e));
         };
     }
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 20f33c5..0ffbc1a 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -50,7 +50,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Hooks;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -112,8 +111,9 @@ class ReactorUtilsTest {
                 .collect(Guavate.toImmutableList())
                 .block();
 
+            // delayElements also delay the first element
             assertThat(windowMembership)
-                .containsExactly(0L, 0L, 0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L);
+                .containsExactly(1L, 1L, 1L, 2L, 2L, 2L, 3L, 3L, 3L, 4L);
         }
 
         @Test
@@ -223,6 +223,9 @@ class ReactorUtilsTest {
                 .blockLast()).doesNotThrowAnyException();
         }
 
+        @Disabled("We no longer rely on 'windowTimeout', this breakage is expected." +
+            "'windowTimeout' solves this but create other, more critical issues (large flux cannot be throttled" +
+            "as described in https://github.com/reactor/reactor-core/issues/1099")
         @Test
         void throttleShouldGenerateSmallerWindowsWhenUpstreamIsSlow() {
             int windowMaxSize = 3;
@@ -270,7 +273,7 @@ class ReactorUtilsTest {
         }
 
         @Test
-        void throttleShouldCompleteWhenOriginalFluxDoNotFillAWindow() {
+        void throttleShouldCompleteWhenOriginalFluxDoesNotFillAWindow() {
             int windowMaxSize = 3;
             Duration windowDuration = Duration.ofMillis(20);
 
@@ -335,8 +338,6 @@ class ReactorUtilsTest {
 
         @Test
         void throttleShouldTolerateManyEmptySuccessiveWindows() {
-            Hooks.onOperatorDebug();
-
             int windowMaxSize = 3;
             Duration windowDuration = Duration.ofMillis(5);
 
@@ -356,8 +357,6 @@ class ReactorUtilsTest {
             assertThat(results).containsExactly(0L, 1L, 2L);
         }
 
-        @Disabled("reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval " +
-            "doesn't support small downstream requests that replenish slower than the ticks)")
         @Test
         void throttleShouldTolerateManyEmptyWindows() {
             int windowMaxSize = 3;


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org