You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/10/26 05:20:16 UTC

[james-project] 03/08: JAMES-3277 Optimize range message updates for RFC-8621

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2bbde614cd5b6c8463038a492167eb18312c1967
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Oct 22 14:34:53 2020 +0700

    JAMES-3277 Optimize range message updates for RFC-8621
---
 .../rfc8621/contract/EmailSetMethodContract.scala  | 312 +++++++++++++++++++++
 .../org/apache/james/jmap/mail/EmailSet.scala      |  19 +-
 .../apache/james/jmap/method/EmailSetMethod.scala  | 104 ++++++-
 3 files changed, 419 insertions(+), 16 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSetMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSetMethodContract.scala
index 8a92b53..4029127 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSetMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSetMethodContract.scala
@@ -670,6 +670,318 @@ trait EmailSetMethodContract {
   }
 
   @Test
+  def rangeFlagsAdditionShouldUpdateStoredFlags(server: GuiceJamesServer): Unit = {
+    // Four identical patches on messages of one mailbox: more than three updates are needed before
+    // EmailSetMethod.doUpdate applies them by range instead of message by message.
+    val message: Message = Fixture.createTestMessage
+    val flags: Flags = FlagsBuilder.builder().add(Flags.Flag.ANSWERED).build()
+    val mailboxProbe = server.getProbe(classOf[MailboxProbeImpl])
+
+    val bobPath = MailboxPath.inbox(BOB)
+    mailboxProbe.createMailbox(bobPath)
+    val messageId1: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+    val messageId2: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+    val messageId3: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+    val messageId4: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+
+    val request =
+      s"""{
+         |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+         |  "methodCalls": [
+         |    ["Email/set", {
+         |      "accountId": "$ACCOUNT_ID",
+         |      "update": {
+         |        "${messageId1.serialize}":{
+         |          "keywords/music": true
+         |        },
+         |        "${messageId2.serialize}":{
+         |          "keywords/music": true
+         |        },
+         |        "${messageId3.serialize}":{
+         |          "keywords/music": true
+         |        },
+         |        "${messageId4.serialize}":{
+         |          "keywords/music": true
+         |        }
+         |      }
+         |    }, "c1"],
+         |    ["Email/get",
+         |     {
+         |       "accountId": "$ACCOUNT_ID",
+         |       "ids": ["${messageId1.serialize}", "${messageId2.serialize}", "${messageId3.serialize}", "${messageId4.serialize}"],
+         |       "properties": ["keywords"]
+         |     },
+         |     "c2"]]
+         |}""".stripMargin
+
+    val response = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      .inPath("methodResponses[0][1].updated")
+      .isEqualTo(s"""{
+          |  "${messageId1.serialize}": null,
+          |  "${messageId2.serialize}": null,
+          |  "${messageId3.serialize}": null,
+          |  "${messageId4.serialize}": null
+          |}
+      """.stripMargin)
+    assertThatJson(response)
+      .inPath("methodResponses[1][1].list")
+      .isEqualTo(String.format(
+        """[
+          |{
+          |   "id":"%s",
+          |   "keywords": {
+          |       "$Answered": true,
+          |       "music": true
+          |    }
+          |},
+          |{
+          |   "id":"%s",
+          |   "keywords": {
+          |       "$Answered": true,
+          |       "music": true
+          |    }
+          |},
+          |{
+          |   "id":"%s",
+          |   "keywords": {
+          |       "$Answered": true,
+          |       "music": true
+          |    }
+          |},
+          |{
+          |   "id":"%s",
+          |   "keywords": {
+          |       "$Answered": true,
+          |       "music": true
+          |    }
+          |}
+          |]
+      """.stripMargin, messageId1.serialize, messageId2.serialize, messageId3.serialize, messageId4.serialize))
+  }
+
+  @Test
+  def rangeFlagsRemovalShouldUpdateStoredFlags(server: GuiceJamesServer): Unit = {
+    // Four messages of one mailbox start with the "music" keyword and all receive the same removal
+    // patch: more than three identical updates are needed before EmailSetMethod.doUpdate applies
+    // them by range instead of message by message.
+    val message: Message = Fixture.createTestMessage
+    val flags: Flags = FlagsBuilder.builder().add(Flags.Flag.ANSWERED).add("music").build()
+    val mailboxProbe = server.getProbe(classOf[MailboxProbeImpl])
+
+    val bobPath = MailboxPath.inbox(BOB)
+    mailboxProbe.createMailbox(bobPath)
+    val messageId1: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+    val messageId2: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+    val messageId3: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+    val messageId4: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+
+    val request =
+      s"""{
+         |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+         |  "methodCalls": [
+         |    ["Email/set", {
+         |      "accountId": "$ACCOUNT_ID",
+         |      "update": {
+         |        "${messageId1.serialize}":{
+         |          "keywords/music": null
+         |        },
+         |        "${messageId2.serialize}":{
+         |          "keywords/music": null
+         |        },
+         |        "${messageId3.serialize}":{
+         |          "keywords/music": null
+         |        },
+         |        "${messageId4.serialize}":{
+         |          "keywords/music": null
+         |        }
+         |      }
+         |    }, "c1"],
+         |    ["Email/get",
+         |     {
+         |       "accountId": "$ACCOUNT_ID",
+         |       "ids": ["${messageId1.serialize}", "${messageId2.serialize}", "${messageId3.serialize}", "${messageId4.serialize}"],
+         |       "properties": ["keywords"]
+         |     },
+         |     "c2"]]
+         |}""".stripMargin
+
+    val response = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      .inPath("methodResponses[0][1].updated")
+      .isEqualTo(s"""{
+          |  "${messageId1.serialize}": null,
+          |  "${messageId2.serialize}": null,
+          |  "${messageId3.serialize}": null,
+          |  "${messageId4.serialize}": null
+          |}
+      """.stripMargin)
+    assertThatJson(response)
+      .inPath("methodResponses[1][1].list")
+      .isEqualTo(String.format(
+        """[
+          |{
+          |   "id":"%s",
+          |   "keywords": {
+          |       "$Answered": true
+          |    }
+          |},
+          |{
+          |   "id":"%s",
+          |   "keywords": {
+          |       "$Answered": true
+          |    }
+          |},
+          |{
+          |   "id":"%s",
+          |   "keywords": {
+          |       "$Answered": true
+          |    }
+          |},
+          |{
+          |   "id":"%s",
+          |   "keywords": {
+          |       "$Answered": true
+          |    }
+          |}
+          |]
+      """.stripMargin, messageId1.serialize, messageId2.serialize, messageId3.serialize, messageId4.serialize))
+  }
+
+  @Test
+  def rangeMoveShouldUpdateMailboxId(server: GuiceJamesServer): Unit = {
+    // Four messages of the inbox all receive the same "mailboxIds" reset naming one other mailbox:
+    // more than three identical updates are needed before EmailSetMethod.doUpdate applies them
+    // by range instead of message by message.
+    val message: Message = Fixture.createTestMessage
+    val flags: Flags = FlagsBuilder.builder().add(Flags.Flag.ANSWERED).add("music").build()
+    val mailboxProbe = server.getProbe(classOf[MailboxProbeImpl])
+
+    val bobPath = MailboxPath.inbox(BOB)
+    mailboxProbe.createMailbox(bobPath)
+    val newId = mailboxProbe.createMailbox(MailboxPath.forUser(BOB, "other"))
+    val messageId1: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+    val messageId2: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+    val messageId3: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+    val messageId4: MessageId = mailboxProbe.appendMessage(BOB.asString(), bobPath, AppendCommand.builder()
+      .withFlags(flags).build(message)).getMessageId
+
+    val request =
+      s"""{
+         |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+         |  "methodCalls": [
+         |    ["Email/set", {
+         |      "accountId": "$ACCOUNT_ID",
+         |      "update": {
+         |        "${messageId1.serialize}":{
+         |          "mailboxIds": { "${newId.serialize()}" : true}
+         |        },
+         |        "${messageId2.serialize}":{
+         |          "mailboxIds": { "${newId.serialize()}" : true}
+         |        },
+         |        "${messageId3.serialize}":{
+         |          "mailboxIds": { "${newId.serialize()}" : true}
+         |        },
+         |        "${messageId4.serialize}":{
+         |          "mailboxIds": { "${newId.serialize()}" : true}
+         |        }
+         |      }
+         |    }, "c1"],
+         |    ["Email/get",
+         |     {
+         |       "accountId": "$ACCOUNT_ID",
+         |       "ids": ["${messageId1.serialize}", "${messageId2.serialize}", "${messageId3.serialize}", "${messageId4.serialize}"],
+         |       "properties": ["mailboxIds"]
+         |     },
+         |     "c2"]]
+         |}""".stripMargin
+
+    val response = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      .inPath("methodResponses[0][1].updated")
+      .isEqualTo(s"""{
+          |  "${messageId1.serialize}": null,
+          |  "${messageId2.serialize}": null,
+          |  "${messageId3.serialize}": null,
+          |  "${messageId4.serialize}": null
+          |}
+      """.stripMargin)
+    assertThatJson(response)
+      .inPath("methodResponses[1][1].list")
+      .isEqualTo(s"""[
+          |{
+          |   "id":"${messageId1.serialize}",
+          |   "mailboxIds": {
+          |       "${newId.serialize}": true
+          |    }
+          |},
+          |{
+          |   "id":"${messageId2.serialize}",
+          |   "mailboxIds": {
+          |       "${newId.serialize}": true
+          |    }
+          |},
+          |{
+          |   "id":"${messageId3.serialize}",
+          |   "mailboxIds": {
+          |       "${newId.serialize}": true
+          |    }
+          |},
+          |{
+          |   "id":"${messageId4.serialize}",
+          |   "mailboxIds": {
+          |       "${newId.serialize}": true
+          |    }
+          |}
+          |]
+      """.stripMargin)
+  }
+
+  @Test
   def emailSetShouldRejectPartiallyUpdateAndResetKeywordsAtTheSameTime(server: GuiceJamesServer): Unit = {
     val message: Message = Fixture.createTestMessage
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSet.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSet.scala
index 8e36b33..ddeac39 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSet.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSet.scala
@@ -23,13 +23,12 @@ import eu.timepit.refined.api.Refined
 import eu.timepit.refined.collection.NonEmpty
 import org.apache.james.jmap.mail.EmailSet.UnparsedMessageId
 import org.apache.james.jmap.method.WithAccountId
-import org.apache.james.jmap.model.KeywordsFactory.STRICT_KEYWORDS_FACTORY
 import org.apache.james.jmap.model.State.State
 import org.apache.james.jmap.model.{AccountId, Keywords, SetError}
 import org.apache.james.mailbox.model.MessageId
 import play.api.libs.json.JsObject
 
-import scala.util.{Failure, Right, Success, Try}
+import scala.util.{Right, Try}
 
 object EmailSet {
   type UnparsedMessageIdConstraint = NonEmpty
@@ -97,13 +96,23 @@ case class EmailSetUpdate(keywords: Option[Keywords],
         .compose(keywordsRemoval)
         .compose(keywordsReset)
 
-      Right(ValidatedEmailSetUpdate(keywordsTransformation, mailboxIdsTransformation))
+      Right(ValidatedEmailSetUpdate(keywordsTransformation, mailboxIdsTransformation, this))
     }
   }
+
+  // Range fast-path predicates: each is true only when its operation is the sole change this patch requests.
+  private def mailboxesUntouched: Boolean = mailboxIds.isEmpty && mailboxIdsToAdd.isEmpty && mailboxIdsToRemove.isEmpty
+
+  def isOnlyMove: Boolean = mailboxIds.exists(_.value.size == 1) && mailboxIdsToAdd.isEmpty &&
+    mailboxIdsToRemove.isEmpty && keywords.isEmpty && keywordsToAdd.isEmpty && keywordsToRemove.isEmpty
+
+  def isOnlyFlagAddition: Boolean = keywordsToAdd.isDefined && keywords.isEmpty && keywordsToRemove.isEmpty && mailboxesUntouched
+  def isOnlyFlagRemoval: Boolean = keywordsToRemove.isDefined && keywords.isEmpty && keywordsToAdd.isEmpty && mailboxesUntouched
 }
 
-case class ValidatedEmailSetUpdate private (keywords: Function[Keywords, Keywords],
-                                            mailboxIdsTransformation: Function[MailboxIds, MailboxIds])
+case class ValidatedEmailSetUpdate private (keywordsTransformation: Function[Keywords, Keywords], // applied by updateFlags to the keywords read from the stored flags
+                                            mailboxIdsTransformation: Function[MailboxIds, MailboxIds], // presumably applied to the stored mailbox ids - confirm against the caller
+                                            update: EmailSetUpdate) // the patch this value was built from: validation passes `this` here
 
 class EmailUpdateValidationException() extends IllegalArgumentException
 case class InvalidEmailPropertyException(property: String, cause: String) extends EmailUpdateValidationException
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
index 051d435..8c83e61 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
@@ -18,6 +18,8 @@
  ****************************************************************/
 package org.apache.james.jmap.method
 
+import java.util.function.Consumer
+
 import com.google.common.collect.ImmutableList
 import eu.timepit.refined.auto._
 import javax.inject.Inject
@@ -34,8 +36,8 @@ import org.apache.james.jmap.model.SetError.SetErrorDescription
 import org.apache.james.jmap.model.{Capabilities, Invocation, SetError, State}
 import org.apache.james.mailbox.MessageManager.FlagsUpdateMode
 import org.apache.james.mailbox.exception.MailboxNotFoundException
-import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, DeleteResult, MessageId}
-import org.apache.james.mailbox.{MailboxSession, MessageIdManager}
+import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, DeleteResult, MailboxId, MessageId, MessageRange}
+import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageIdManager, MessageManager}
 import org.apache.james.metrics.api.MetricFactory
 import play.api.libs.json.{JsError, JsObject, JsSuccess}
 import reactor.core.scala.publisher.{SFlux, SMono}
@@ -47,6 +49,7 @@ case class MessageNotFoundExeception(messageId: MessageId) extends Exception
 
 class EmailSetMethod @Inject()(serializer: EmailSetSerializer,
                                messageIdManager: MessageIdManager,
+                               mailboxManager: MailboxManager,
                                messageIdFactory: MessageId.Factory,
                                val metricFactory: MetricFactory,
                                val sessionSupplier: SessionSupplier) extends MethodRequiringAccountId[EmailSetRequest] {
@@ -137,7 +140,7 @@ class EmailSetMethod @Inject()(serializer: EmailSetSerializer,
   override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession, request: EmailSetRequest): SMono[InvocationWithContext] = {
     for {
       destroyResults <- destroy(request, mailboxSession)
-      updateResults <- update(request, mailboxSession)
+      updateResults <- update(request, mailboxSession).doOnError(e => e.printStackTrace())
     } yield InvocationWithContext(
       invocation = Invocation(
         methodName = invocation.invocation.methodName,
@@ -199,19 +202,98 @@ class EmailSetMethod @Inject()(serializer: EmailSetSerializer,
       updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session))
         .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId)
         .flatMap(metaData => {
-          SFlux.fromIterable(validUpdates)
-            .concatMap[UpdateResult]({
-              case (messageId, updatePatch) =>
-                doUpdate(messageId, updatePatch, metaData.get(messageId).toList.flatten, session)
-            })
-            .collectSeq()
+          doUpdate(validUpdates, metaData, session)
         })
     } yield {
       UpdateResults(updates ++ failures)
     }
   }
 
-  private def doUpdate(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[UpdateResult] = {
+  /** Chooses between the range fast path and per-message updates. The fast path needs more than three
+   *  updates sharing one patch, stored in one mailbox, with metadata found for every message id. */
+  private def doUpdate(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
+                       metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                       session: MailboxSession): SMono[Seq[UpdateResult]] = {
+    val patches: List[ValidatedEmailSetUpdate] = validUpdates.map(_._2).distinctBy(_.update)
+    val mailboxes: List[MailboxId] = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).toList.distinct
+    // Ids without metadata fall in no range and would vanish from the results; updateEachMessage visits every id.
+    val allFound: Boolean = validUpdates.forall(entry => metaData.contains(entry._1))
+
+    (patches, mailboxes) match {
+      case (List(patch), List(mailboxId)) if allFound && validUpdates.size > 3 =>
+        val ranges: List[MessageRange] = asRanges(metaData)
+        (patch.update.keywordsToAdd, patch.update.keywordsToRemove) match {
+          case (Some(added), _) if patch.update.isOnlyFlagAddition =>
+            updateFlagsByRange(mailboxId, added.asFlags, ranges, metaData, FlagsUpdateMode.ADD, session)
+          case (_, Some(removed)) if patch.update.isOnlyFlagRemoval =>
+            updateFlagsByRange(mailboxId, removed.asFlags, ranges, metaData, FlagsUpdateMode.REMOVE, session)
+          case _ if patch.update.isOnlyMove => moveByRange(mailboxId, patch, ranges, metaData, session)
+          case _ => updateEachMessage(validUpdates, metaData, session)
+        }
+      case _ => updateEachMessage(validUpdates, metaData, session)
+    }
+  }
+
+  /** Collects every uid found in `metaData` and groups them through `MessageRange.toRanges`. */
+  private def asRanges(metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]]): List[MessageRange] = {
+    val uids = metaData.values.flatMap(_.map(_.getComposedMessageId.getUid)).toList
+    MessageRange.toRanges(uids.asJava).asScala.toList
+  }
+
+  /** Applies `flags` with `updateMode` to each range of `mailboxId`; if the mailbox cannot be opened every message fails. */
+  private def updateFlagsByRange(mailboxId: MailboxId,
+                                 flags: Flags,
+                                 ranges: List[MessageRange],
+                                 metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                                 updateMode: FlagsUpdateMode,
+                                 session: MailboxSession): SMono[Seq[UpdateResult]] =
+    SMono.fromCallable[MessageManager](() => mailboxManager.getMailbox(mailboxId, session))
+      .flatMap(mailbox => updateByRange(ranges, metaData, range => mailbox.setFlags(flags, updateMode, range, session)))
+      // Mirror updateByRange: report the lookup failure per message rather than failing the whole Email/set call.
+      .onErrorResume(e => SMono.just(metaData.keys.toSeq.map(id => UpdateFailure(EmailSet.asUnparsed(id), e))))
+      .subscribeOn(Schedulers.elastic())
+
+  /** Moves each range from `mailboxId` to the one mailbox the patch names; nothing is moved if that is `mailboxId`. */
+  private def moveByRange(mailboxId: MailboxId,
+                          update: ValidatedEmailSetUpdate,
+                          ranges: List[MessageRange],
+                          metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                          session: MailboxSession): SMono[Seq[UpdateResult]] =
+    update.update.mailboxIds.flatMap(_.value.headOption) match {
+      case Some(targetId) if targetId != mailboxId => updateByRange(ranges, metaData, range => mailboxManager.moveMessages(range, mailboxId, targetId, session))
+      case _ => SMono.just(metaData.keys.toSeq.map(UpdateSuccess)) // doUpdate only routes isOnlyMove patches here, so this is the "already there" case
+    }
+
+  /** Runs `operation` on each range in turn. Ids having a uid inside the range become UpdateSuccess,
+   *  or UpdateFailure carrying the error if the operation throws. */
+  private def updateByRange(ranges: List[MessageRange],
+                            metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                            operation: Consumer[MessageRange]): SMono[Seq[UpdateResult]] = {
+    def idsWithin(range: MessageRange): Seq[MessageId] = metaData.collect {
+      case (messageId, stored) if stored.exists(entry => range.includes(entry.getComposedMessageId.getUid)) => messageId
+    }.toSeq
+
+    SFlux.fromIterable(ranges)
+      .concatMap(range => {
+        val affected = idsWithin(range)
+        SMono.fromCallable[Seq[UpdateResult]](() => { operation.accept(range); affected.map(UpdateSuccess) })
+          .onErrorResume(e => SMono.just(affected.map(id => UpdateFailure(EmailSet.asUnparsed(id), e))))
+          .subscribeOn(Schedulers.elastic())
+      })
+      .reduce(Seq(), _ ++ _)
+  }
+
+  /** Applies each patch to its own message, one after the other, and gathers one UpdateResult per message. */
+  private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
+                                metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                                session: MailboxSession): SMono[Seq[UpdateResult]] = {
+    def storedEntries(messageId: MessageId): List[ComposedMessageIdWithMetaData] = metaData.get(messageId).toList.flatten
+    SFlux.fromIterable(validUpdates)
+      .concatMap[UpdateResult](entry => updateSingleMessage(entry._1, entry._2, storedEntries(entry._1), session))
+      .collectSeq()
+  }
+
+  private def updateSingleMessage(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[UpdateResult] = {
     val mailboxIds: MailboxIds = MailboxIds(storedMetaData.map(metaData => metaData.getComposedMessageId.getMailboxId))
     val originFlags: Flags = storedMetaData
       .foldLeft[Flags](new Flags())((flags: Flags, m: ComposedMessageIdWithMetaData) => {
@@ -246,7 +328,7 @@ class EmailSetMethod @Inject()(serializer: EmailSetSerializer,
   }
 
   private def updateFlags(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, originalFlags: Flags, session: MailboxSession): SMono[UpdateResult] = {
-    val newFlags = update.keywords
+    val newFlags = update.keywordsTransformation
       .apply(LENIENT_KEYWORDS_FACTORY.fromFlags(originalFlags).get)
       .asFlagsWithRecentAndDeletedFrom(originalFlags)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org