You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/11/03 02:25:02 UTC

[james-project] 06/11: [REFACTORING] Split EmailSetMethod and extract create/update/destroy

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 88f6d210a6f766c6bc0f3c3634372803fbbcc685
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Nov 2 10:35:04 2020 +0700

    [REFACTORING] Split EmailSetMethod and extract create/update/destroy
---
 .../jmap/method/EmailSetCreatePerformer.scala      | 102 ++++++
 .../jmap/method/EmailSetDeletePerformer.scala      |  99 ++++++
 .../apache/james/jmap/method/EmailSetMethod.scala  | 361 +--------------------
 .../jmap/method/EmailSetUpdatePerformer.scala      | 249 ++++++++++++++
 4 files changed, 463 insertions(+), 348 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetCreatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetCreatePerformer.scala
new file mode 100644
index 0000000..b7abebe
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetCreatePerformer.scala
@@ -0,0 +1,102 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.method
+
+import java.time.ZonedDateTime
+import java.util.Date
+
+import javax.inject.Inject
+import javax.mail.Flags
+import org.apache.james.jmap.core.SetError.SetErrorDescription
+import org.apache.james.jmap.core.{SetError, UTCDate}
+import org.apache.james.jmap.json.EmailSetSerializer
+import org.apache.james.jmap.mail.EmailSet.EmailCreationId
+import org.apache.james.jmap.mail.{EmailCreationRequest, EmailCreationResponse, EmailSetRequest}
+import org.apache.james.jmap.method.EmailSetCreatePerformer.{CreationFailure, CreationResult, CreationResults, CreationSuccess}
+import org.apache.james.mailbox.MessageManager.AppendCommand
+import org.apache.james.mailbox.exception.MailboxNotFoundException
+import org.apache.james.mailbox.model.MailboxId
+import org.apache.james.mailbox.{MailboxManager, MailboxSession}
+import reactor.core.scala.publisher.{SFlux, SMono}
+import reactor.core.scheduler.Schedulers
+
+object EmailSetCreatePerformer {
+  case class CreationResults(results: Seq[CreationResult]) {
+    def created: Option[Map[EmailCreationId, EmailCreationResponse]] =
+      Option(results.flatMap{
+        case result: CreationSuccess => Some((result.clientId, result.response))
+        case _ => None
+      }.toMap)
+        .filter(_.nonEmpty)
+
+    def notCreated: Option[Map[EmailCreationId, SetError]] = {
+      Option(results.flatMap{
+        case failure: CreationFailure => Some((failure.clientId, failure.asMessageSetError))
+        case _ => None
+      }
+        .toMap)
+        .filter(_.nonEmpty)
+    }
+  }
+  trait CreationResult
+  case class CreationSuccess(clientId: EmailCreationId, response: EmailCreationResponse) extends CreationResult
+  case class CreationFailure(clientId: EmailCreationId, e: Throwable) extends CreationResult {
+    def asMessageSetError: SetError = e match {
+      case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(e.getMessage))
+      case e: MailboxNotFoundException => SetError.notFound(SetErrorDescription("Mailbox " + e.getMessage))
+      case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
+    }
+  }
+}
+
+class EmailSetCreatePerformer @Inject()(serializer: EmailSetSerializer,
+                                        mailboxManager: MailboxManager) {
+
+  def create(request: EmailSetRequest, mailboxSession: MailboxSession): SMono[CreationResults] =
+    SFlux.fromIterable(request.create.getOrElse(Map()))
+      .concatMap {
+        case (clientId, json) => serializer.deserializeCreationRequest(json)
+          .fold(e => SMono.just[CreationResult](CreationFailure(clientId, new IllegalArgumentException(e.toString))),
+            creationRequest => create(clientId, creationRequest, mailboxSession))
+      }.collectSeq()
+      .map(CreationResults)
+
+  private def create(clientId: EmailCreationId, request: EmailCreationRequest, mailboxSession: MailboxSession): SMono[CreationResult] = {
+    val mailboxIds: List[MailboxId] = request.mailboxIds.value
+    if (mailboxIds.size != 1) {
+      SMono.just(CreationFailure(clientId, new IllegalArgumentException("mailboxIds need to have size 1")))
+    } else {
+      request.toMime4JMessage
+        .fold(e => SMono.just(CreationFailure(clientId, e)),
+          message => SMono.fromCallable[CreationResult](() => {
+            val appendResult = mailboxManager.getMailbox(mailboxIds.head, mailboxSession)
+              .appendMessage(AppendCommand.builder()
+                .recent()
+                .withFlags(request.keywords.map(_.asFlags).getOrElse(new Flags()))
+                .withInternalDate(Date.from(request.receivedAt.getOrElse(UTCDate(ZonedDateTime.now())).asUTC.toInstant))
+                .build(message),
+                mailboxSession)
+            CreationSuccess(clientId, EmailCreationResponse(appendResult.getId.getMessageId))
+          })
+            .subscribeOn(Schedulers.elastic())
+            .onErrorResume(e => SMono.just[CreationResult](CreationFailure(clientId, e))))
+    }
+  }
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
new file mode 100644
index 0000000..da56ab5
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
@@ -0,0 +1,99 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.method
+
+import javax.inject.Inject
+import org.apache.james.jmap.core.SetError
+import org.apache.james.jmap.core.SetError.SetErrorDescription
+import org.apache.james.jmap.mail.EmailSet.UnparsedMessageId
+import org.apache.james.jmap.mail.{DestroyIds, EmailSet, EmailSetRequest}
+import org.apache.james.jmap.method.EmailSetDeletePerformer.{DestroyFailure, DestroyResult, DestroyResults}
+import org.apache.james.mailbox.model.{DeleteResult, MessageId}
+import org.apache.james.mailbox.{MailboxSession, MessageIdManager}
+import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
+
+import scala.jdk.CollectionConverters._
+
+object EmailSetDeletePerformer {
+  case class DestroyResults(results: Seq[DestroyResult]) {
+    def destroyed: Option[DestroyIds] =
+      Option(results.flatMap{
+        case result: DestroySuccess => Some(result.messageId)
+        case _ => None
+      }.map(EmailSet.asUnparsed))
+        .filter(_.nonEmpty)
+        .map(DestroyIds)
+
+    def notDestroyed: Option[Map[UnparsedMessageId, SetError]] =
+      Option(results.flatMap{
+        case failure: DestroyFailure => Some((failure.unparsedMessageId, failure.asMessageSetError))
+        case _ => None
+      }.toMap)
+        .filter(_.nonEmpty)
+  }
+  object DestroyResult {
+    def from(deleteResult: DeleteResult): Seq[DestroyResult] = {
+      val success: Seq[DestroySuccess] = deleteResult.getDestroyed.asScala.toSeq
+        .map(DestroySuccess)
+      val notFound: Seq[DestroyResult] = deleteResult.getNotFound.asScala.toSeq
+        .map(id => DestroyFailure(EmailSet.asUnparsed(id), MessageNotFoundExeception(id)))
+
+      success ++ notFound
+    }
+  }
+  trait DestroyResult
+  case class DestroySuccess(messageId: MessageId) extends DestroyResult
+  case class DestroyFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends DestroyResult {
+    def asMessageSetError: SetError = e match {
+      case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(s"$unparsedMessageId is not a messageId: ${e.getMessage}"))
+      case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}"))
+      case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
+    }
+  }
+}
+
+class EmailSetDeletePerformer @Inject()(messageIdManager: MessageIdManager,
+                                        messageIdFactory: MessageId.Factory) {
+  def destroy(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[DestroyResults] = {
+    if (emailSetRequest.destroy.isDefined) {
+      val messageIdsValidation: Seq[Either[DestroyFailure, MessageId]] = emailSetRequest.destroy.get.value
+        .map(unparsedId => EmailSet.parse(messageIdFactory)(unparsedId).toEither
+          .left.map(e => DestroyFailure(unparsedId, e)))
+      val messageIds: Seq[MessageId] = messageIdsValidation.flatMap {
+        case Right(messageId) => Some(messageId)
+        case _ => None
+      }
+      val parsingErrors: Seq[DestroyFailure] = messageIdsValidation.flatMap {
+        case Left(e) => Some(e)
+        case _ => None
+      }
+
+      SMono.fromCallable(() => messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
+        .map(DestroyResult.from)
+        .subscribeOn(Schedulers.elastic())
+        .onErrorResume(e => SMono.just(messageIds.map(id => DestroyFailure(EmailSet.asUnparsed(id), e))))
+        .map(_ ++ parsingErrors)
+        .map(DestroyResults)
+    } else {
+      SMono.just(DestroyResults(Seq()))
+    }
+  }
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
index 5a2dd36..1bcc308 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
@@ -18,142 +18,36 @@
  ****************************************************************/
 package org.apache.james.jmap.method
 
-import java.time.ZonedDateTime
-import java.util.Date
-import java.util.function.Consumer
-
-import com.google.common.collect.ImmutableList
 import eu.timepit.refined.auto._
 import javax.inject.Inject
-import javax.mail.Flags
 import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier, JMAP_CORE, JMAP_MAIL}
 import org.apache.james.jmap.core.Invocation.{Arguments, MethodName}
-import org.apache.james.jmap.core.SetError.SetErrorDescription
-import org.apache.james.jmap.core.{ClientId, Id, Invocation, ServerId, SetError, State, UTCDate}
+import org.apache.james.jmap.core.{ClientId, Id, Invocation, ServerId, State}
 import org.apache.james.jmap.json.{EmailSetSerializer, ResponseSerializer}
-import org.apache.james.jmap.mail.EmailSet.{EmailCreationId, UnparsedMessageId}
-import org.apache.james.jmap.mail.KeywordsFactory.LENIENT_KEYWORDS_FACTORY
-import org.apache.james.jmap.mail.{DestroyIds, EmailCreationRequest, EmailCreationResponse, EmailSet, EmailSetRequest, EmailSetResponse, MailboxIds, ValidatedEmailSetUpdate}
+import org.apache.james.jmap.mail.{EmailSetRequest, EmailSetResponse}
 import org.apache.james.jmap.routes.SessionSupplier
-import org.apache.james.mailbox.MessageManager.{AppendCommand, FlagsUpdateMode}
-import org.apache.james.mailbox.exception.MailboxNotFoundException
-import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, DeleteResult, MailboxId, MessageId, MessageRange}
-import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageIdManager, MessageManager}
+import org.apache.james.mailbox.MailboxSession
+import org.apache.james.mailbox.model.MessageId
 import org.apache.james.metrics.api.MetricFactory
-import play.api.libs.json.{JsError, JsObject, JsSuccess}
-import reactor.core.scala.publisher.{SFlux, SMono}
-import reactor.core.scheduler.Schedulers
-
-import scala.jdk.CollectionConverters._
+import play.api.libs.json.{JsError, JsSuccess}
+import reactor.core.scala.publisher.SMono
 
 case class MessageNotFoundExeception(messageId: MessageId) extends Exception
 
 class EmailSetMethod @Inject()(serializer: EmailSetSerializer,
-                               messageIdManager: MessageIdManager,
-                               mailboxManager: MailboxManager,
-                               messageIdFactory: MessageId.Factory,
                                val metricFactory: MetricFactory,
-                               val sessionSupplier: SessionSupplier) extends MethodRequiringAccountId[EmailSetRequest] {
-
-  case class DestroyResults(results: Seq[DestroyResult]) {
-    def destroyed: Option[DestroyIds] =
-      Option(results.flatMap{
-        case result: DestroySuccess => Some(result.messageId)
-        case _ => None
-      }.map(EmailSet.asUnparsed))
-        .filter(_.nonEmpty)
-        .map(DestroyIds)
-
-    def notDestroyed: Option[Map[UnparsedMessageId, SetError]] =
-      Option(results.flatMap{
-        case failure: DestroyFailure => Some((failure.unparsedMessageId, failure.asMessageSetError))
-        case _ => None
-      }.toMap)
-        .filter(_.nonEmpty)
-  }
-
-  object DestroyResult {
-    def from(deleteResult: DeleteResult): Seq[DestroyResult] = {
-      val success: Seq[DestroySuccess] = deleteResult.getDestroyed.asScala.toSeq
-        .map(DestroySuccess)
-      val notFound: Seq[DestroyResult] = deleteResult.getNotFound.asScala.toSeq
-        .map(id => DestroyFailure(EmailSet.asUnparsed(id), MessageNotFoundExeception(id)))
-
-      success ++ notFound
-    }
-  }
-
-  trait DestroyResult
-  case class DestroySuccess(messageId: MessageId) extends DestroyResult
-  case class DestroyFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends DestroyResult {
-    def asMessageSetError: SetError = e match {
-      case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(s"$unparsedMessageId is not a messageId: ${e.getMessage}"))
-      case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}"))
-      case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
-    }
-  }
-
-  case class CreationResults(results: Seq[CreationResult]) {
-    def created: Option[Map[EmailCreationId, EmailCreationResponse]] =
-      Option(results.flatMap{
-        case result: CreationSuccess => Some((result.clientId, result.response))
-        case _ => None
-      }.toMap)
-        .filter(_.nonEmpty)
-
-    def notCreated: Option[Map[EmailCreationId, SetError]] = {
-      Option(results.flatMap{
-        case failure: CreationFailure => Some((failure.clientId, failure.asMessageSetError))
-        case _ => None
-      }
-        .toMap)
-        .filter(_.nonEmpty)
-    }
-  }
-  trait CreationResult
-  case class CreationSuccess(clientId: EmailCreationId, response: EmailCreationResponse) extends CreationResult
-  case class CreationFailure(clientId: EmailCreationId, e: Throwable) extends CreationResult {
-    def asMessageSetError: SetError = e match {
-      case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(e.getMessage))
-      case e: MailboxNotFoundException => SetError.notFound(SetErrorDescription("Mailbox " + e.getMessage))
-      case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
-    }
-  }
-
-  trait UpdateResult
-  case class UpdateSuccess(messageId: MessageId) extends UpdateResult
-  case class UpdateFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends UpdateResult {
-    def asMessageSetError: SetError = e match {
-      case e: IllegalArgumentException => SetError.invalidPatch(SetErrorDescription(s"Message $unparsedMessageId update is invalid: ${e.getMessage}"))
-      case _: MailboxNotFoundException => SetError.notFound(SetErrorDescription(s"Mailbox not found"))
-      case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}"))
-      case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
-    }
-  }
-  case class UpdateResults(results: Seq[UpdateResult]) {
-    def updated: Option[Map[MessageId, Unit]] =
-      Option(results.flatMap{
-        case result: UpdateSuccess => Some(result.messageId, ())
-        case _ => None
-      }.toMap)
-        .filter(_.nonEmpty)
-
-    def notUpdated: Option[Map[UnparsedMessageId, SetError]] =
-      Option(results.flatMap{
-        case failure: UpdateFailure => Some((failure.unparsedMessageId, failure.asMessageSetError))
-        case _ => None
-      }.toMap)
-        .filter(_.nonEmpty)
-  }
-
+                               val sessionSupplier: SessionSupplier,
+                               createPerformer: EmailSetCreatePerformer,
+                               deletePerformer: EmailSetDeletePerformer,
+                               updatePerformer: EmailSetUpdatePerformer) extends MethodRequiringAccountId[EmailSetRequest] {
   override val methodName: MethodName = MethodName("Email/set")
   override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE, JMAP_MAIL)
 
   override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession, request: EmailSetRequest): SMono[InvocationWithContext] = {
     for {
-      destroyResults <- destroy(request, mailboxSession)
-      updateResults <- update(request, mailboxSession)
-      created <- create(request, mailboxSession)
+      destroyResults <- deletePerformer.destroy(request, mailboxSession)
+      updateResults <- updatePerformer.update(request, mailboxSession)
+      created <- createPerformer.create(request, mailboxSession)
     } yield InvocationWithContext(
       invocation = Invocation(
         methodName = invocation.invocation.methodName,
@@ -183,233 +77,4 @@ class EmailSetMethod @Inject()(serializer: EmailSetSerializer,
       case JsSuccess(emailSetRequest, _) => SMono.just(emailSetRequest)
       case errors: JsError => SMono.raiseError(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
     }
-
-  private def destroy(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[DestroyResults] = {
-    if (emailSetRequest.destroy.isDefined) {
-      val messageIdsValidation: Seq[Either[DestroyFailure, MessageId]] = emailSetRequest.destroy.get.value
-        .map(unparsedId => EmailSet.parse(messageIdFactory)(unparsedId).toEither
-          .left.map(e => DestroyFailure(unparsedId, e)))
-      val messageIds: Seq[MessageId] = messageIdsValidation.flatMap {
-        case Right(messageId) => Some(messageId)
-        case _ => None
-      }
-      val parsingErrors: Seq[DestroyFailure] = messageIdsValidation.flatMap {
-        case Left(e) => Some(e)
-        case _ => None
-      }
-
-      SMono.fromCallable(() => messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
-        .map(DestroyResult.from)
-        .subscribeOn(Schedulers.elastic())
-        .onErrorResume(e => SMono.just(messageIds.map(id => DestroyFailure(EmailSet.asUnparsed(id), e))))
-        .map(_ ++ parsingErrors)
-        .map(DestroyResults)
-    } else {
-      SMono.just(DestroyResults(Seq()))
-    }
-  }
-
-  private def create(request: EmailSetRequest, mailboxSession: MailboxSession): SMono[CreationResults] =
-    SFlux.fromIterable(request.create.getOrElse(Map()))
-      .concatMap {
-        case (clientId, json) => serializer.deserializeCreationRequest(json)
-          .fold(e => SMono.just[CreationResult](CreationFailure(clientId, new IllegalArgumentException(e.toString))),
-            creationRequest => create(clientId, creationRequest, mailboxSession))
-      }.collectSeq()
-      .map(CreationResults)
-
-  private def create(clientId: EmailCreationId, request: EmailCreationRequest, mailboxSession: MailboxSession): SMono[CreationResult] = {
-    val mailboxIds: List[MailboxId] = request.mailboxIds.value
-    if (mailboxIds.size != 1) {
-      SMono.just(CreationFailure(clientId, new IllegalArgumentException("mailboxIds need to have size 1")))
-    } else {
-      request.toMime4JMessage
-        .fold(e => SMono.just(CreationFailure(clientId, e)),
-          message => SMono.fromCallable[CreationResult](() => {
-            val appendResult = mailboxManager.getMailbox(mailboxIds.head, mailboxSession)
-              .appendMessage(AppendCommand.builder()
-                .recent()
-                .withFlags(request.keywords.map(_.asFlags).getOrElse(new Flags()))
-                .withInternalDate(Date.from(request.receivedAt.getOrElse(UTCDate(ZonedDateTime.now())).asUTC.toInstant))
-                .build(message),
-                mailboxSession)
-            CreationSuccess(clientId, EmailCreationResponse(appendResult.getId.getMessageId))
-          })
-            .subscribeOn(Schedulers.elastic())
-            .onErrorResume(e => SMono.just[CreationResult](CreationFailure(clientId, e))))
-    }
-  }
-
-  private def update(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[UpdateResults] = {
-    emailSetRequest.update
-      .filter(_.nonEmpty)
-      .map(update(_, mailboxSession))
-      .getOrElse(SMono.just(UpdateResults(Seq())))
-  }
-
-  private def update(updates: Map[UnparsedMessageId, JsObject], session: MailboxSession): SMono[UpdateResults] = {
-    val validatedUpdates: List[Either[UpdateFailure, (MessageId, ValidatedEmailSetUpdate)]] = updates
-      .map({
-        case (unparsedMessageId, json) => EmailSet.parse(messageIdFactory)(unparsedMessageId)
-          .toEither
-          .left.map(e => UpdateFailure(unparsedMessageId, e))
-          .flatMap(id => serializer.deserializeEmailSetUpdate(json)
-            .asEither.left.map(e => new IllegalArgumentException(e.toString))
-            .flatMap(_.validate)
-            .fold(e => Left(UpdateFailure(unparsedMessageId, e)),
-              emailSetUpdate => Right((id, emailSetUpdate))))
-      })
-      .toList
-    val failures: List[UpdateFailure] = validatedUpdates.flatMap({
-      case Left(e) => Some(e)
-      case _ => None
-    })
-    val validUpdates: List[(MessageId, ValidatedEmailSetUpdate)] = validatedUpdates.flatMap({
-      case Right(pair) => Some(pair)
-      case _ => None
-    })
-
-    for {
-      updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session))
-        .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId)
-        .flatMap(metaData => {
-          doUpdate(validUpdates, metaData, session)
-        })
-    } yield {
-      UpdateResults(updates ++ failures)
-    }
-  }
-
-  private def doUpdate(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
-                       metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
-                       session: MailboxSession): SMono[Seq[UpdateResult]] = {
-    val sameUpdate: Boolean = validUpdates.map(_._2).distinctBy(_.update).size == 1
-    val singleMailbox: Boolean = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).toSet.size == 1
-
-    if (sameUpdate && singleMailbox && validUpdates.size > 3) {
-      val update: ValidatedEmailSetUpdate = validUpdates.map(_._2).headOption.get
-      val ranges: List[MessageRange] = asRanges(metaData)
-      val mailboxId: MailboxId = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).headOption.get
-
-      if (update.update.isOnlyFlagAddition) {
-        updateFlagsByRange(mailboxId, update.update.keywordsToAdd.get.asFlags, ranges, metaData, FlagsUpdateMode.ADD, session)
-      } else if (update.update.isOnlyFlagRemoval) {
-        updateFlagsByRange(mailboxId, update.update.keywordsToRemove.get.asFlags, ranges, metaData, FlagsUpdateMode.REMOVE, session)
-      } else if (update.update.isOnlyMove) {
-        moveByRange(mailboxId, update, ranges, metaData, session)
-      } else {
-        updateEachMessage(validUpdates, metaData, session)
-      }
-    } else {
-      updateEachMessage(validUpdates, metaData, session)
-    }
-  }
-
-  private def asRanges(metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]]) =
-    MessageRange.toRanges(metaData.values
-      .flatten.map(_.getComposedMessageId.getUid)
-      .toList.asJava)
-      .asScala.toList
-
-  private def updateFlagsByRange(mailboxId: MailboxId,
-                                 flags: Flags,
-                                 ranges: List[MessageRange],
-                                 metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
-                                 updateMode: FlagsUpdateMode,
-                                 session: MailboxSession): SMono[Seq[UpdateResult]] = {
-    val mailboxMono: SMono[MessageManager] = SMono.fromCallable(() => mailboxManager.getMailbox(mailboxId, session))
-
-    mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData,
-      range => mailbox.setFlags(flags, updateMode, range, session)))
-      .subscribeOn(Schedulers.elastic())
-  }
-
-  private def moveByRange(mailboxId: MailboxId,
-                          update: ValidatedEmailSetUpdate,
-                          ranges: List[MessageRange],
-                          metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
-                          session: MailboxSession): SMono[Seq[UpdateResult]] = {
-    val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get
-
-    updateByRange(ranges, metaData,
-      range => mailboxManager.moveMessages(range, mailboxId, targetId, session))
-  }
-
-  private def updateByRange(ranges: List[MessageRange],
-                            metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
-                            operation: Consumer[MessageRange]): SMono[Seq[UpdateResult]] = {
-
-    SFlux.fromIterable(ranges)
-      .concatMap(range => {
-        val messageIds = metaData.filter(entry => entry._2.exists(composedId => range.includes(composedId.getComposedMessageId.getUid)))
-          .keys
-          .toSeq
-        SMono.fromCallable[Seq[UpdateResult]](() => {
-          operation.accept(range)
-          messageIds.map(UpdateSuccess)
-        })
-          .onErrorResume(e => SMono.just(messageIds.map(id => UpdateFailure(EmailSet.asUnparsed(id), e))))
-          .subscribeOn(Schedulers.elastic())
-      })
-      .reduce(Seq(), _ ++ _)
-  }
-
-  private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
-                                metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
-                                session: MailboxSession): SMono[Seq[UpdateResult]] =
-    SFlux.fromIterable(validUpdates)
-      .concatMap[UpdateResult]({
-        case (messageId, updatePatch) =>
-          updateSingleMessage(messageId, updatePatch, metaData.get(messageId).toList.flatten, session)
-      })
-      .collectSeq()
-
-  private def updateSingleMessage(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[UpdateResult] = {
-    val mailboxIds: MailboxIds = MailboxIds(storedMetaData.map(metaData => metaData.getComposedMessageId.getMailboxId))
-    val originFlags: Flags = storedMetaData
-      .foldLeft[Flags](new Flags())((flags: Flags, m: ComposedMessageIdWithMetaData) => {
-        flags.add(m.getFlags)
-        flags
-      })
-
-    if (mailboxIds.value.isEmpty) {
-      SMono.just[UpdateResult](UpdateFailure(EmailSet.asUnparsed(messageId), MessageNotFoundExeception(messageId)))
-    } else {
-      updateFlags(messageId, update, mailboxIds, originFlags, session)
-        .flatMap {
-          case failure: UpdateFailure => SMono.just[UpdateResult](failure)
-          case _: UpdateSuccess => updateMailboxIds(messageId, update, mailboxIds, session)
-        }
-        .onErrorResume(e => SMono.just[UpdateResult](UpdateFailure(EmailSet.asUnparsed(messageId), e)))
-        .switchIfEmpty(SMono.just[UpdateResult](UpdateSuccess(messageId)))
-    }
-  }
-
-  private def updateMailboxIds(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, session: MailboxSession): SMono[UpdateResult] = {
-    val targetIds = update.mailboxIdsTransformation.apply(mailboxIds)
-    if (targetIds.equals(mailboxIds)) {
-      SMono.just[UpdateResult](UpdateSuccess(messageId))
-    } else {
-      SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, targetIds.value.asJava, session))
-        .subscribeOn(Schedulers.elastic())
-        .`then`(SMono.just[UpdateResult](UpdateSuccess(messageId)))
-        .onErrorResume(e => SMono.just[UpdateResult](UpdateFailure(EmailSet.asUnparsed(messageId), e)))
-        .switchIfEmpty(SMono.just[UpdateResult](UpdateSuccess(messageId)))
-    }
-  }
-
-  private def updateFlags(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, originalFlags: Flags, session: MailboxSession): SMono[UpdateResult] = {
-    val newFlags = update.keywordsTransformation
-      .apply(LENIENT_KEYWORDS_FACTORY.fromFlags(originalFlags).get)
-      .asFlagsWithRecentAndDeletedFrom(originalFlags)
-
-    if (newFlags.equals(originalFlags)) {
-      SMono.just[UpdateResult](UpdateSuccess(messageId))
-    } else {
-      SMono.fromCallable(() =>
-        messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
-        .subscribeOn(Schedulers.elastic())
-        .`then`(SMono.just[UpdateResult](UpdateSuccess(messageId)))
-    }
-  }
 }
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
new file mode 100644
index 0000000..662435d
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -0,0 +1,249 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.method
+
+import java.util.function.Consumer
+
+import com.google.common.collect.ImmutableList
+import javax.inject.Inject
+import javax.mail.Flags
+import org.apache.james.jmap.core.SetError
+import org.apache.james.jmap.core.SetError.SetErrorDescription
+import org.apache.james.jmap.json.EmailSetSerializer
+import org.apache.james.jmap.mail.EmailSet.UnparsedMessageId
+import org.apache.james.jmap.mail.KeywordsFactory.LENIENT_KEYWORDS_FACTORY
+import org.apache.james.jmap.mail.{EmailSet, EmailSetRequest, MailboxIds, ValidatedEmailSetUpdate}
+import org.apache.james.jmap.method.EmailSetUpdatePerformer.{EmailUpdateFailure, EmailUpdateResult, EmailUpdateResults, EmailUpdateSuccess}
+import org.apache.james.mailbox.MessageManager.FlagsUpdateMode
+import org.apache.james.mailbox.exception.MailboxNotFoundException
+import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, MailboxId, MessageId, MessageRange}
+import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageIdManager, MessageManager}
+import play.api.libs.json.JsObject
+import reactor.core.scala.publisher.{SFlux, SMono}
+import reactor.core.scheduler.Schedulers
+
+import scala.jdk.CollectionConverters._
+
+object EmailSetUpdatePerformer {
+  trait EmailUpdateResult
+  case class EmailUpdateSuccess(messageId: MessageId) extends EmailUpdateResult
+  case class EmailUpdateFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends EmailUpdateResult {
+    def asMessageSetError: SetError = e match {
+      case e: IllegalArgumentException => SetError.invalidPatch(SetErrorDescription(s"Message $unparsedMessageId update is invalid: ${e.getMessage}"))
+      case _: MailboxNotFoundException => SetError.notFound(SetErrorDescription(s"Mailbox not found"))
+      case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}"))
+      case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
+    }
+  }
+  case class EmailUpdateResults(results: Seq[EmailUpdateResult]) {
+    def updated: Option[Map[MessageId, Unit]] =
+      Option(results.flatMap{
+        case result: EmailUpdateSuccess => Some(result.messageId, ())
+        case _ => None
+      }.toMap)
+        .filter(_.nonEmpty)
+
+    def notUpdated: Option[Map[UnparsedMessageId, SetError]] =
+      Option(results.flatMap{
+        case failure: EmailUpdateFailure => Some((failure.unparsedMessageId, failure.asMessageSetError))
+        case _ => None
+      }.toMap)
+        .filter(_.nonEmpty)
+  }
+}
+
+class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
+                              messageIdManager: MessageIdManager,
+                              mailboxManager: MailboxManager,
+                              messageIdFactory: MessageId.Factory) {
+
+  def update(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[EmailUpdateResults] = {
+    emailSetRequest.update
+      .filter(_.nonEmpty)
+      .map(update(_, mailboxSession))
+      .getOrElse(SMono.just(EmailUpdateResults(Seq())))
+  }
+
+  private def update(updates: Map[UnparsedMessageId, JsObject], session: MailboxSession): SMono[EmailUpdateResults] = {
+    val validatedUpdates: List[Either[EmailUpdateFailure, (MessageId, ValidatedEmailSetUpdate)]] = updates
+      .map({
+        case (unparsedMessageId, json) => EmailSet.parse(messageIdFactory)(unparsedMessageId)
+          .toEither
+          .left.map(e => EmailUpdateFailure(unparsedMessageId, e))
+          .flatMap(id => serializer.deserializeEmailSetUpdate(json)
+            .asEither.left.map(e => new IllegalArgumentException(e.toString))
+            .flatMap(_.validate)
+            .fold(e => Left(EmailUpdateFailure(unparsedMessageId, e)),
+              emailSetUpdate => Right((id, emailSetUpdate))))
+      })
+      .toList
+    val failures: List[EmailUpdateFailure] = validatedUpdates.flatMap({
+      case Left(e) => Some(e)
+      case _ => None
+    })
+    val validUpdates: List[(MessageId, ValidatedEmailSetUpdate)] = validatedUpdates.flatMap({
+      case Right(pair) => Some(pair)
+      case _ => None
+    })
+
+    for {
+      updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session))
+        .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId)
+        .flatMap(metaData => {
+          doUpdate(validUpdates, metaData, session)
+        })
+    } yield {
+      EmailUpdateResults(updates ++ failures)
+    }
+  }
+
+  private def doUpdate(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
+                       metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                       session: MailboxSession): SMono[Seq[EmailUpdateResult]] = {
+    val sameUpdate: Boolean = validUpdates.map(_._2).distinctBy(_.update).size == 1
+    val singleMailbox: Boolean = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).toSet.size == 1
+
+    if (sameUpdate && singleMailbox && validUpdates.size > 3) {
+      val update: ValidatedEmailSetUpdate = validUpdates.map(_._2).headOption.get
+      val ranges: List[MessageRange] = asRanges(metaData)
+      val mailboxId: MailboxId = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).headOption.get
+
+      if (update.update.isOnlyFlagAddition) {
+        updateFlagsByRange(mailboxId, update.update.keywordsToAdd.get.asFlags, ranges, metaData, FlagsUpdateMode.ADD, session)
+      } else if (update.update.isOnlyFlagRemoval) {
+        updateFlagsByRange(mailboxId, update.update.keywordsToRemove.get.asFlags, ranges, metaData, FlagsUpdateMode.REMOVE, session)
+      } else if (update.update.isOnlyMove) {
+        moveByRange(mailboxId, update, ranges, metaData, session)
+      } else {
+        updateEachMessage(validUpdates, metaData, session)
+      }
+    } else {
+      updateEachMessage(validUpdates, metaData, session)
+    }
+  }
+
+  private def asRanges(metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]]) =
+    MessageRange.toRanges(metaData.values
+      .flatten.map(_.getComposedMessageId.getUid)
+      .toList.asJava)
+      .asScala.toList
+
+  private def updateFlagsByRange(mailboxId: MailboxId,
+                                 flags: Flags,
+                                 ranges: List[MessageRange],
+                                 metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                                 updateMode: FlagsUpdateMode,
+                                 session: MailboxSession): SMono[Seq[EmailUpdateResult]] = {
+    val mailboxMono: SMono[MessageManager] = SMono.fromCallable(() => mailboxManager.getMailbox(mailboxId, session))
+
+    mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData,
+      range => mailbox.setFlags(flags, updateMode, range, session)))
+      .subscribeOn(Schedulers.elastic())
+  }
+
+  private def moveByRange(mailboxId: MailboxId,
+                          update: ValidatedEmailSetUpdate,
+                          ranges: List[MessageRange],
+                          metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                          session: MailboxSession): SMono[Seq[EmailUpdateResult]] = {
+    val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get
+
+    updateByRange(ranges, metaData,
+      range => mailboxManager.moveMessages(range, mailboxId, targetId, session))
+  }
+
+  private def updateByRange(ranges: List[MessageRange],
+                            metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                            operation: Consumer[MessageRange]): SMono[Seq[EmailUpdateResult]] = {
+
+    SFlux.fromIterable(ranges)
+      .concatMap(range => {
+        val messageIds = metaData.filter(entry => entry._2.exists(composedId => range.includes(composedId.getComposedMessageId.getUid)))
+          .keys
+          .toSeq
+        SMono.fromCallable[Seq[EmailUpdateResult]](() => {
+          operation.accept(range)
+          messageIds.map(EmailUpdateSuccess)
+        })
+          .onErrorResume(e => SMono.just(messageIds.map(id => EmailUpdateFailure(EmailSet.asUnparsed(id), e))))
+          .subscribeOn(Schedulers.elastic())
+      })
+      .reduce(Seq(), _ ++ _)
+  }
+
+  private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
+                                metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+                                session: MailboxSession): SMono[Seq[EmailUpdateResult]] =
+    SFlux.fromIterable(validUpdates)
+      .concatMap[EmailUpdateResult]({
+        case (messageId, updatePatch) =>
+          updateSingleMessage(messageId, updatePatch, metaData.get(messageId).toList.flatten, session)
+      })
+      .collectSeq()
+
+  private def updateSingleMessage(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[EmailUpdateResult] = {
+    val mailboxIds: MailboxIds = MailboxIds(storedMetaData.map(metaData => metaData.getComposedMessageId.getMailboxId))
+    val originFlags: Flags = storedMetaData
+      .foldLeft[Flags](new Flags())((flags: Flags, m: ComposedMessageIdWithMetaData) => {
+        flags.add(m.getFlags)
+        flags
+      })
+
+    if (mailboxIds.value.isEmpty) {
+      SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), MessageNotFoundExeception(messageId)))
+    } else {
+      updateFlags(messageId, update, mailboxIds, originFlags, session)
+        .flatMap {
+          case failure: EmailUpdateFailure => SMono.just[EmailUpdateResult](failure)
+          case _: EmailUpdateSuccess => updateMailboxIds(messageId, update, mailboxIds, session)
+        }
+        .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e)))
+        .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+    }
+  }
+
+  private def updateMailboxIds(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, session: MailboxSession): SMono[EmailUpdateResult] = {
+    val targetIds = update.mailboxIdsTransformation.apply(mailboxIds)
+    if (targetIds.equals(mailboxIds)) {
+      SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
+    } else {
+      SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, targetIds.value.asJava, session))
+        .subscribeOn(Schedulers.elastic())
+        .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+        .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e)))
+        .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+    }
+  }
+
+  private def updateFlags(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, originalFlags: Flags, session: MailboxSession): SMono[EmailUpdateResult] = {
+    val newFlags = update.keywordsTransformation
+      .apply(LENIENT_KEYWORDS_FACTORY.fromFlags(originalFlags).get)
+      .asFlagsWithRecentAndDeletedFrom(originalFlags)
+
+    if (newFlags.equals(originalFlags)) {
+      SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
+    } else {
+      SMono.fromCallable(() =>
+        messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
+        .subscribeOn(Schedulers.elastic())
+        .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org