You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/11/03 02:25:02 UTC
[james-project] 06/11: [REFACTORING] Split EmailSetMethod and
extract create/update/destroy
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 88f6d210a6f766c6bc0f3c3634372803fbbcc685
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Nov 2 10:35:04 2020 +0700
[REFACTORING] Split EmailSetMethod and extract create/update/destroy
---
.../jmap/method/EmailSetCreatePerformer.scala | 102 ++++++
.../jmap/method/EmailSetDeletePerformer.scala | 99 ++++++
.../apache/james/jmap/method/EmailSetMethod.scala | 361 +--------------------
.../jmap/method/EmailSetUpdatePerformer.scala | 249 ++++++++++++++
4 files changed, 463 insertions(+), 348 deletions(-)
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetCreatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetCreatePerformer.scala
new file mode 100644
index 0000000..b7abebe
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetCreatePerformer.scala
@@ -0,0 +1,102 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.method
+
+import java.time.ZonedDateTime
+import java.util.Date
+
+import javax.inject.Inject
+import javax.mail.Flags
+import org.apache.james.jmap.core.SetError.SetErrorDescription
+import org.apache.james.jmap.core.{SetError, UTCDate}
+import org.apache.james.jmap.json.EmailSetSerializer
+import org.apache.james.jmap.mail.EmailSet.EmailCreationId
+import org.apache.james.jmap.mail.{EmailCreationRequest, EmailCreationResponse, EmailSetRequest}
+import org.apache.james.jmap.method.EmailSetCreatePerformer.{CreationFailure, CreationResult, CreationResults, CreationSuccess}
+import org.apache.james.mailbox.MessageManager.AppendCommand
+import org.apache.james.mailbox.exception.MailboxNotFoundException
+import org.apache.james.mailbox.model.MailboxId
+import org.apache.james.mailbox.{MailboxManager, MailboxSession}
+import reactor.core.scala.publisher.{SFlux, SMono}
+import reactor.core.scheduler.Schedulers
+
+object EmailSetCreatePerformer {
+ case class CreationResults(results: Seq[CreationResult]) {
+ def created: Option[Map[EmailCreationId, EmailCreationResponse]] =
+ Option(results.flatMap{
+ case result: CreationSuccess => Some((result.clientId, result.response))
+ case _ => None
+ }.toMap)
+ .filter(_.nonEmpty)
+
+ def notCreated: Option[Map[EmailCreationId, SetError]] = {
+ Option(results.flatMap{
+ case failure: CreationFailure => Some((failure.clientId, failure.asMessageSetError))
+ case _ => None
+ }
+ .toMap)
+ .filter(_.nonEmpty)
+ }
+ }
+ trait CreationResult
+ case class CreationSuccess(clientId: EmailCreationId, response: EmailCreationResponse) extends CreationResult
+ case class CreationFailure(clientId: EmailCreationId, e: Throwable) extends CreationResult {
+ def asMessageSetError: SetError = e match {
+ case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(e.getMessage))
+ case e: MailboxNotFoundException => SetError.notFound(SetErrorDescription("Mailbox " + e.getMessage))
+ case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
+ }
+ }
+}
+
+class EmailSetCreatePerformer @Inject()(serializer: EmailSetSerializer,
+ mailboxManager: MailboxManager) {
+
+ def create(request: EmailSetRequest, mailboxSession: MailboxSession): SMono[CreationResults] =
+ SFlux.fromIterable(request.create.getOrElse(Map()))
+ .concatMap {
+ case (clientId, json) => serializer.deserializeCreationRequest(json)
+ .fold(e => SMono.just[CreationResult](CreationFailure(clientId, new IllegalArgumentException(e.toString))),
+ creationRequest => create(clientId, creationRequest, mailboxSession))
+ }.collectSeq()
+ .map(CreationResults)
+
+ private def create(clientId: EmailCreationId, request: EmailCreationRequest, mailboxSession: MailboxSession): SMono[CreationResult] = {
+ val mailboxIds: List[MailboxId] = request.mailboxIds.value
+ if (mailboxIds.size != 1) {
+ SMono.just(CreationFailure(clientId, new IllegalArgumentException("mailboxIds need to have size 1")))
+ } else {
+ request.toMime4JMessage
+ .fold(e => SMono.just(CreationFailure(clientId, e)),
+ message => SMono.fromCallable[CreationResult](() => {
+ val appendResult = mailboxManager.getMailbox(mailboxIds.head, mailboxSession)
+ .appendMessage(AppendCommand.builder()
+ .recent()
+ .withFlags(request.keywords.map(_.asFlags).getOrElse(new Flags()))
+ .withInternalDate(Date.from(request.receivedAt.getOrElse(UTCDate(ZonedDateTime.now())).asUTC.toInstant))
+ .build(message),
+ mailboxSession)
+ CreationSuccess(clientId, EmailCreationResponse(appendResult.getId.getMessageId))
+ })
+ .subscribeOn(Schedulers.elastic())
+ .onErrorResume(e => SMono.just[CreationResult](CreationFailure(clientId, e))))
+ }
+ }
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
new file mode 100644
index 0000000..da56ab5
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
@@ -0,0 +1,99 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.method
+
+import javax.inject.Inject
+import org.apache.james.jmap.core.SetError
+import org.apache.james.jmap.core.SetError.SetErrorDescription
+import org.apache.james.jmap.mail.EmailSet.UnparsedMessageId
+import org.apache.james.jmap.mail.{DestroyIds, EmailSet, EmailSetRequest}
+import org.apache.james.jmap.method.EmailSetDeletePerformer.{DestroyFailure, DestroyResult, DestroyResults}
+import org.apache.james.mailbox.model.{DeleteResult, MessageId}
+import org.apache.james.mailbox.{MailboxSession, MessageIdManager}
+import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
+
+import scala.jdk.CollectionConverters._
+
+object EmailSetDeletePerformer {
+ case class DestroyResults(results: Seq[DestroyResult]) {
+ def destroyed: Option[DestroyIds] =
+ Option(results.flatMap{
+ case result: DestroySuccess => Some(result.messageId)
+ case _ => None
+ }.map(EmailSet.asUnparsed))
+ .filter(_.nonEmpty)
+ .map(DestroyIds)
+
+ def notDestroyed: Option[Map[UnparsedMessageId, SetError]] =
+ Option(results.flatMap{
+ case failure: DestroyFailure => Some((failure.unparsedMessageId, failure.asMessageSetError))
+ case _ => None
+ }.toMap)
+ .filter(_.nonEmpty)
+ }
+ object DestroyResult {
+ def from(deleteResult: DeleteResult): Seq[DestroyResult] = {
+ val success: Seq[DestroySuccess] = deleteResult.getDestroyed.asScala.toSeq
+ .map(DestroySuccess)
+ val notFound: Seq[DestroyResult] = deleteResult.getNotFound.asScala.toSeq
+ .map(id => DestroyFailure(EmailSet.asUnparsed(id), MessageNotFoundExeception(id)))
+
+ success ++ notFound
+ }
+ }
+ trait DestroyResult
+ case class DestroySuccess(messageId: MessageId) extends DestroyResult
+ case class DestroyFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends DestroyResult {
+ def asMessageSetError: SetError = e match {
+ case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(s"$unparsedMessageId is not a messageId: ${e.getMessage}"))
+ case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}"))
+ case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
+ }
+ }
+}
+
+class EmailSetDeletePerformer @Inject()(messageIdManager: MessageIdManager,
+ messageIdFactory: MessageId.Factory) {
+ def destroy(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[DestroyResults] = {
+ if (emailSetRequest.destroy.isDefined) {
+ val messageIdsValidation: Seq[Either[DestroyFailure, MessageId]] = emailSetRequest.destroy.get.value
+ .map(unparsedId => EmailSet.parse(messageIdFactory)(unparsedId).toEither
+ .left.map(e => DestroyFailure(unparsedId, e)))
+ val messageIds: Seq[MessageId] = messageIdsValidation.flatMap {
+ case Right(messageId) => Some(messageId)
+ case _ => None
+ }
+ val parsingErrors: Seq[DestroyFailure] = messageIdsValidation.flatMap {
+ case Left(e) => Some(e)
+ case _ => None
+ }
+
+ SMono.fromCallable(() => messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
+ .map(DestroyResult.from)
+ .subscribeOn(Schedulers.elastic())
+ .onErrorResume(e => SMono.just(messageIds.map(id => DestroyFailure(EmailSet.asUnparsed(id), e))))
+ .map(_ ++ parsingErrors)
+ .map(DestroyResults)
+ } else {
+ SMono.just(DestroyResults(Seq()))
+ }
+ }
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
index 5a2dd36..1bcc308 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
@@ -18,142 +18,36 @@
****************************************************************/
package org.apache.james.jmap.method
-import java.time.ZonedDateTime
-import java.util.Date
-import java.util.function.Consumer
-
-import com.google.common.collect.ImmutableList
import eu.timepit.refined.auto._
import javax.inject.Inject
-import javax.mail.Flags
import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier, JMAP_CORE, JMAP_MAIL}
import org.apache.james.jmap.core.Invocation.{Arguments, MethodName}
-import org.apache.james.jmap.core.SetError.SetErrorDescription
-import org.apache.james.jmap.core.{ClientId, Id, Invocation, ServerId, SetError, State, UTCDate}
+import org.apache.james.jmap.core.{ClientId, Id, Invocation, ServerId, State}
import org.apache.james.jmap.json.{EmailSetSerializer, ResponseSerializer}
-import org.apache.james.jmap.mail.EmailSet.{EmailCreationId, UnparsedMessageId}
-import org.apache.james.jmap.mail.KeywordsFactory.LENIENT_KEYWORDS_FACTORY
-import org.apache.james.jmap.mail.{DestroyIds, EmailCreationRequest, EmailCreationResponse, EmailSet, EmailSetRequest, EmailSetResponse, MailboxIds, ValidatedEmailSetUpdate}
+import org.apache.james.jmap.mail.{EmailSetRequest, EmailSetResponse}
import org.apache.james.jmap.routes.SessionSupplier
-import org.apache.james.mailbox.MessageManager.{AppendCommand, FlagsUpdateMode}
-import org.apache.james.mailbox.exception.MailboxNotFoundException
-import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, DeleteResult, MailboxId, MessageId, MessageRange}
-import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageIdManager, MessageManager}
+import org.apache.james.mailbox.MailboxSession
+import org.apache.james.mailbox.model.MessageId
import org.apache.james.metrics.api.MetricFactory
-import play.api.libs.json.{JsError, JsObject, JsSuccess}
-import reactor.core.scala.publisher.{SFlux, SMono}
-import reactor.core.scheduler.Schedulers
-
-import scala.jdk.CollectionConverters._
+import play.api.libs.json.{JsError, JsSuccess}
+import reactor.core.scala.publisher.SMono
case class MessageNotFoundExeception(messageId: MessageId) extends Exception
class EmailSetMethod @Inject()(serializer: EmailSetSerializer,
- messageIdManager: MessageIdManager,
- mailboxManager: MailboxManager,
- messageIdFactory: MessageId.Factory,
val metricFactory: MetricFactory,
- val sessionSupplier: SessionSupplier) extends MethodRequiringAccountId[EmailSetRequest] {
-
- case class DestroyResults(results: Seq[DestroyResult]) {
- def destroyed: Option[DestroyIds] =
- Option(results.flatMap{
- case result: DestroySuccess => Some(result.messageId)
- case _ => None
- }.map(EmailSet.asUnparsed))
- .filter(_.nonEmpty)
- .map(DestroyIds)
-
- def notDestroyed: Option[Map[UnparsedMessageId, SetError]] =
- Option(results.flatMap{
- case failure: DestroyFailure => Some((failure.unparsedMessageId, failure.asMessageSetError))
- case _ => None
- }.toMap)
- .filter(_.nonEmpty)
- }
-
- object DestroyResult {
- def from(deleteResult: DeleteResult): Seq[DestroyResult] = {
- val success: Seq[DestroySuccess] = deleteResult.getDestroyed.asScala.toSeq
- .map(DestroySuccess)
- val notFound: Seq[DestroyResult] = deleteResult.getNotFound.asScala.toSeq
- .map(id => DestroyFailure(EmailSet.asUnparsed(id), MessageNotFoundExeception(id)))
-
- success ++ notFound
- }
- }
-
- trait DestroyResult
- case class DestroySuccess(messageId: MessageId) extends DestroyResult
- case class DestroyFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends DestroyResult {
- def asMessageSetError: SetError = e match {
- case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(s"$unparsedMessageId is not a messageId: ${e.getMessage}"))
- case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}"))
- case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
- }
- }
-
- case class CreationResults(results: Seq[CreationResult]) {
- def created: Option[Map[EmailCreationId, EmailCreationResponse]] =
- Option(results.flatMap{
- case result: CreationSuccess => Some((result.clientId, result.response))
- case _ => None
- }.toMap)
- .filter(_.nonEmpty)
-
- def notCreated: Option[Map[EmailCreationId, SetError]] = {
- Option(results.flatMap{
- case failure: CreationFailure => Some((failure.clientId, failure.asMessageSetError))
- case _ => None
- }
- .toMap)
- .filter(_.nonEmpty)
- }
- }
- trait CreationResult
- case class CreationSuccess(clientId: EmailCreationId, response: EmailCreationResponse) extends CreationResult
- case class CreationFailure(clientId: EmailCreationId, e: Throwable) extends CreationResult {
- def asMessageSetError: SetError = e match {
- case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(e.getMessage))
- case e: MailboxNotFoundException => SetError.notFound(SetErrorDescription("Mailbox " + e.getMessage))
- case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
- }
- }
-
- trait UpdateResult
- case class UpdateSuccess(messageId: MessageId) extends UpdateResult
- case class UpdateFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends UpdateResult {
- def asMessageSetError: SetError = e match {
- case e: IllegalArgumentException => SetError.invalidPatch(SetErrorDescription(s"Message $unparsedMessageId update is invalid: ${e.getMessage}"))
- case _: MailboxNotFoundException => SetError.notFound(SetErrorDescription(s"Mailbox not found"))
- case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}"))
- case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
- }
- }
- case class UpdateResults(results: Seq[UpdateResult]) {
- def updated: Option[Map[MessageId, Unit]] =
- Option(results.flatMap{
- case result: UpdateSuccess => Some(result.messageId, ())
- case _ => None
- }.toMap)
- .filter(_.nonEmpty)
-
- def notUpdated: Option[Map[UnparsedMessageId, SetError]] =
- Option(results.flatMap{
- case failure: UpdateFailure => Some((failure.unparsedMessageId, failure.asMessageSetError))
- case _ => None
- }.toMap)
- .filter(_.nonEmpty)
- }
-
+ val sessionSupplier: SessionSupplier,
+ createPerformer: EmailSetCreatePerformer,
+ deletePerformer: EmailSetDeletePerformer,
+ updatePerformer: EmailSetUpdatePerformer) extends MethodRequiringAccountId[EmailSetRequest] {
override val methodName: MethodName = MethodName("Email/set")
override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE, JMAP_MAIL)
override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession, request: EmailSetRequest): SMono[InvocationWithContext] = {
for {
- destroyResults <- destroy(request, mailboxSession)
- updateResults <- update(request, mailboxSession)
- created <- create(request, mailboxSession)
+ destroyResults <- deletePerformer.destroy(request, mailboxSession)
+ updateResults <- updatePerformer.update(request, mailboxSession)
+ created <- createPerformer.create(request, mailboxSession)
} yield InvocationWithContext(
invocation = Invocation(
methodName = invocation.invocation.methodName,
@@ -183,233 +77,4 @@ class EmailSetMethod @Inject()(serializer: EmailSetSerializer,
case JsSuccess(emailSetRequest, _) => SMono.just(emailSetRequest)
case errors: JsError => SMono.raiseError(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
}
-
- private def destroy(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[DestroyResults] = {
- if (emailSetRequest.destroy.isDefined) {
- val messageIdsValidation: Seq[Either[DestroyFailure, MessageId]] = emailSetRequest.destroy.get.value
- .map(unparsedId => EmailSet.parse(messageIdFactory)(unparsedId).toEither
- .left.map(e => DestroyFailure(unparsedId, e)))
- val messageIds: Seq[MessageId] = messageIdsValidation.flatMap {
- case Right(messageId) => Some(messageId)
- case _ => None
- }
- val parsingErrors: Seq[DestroyFailure] = messageIdsValidation.flatMap {
- case Left(e) => Some(e)
- case _ => None
- }
-
- SMono.fromCallable(() => messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
- .map(DestroyResult.from)
- .subscribeOn(Schedulers.elastic())
- .onErrorResume(e => SMono.just(messageIds.map(id => DestroyFailure(EmailSet.asUnparsed(id), e))))
- .map(_ ++ parsingErrors)
- .map(DestroyResults)
- } else {
- SMono.just(DestroyResults(Seq()))
- }
- }
-
- private def create(request: EmailSetRequest, mailboxSession: MailboxSession): SMono[CreationResults] =
- SFlux.fromIterable(request.create.getOrElse(Map()))
- .concatMap {
- case (clientId, json) => serializer.deserializeCreationRequest(json)
- .fold(e => SMono.just[CreationResult](CreationFailure(clientId, new IllegalArgumentException(e.toString))),
- creationRequest => create(clientId, creationRequest, mailboxSession))
- }.collectSeq()
- .map(CreationResults)
-
- private def create(clientId: EmailCreationId, request: EmailCreationRequest, mailboxSession: MailboxSession): SMono[CreationResult] = {
- val mailboxIds: List[MailboxId] = request.mailboxIds.value
- if (mailboxIds.size != 1) {
- SMono.just(CreationFailure(clientId, new IllegalArgumentException("mailboxIds need to have size 1")))
- } else {
- request.toMime4JMessage
- .fold(e => SMono.just(CreationFailure(clientId, e)),
- message => SMono.fromCallable[CreationResult](() => {
- val appendResult = mailboxManager.getMailbox(mailboxIds.head, mailboxSession)
- .appendMessage(AppendCommand.builder()
- .recent()
- .withFlags(request.keywords.map(_.asFlags).getOrElse(new Flags()))
- .withInternalDate(Date.from(request.receivedAt.getOrElse(UTCDate(ZonedDateTime.now())).asUTC.toInstant))
- .build(message),
- mailboxSession)
- CreationSuccess(clientId, EmailCreationResponse(appendResult.getId.getMessageId))
- })
- .subscribeOn(Schedulers.elastic())
- .onErrorResume(e => SMono.just[CreationResult](CreationFailure(clientId, e))))
- }
- }
-
- private def update(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[UpdateResults] = {
- emailSetRequest.update
- .filter(_.nonEmpty)
- .map(update(_, mailboxSession))
- .getOrElse(SMono.just(UpdateResults(Seq())))
- }
-
- private def update(updates: Map[UnparsedMessageId, JsObject], session: MailboxSession): SMono[UpdateResults] = {
- val validatedUpdates: List[Either[UpdateFailure, (MessageId, ValidatedEmailSetUpdate)]] = updates
- .map({
- case (unparsedMessageId, json) => EmailSet.parse(messageIdFactory)(unparsedMessageId)
- .toEither
- .left.map(e => UpdateFailure(unparsedMessageId, e))
- .flatMap(id => serializer.deserializeEmailSetUpdate(json)
- .asEither.left.map(e => new IllegalArgumentException(e.toString))
- .flatMap(_.validate)
- .fold(e => Left(UpdateFailure(unparsedMessageId, e)),
- emailSetUpdate => Right((id, emailSetUpdate))))
- })
- .toList
- val failures: List[UpdateFailure] = validatedUpdates.flatMap({
- case Left(e) => Some(e)
- case _ => None
- })
- val validUpdates: List[(MessageId, ValidatedEmailSetUpdate)] = validatedUpdates.flatMap({
- case Right(pair) => Some(pair)
- case _ => None
- })
-
- for {
- updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session))
- .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId)
- .flatMap(metaData => {
- doUpdate(validUpdates, metaData, session)
- })
- } yield {
- UpdateResults(updates ++ failures)
- }
- }
-
- private def doUpdate(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
- metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
- session: MailboxSession): SMono[Seq[UpdateResult]] = {
- val sameUpdate: Boolean = validUpdates.map(_._2).distinctBy(_.update).size == 1
- val singleMailbox: Boolean = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).toSet.size == 1
-
- if (sameUpdate && singleMailbox && validUpdates.size > 3) {
- val update: ValidatedEmailSetUpdate = validUpdates.map(_._2).headOption.get
- val ranges: List[MessageRange] = asRanges(metaData)
- val mailboxId: MailboxId = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).headOption.get
-
- if (update.update.isOnlyFlagAddition) {
- updateFlagsByRange(mailboxId, update.update.keywordsToAdd.get.asFlags, ranges, metaData, FlagsUpdateMode.ADD, session)
- } else if (update.update.isOnlyFlagRemoval) {
- updateFlagsByRange(mailboxId, update.update.keywordsToRemove.get.asFlags, ranges, metaData, FlagsUpdateMode.REMOVE, session)
- } else if (update.update.isOnlyMove) {
- moveByRange(mailboxId, update, ranges, metaData, session)
- } else {
- updateEachMessage(validUpdates, metaData, session)
- }
- } else {
- updateEachMessage(validUpdates, metaData, session)
- }
- }
-
- private def asRanges(metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]]) =
- MessageRange.toRanges(metaData.values
- .flatten.map(_.getComposedMessageId.getUid)
- .toList.asJava)
- .asScala.toList
-
- private def updateFlagsByRange(mailboxId: MailboxId,
- flags: Flags,
- ranges: List[MessageRange],
- metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
- updateMode: FlagsUpdateMode,
- session: MailboxSession): SMono[Seq[UpdateResult]] = {
- val mailboxMono: SMono[MessageManager] = SMono.fromCallable(() => mailboxManager.getMailbox(mailboxId, session))
-
- mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData,
- range => mailbox.setFlags(flags, updateMode, range, session)))
- .subscribeOn(Schedulers.elastic())
- }
-
- private def moveByRange(mailboxId: MailboxId,
- update: ValidatedEmailSetUpdate,
- ranges: List[MessageRange],
- metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
- session: MailboxSession): SMono[Seq[UpdateResult]] = {
- val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get
-
- updateByRange(ranges, metaData,
- range => mailboxManager.moveMessages(range, mailboxId, targetId, session))
- }
-
- private def updateByRange(ranges: List[MessageRange],
- metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
- operation: Consumer[MessageRange]): SMono[Seq[UpdateResult]] = {
-
- SFlux.fromIterable(ranges)
- .concatMap(range => {
- val messageIds = metaData.filter(entry => entry._2.exists(composedId => range.includes(composedId.getComposedMessageId.getUid)))
- .keys
- .toSeq
- SMono.fromCallable[Seq[UpdateResult]](() => {
- operation.accept(range)
- messageIds.map(UpdateSuccess)
- })
- .onErrorResume(e => SMono.just(messageIds.map(id => UpdateFailure(EmailSet.asUnparsed(id), e))))
- .subscribeOn(Schedulers.elastic())
- })
- .reduce(Seq(), _ ++ _)
- }
-
- private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
- metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
- session: MailboxSession): SMono[Seq[UpdateResult]] =
- SFlux.fromIterable(validUpdates)
- .concatMap[UpdateResult]({
- case (messageId, updatePatch) =>
- updateSingleMessage(messageId, updatePatch, metaData.get(messageId).toList.flatten, session)
- })
- .collectSeq()
-
- private def updateSingleMessage(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[UpdateResult] = {
- val mailboxIds: MailboxIds = MailboxIds(storedMetaData.map(metaData => metaData.getComposedMessageId.getMailboxId))
- val originFlags: Flags = storedMetaData
- .foldLeft[Flags](new Flags())((flags: Flags, m: ComposedMessageIdWithMetaData) => {
- flags.add(m.getFlags)
- flags
- })
-
- if (mailboxIds.value.isEmpty) {
- SMono.just[UpdateResult](UpdateFailure(EmailSet.asUnparsed(messageId), MessageNotFoundExeception(messageId)))
- } else {
- updateFlags(messageId, update, mailboxIds, originFlags, session)
- .flatMap {
- case failure: UpdateFailure => SMono.just[UpdateResult](failure)
- case _: UpdateSuccess => updateMailboxIds(messageId, update, mailboxIds, session)
- }
- .onErrorResume(e => SMono.just[UpdateResult](UpdateFailure(EmailSet.asUnparsed(messageId), e)))
- .switchIfEmpty(SMono.just[UpdateResult](UpdateSuccess(messageId)))
- }
- }
-
- private def updateMailboxIds(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, session: MailboxSession): SMono[UpdateResult] = {
- val targetIds = update.mailboxIdsTransformation.apply(mailboxIds)
- if (targetIds.equals(mailboxIds)) {
- SMono.just[UpdateResult](UpdateSuccess(messageId))
- } else {
- SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, targetIds.value.asJava, session))
- .subscribeOn(Schedulers.elastic())
- .`then`(SMono.just[UpdateResult](UpdateSuccess(messageId)))
- .onErrorResume(e => SMono.just[UpdateResult](UpdateFailure(EmailSet.asUnparsed(messageId), e)))
- .switchIfEmpty(SMono.just[UpdateResult](UpdateSuccess(messageId)))
- }
- }
-
- private def updateFlags(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, originalFlags: Flags, session: MailboxSession): SMono[UpdateResult] = {
- val newFlags = update.keywordsTransformation
- .apply(LENIENT_KEYWORDS_FACTORY.fromFlags(originalFlags).get)
- .asFlagsWithRecentAndDeletedFrom(originalFlags)
-
- if (newFlags.equals(originalFlags)) {
- SMono.just[UpdateResult](UpdateSuccess(messageId))
- } else {
- SMono.fromCallable(() =>
- messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
- .subscribeOn(Schedulers.elastic())
- .`then`(SMono.just[UpdateResult](UpdateSuccess(messageId)))
- }
- }
}
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
new file mode 100644
index 0000000..662435d
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -0,0 +1,249 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.method
+
+import java.util.function.Consumer
+
+import com.google.common.collect.ImmutableList
+import javax.inject.Inject
+import javax.mail.Flags
+import org.apache.james.jmap.core.SetError
+import org.apache.james.jmap.core.SetError.SetErrorDescription
+import org.apache.james.jmap.json.EmailSetSerializer
+import org.apache.james.jmap.mail.EmailSet.UnparsedMessageId
+import org.apache.james.jmap.mail.KeywordsFactory.LENIENT_KEYWORDS_FACTORY
+import org.apache.james.jmap.mail.{EmailSet, EmailSetRequest, MailboxIds, ValidatedEmailSetUpdate}
+import org.apache.james.jmap.method.EmailSetUpdatePerformer.{EmailUpdateFailure, EmailUpdateResult, EmailUpdateResults, EmailUpdateSuccess}
+import org.apache.james.mailbox.MessageManager.FlagsUpdateMode
+import org.apache.james.mailbox.exception.MailboxNotFoundException
+import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, MailboxId, MessageId, MessageRange}
+import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageIdManager, MessageManager}
+import play.api.libs.json.JsObject
+import reactor.core.scala.publisher.{SFlux, SMono}
+import reactor.core.scheduler.Schedulers
+
+import scala.jdk.CollectionConverters._
+
+object EmailSetUpdatePerformer {
+ trait EmailUpdateResult
+ case class EmailUpdateSuccess(messageId: MessageId) extends EmailUpdateResult
+ case class EmailUpdateFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends EmailUpdateResult {
+ def asMessageSetError: SetError = e match {
+ case e: IllegalArgumentException => SetError.invalidPatch(SetErrorDescription(s"Message $unparsedMessageId update is invalid: ${e.getMessage}"))
+ case _: MailboxNotFoundException => SetError.notFound(SetErrorDescription(s"Mailbox not found"))
+ case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}"))
+ case _ => SetError.serverFail(SetErrorDescription(e.getMessage))
+ }
+ }
+ case class EmailUpdateResults(results: Seq[EmailUpdateResult]) {
+ def updated: Option[Map[MessageId, Unit]] =
+ Option(results.flatMap{
+ case result: EmailUpdateSuccess => Some(result.messageId, ())
+ case _ => None
+ }.toMap)
+ .filter(_.nonEmpty)
+
+ def notUpdated: Option[Map[UnparsedMessageId, SetError]] =
+ Option(results.flatMap{
+ case failure: EmailUpdateFailure => Some((failure.unparsedMessageId, failure.asMessageSetError))
+ case _ => None
+ }.toMap)
+ .filter(_.nonEmpty)
+ }
+}
+
+class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
+ messageIdManager: MessageIdManager,
+ mailboxManager: MailboxManager,
+ messageIdFactory: MessageId.Factory) {
+
+ def update(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[EmailUpdateResults] = {
+ emailSetRequest.update
+ .filter(_.nonEmpty)
+ .map(update(_, mailboxSession))
+ .getOrElse(SMono.just(EmailUpdateResults(Seq())))
+ }
+
+ private def update(updates: Map[UnparsedMessageId, JsObject], session: MailboxSession): SMono[EmailUpdateResults] = {
+ val validatedUpdates: List[Either[EmailUpdateFailure, (MessageId, ValidatedEmailSetUpdate)]] = updates
+ .map({
+ case (unparsedMessageId, json) => EmailSet.parse(messageIdFactory)(unparsedMessageId)
+ .toEither
+ .left.map(e => EmailUpdateFailure(unparsedMessageId, e))
+ .flatMap(id => serializer.deserializeEmailSetUpdate(json)
+ .asEither.left.map(e => new IllegalArgumentException(e.toString))
+ .flatMap(_.validate)
+ .fold(e => Left(EmailUpdateFailure(unparsedMessageId, e)),
+ emailSetUpdate => Right((id, emailSetUpdate))))
+ })
+ .toList
+ val failures: List[EmailUpdateFailure] = validatedUpdates.flatMap({
+ case Left(e) => Some(e)
+ case _ => None
+ })
+ val validUpdates: List[(MessageId, ValidatedEmailSetUpdate)] = validatedUpdates.flatMap({
+ case Right(pair) => Some(pair)
+ case _ => None
+ })
+
+ for {
+ updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session))
+ .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId)
+ .flatMap(metaData => {
+ doUpdate(validUpdates, metaData, session)
+ })
+ } yield {
+ EmailUpdateResults(updates ++ failures)
+ }
+ }
+
+ private def doUpdate(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
+ metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+ session: MailboxSession): SMono[Seq[EmailUpdateResult]] = {
+ val sameUpdate: Boolean = validUpdates.map(_._2).distinctBy(_.update).size == 1
+ val singleMailbox: Boolean = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).toSet.size == 1
+
+ if (sameUpdate && singleMailbox && validUpdates.size > 3) {
+ val update: ValidatedEmailSetUpdate = validUpdates.map(_._2).headOption.get
+ val ranges: List[MessageRange] = asRanges(metaData)
+ val mailboxId: MailboxId = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).headOption.get
+
+ if (update.update.isOnlyFlagAddition) {
+ updateFlagsByRange(mailboxId, update.update.keywordsToAdd.get.asFlags, ranges, metaData, FlagsUpdateMode.ADD, session)
+ } else if (update.update.isOnlyFlagRemoval) {
+ updateFlagsByRange(mailboxId, update.update.keywordsToRemove.get.asFlags, ranges, metaData, FlagsUpdateMode.REMOVE, session)
+ } else if (update.update.isOnlyMove) {
+ moveByRange(mailboxId, update, ranges, metaData, session)
+ } else {
+ updateEachMessage(validUpdates, metaData, session)
+ }
+ } else {
+ updateEachMessage(validUpdates, metaData, session)
+ }
+ }
+
+ private def asRanges(metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]]) =
+ MessageRange.toRanges(metaData.values
+ .flatten.map(_.getComposedMessageId.getUid)
+ .toList.asJava)
+ .asScala.toList
+
+ private def updateFlagsByRange(mailboxId: MailboxId,
+ flags: Flags,
+ ranges: List[MessageRange],
+ metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+ updateMode: FlagsUpdateMode,
+ session: MailboxSession): SMono[Seq[EmailUpdateResult]] = {
+ val mailboxMono: SMono[MessageManager] = SMono.fromCallable(() => mailboxManager.getMailbox(mailboxId, session))
+
+ mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData,
+ range => mailbox.setFlags(flags, updateMode, range, session)))
+ .subscribeOn(Schedulers.elastic())
+ }
+
+ private def moveByRange(mailboxId: MailboxId,
+ update: ValidatedEmailSetUpdate,
+ ranges: List[MessageRange],
+ metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+ session: MailboxSession): SMono[Seq[EmailUpdateResult]] = {
+ val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get
+
+ updateByRange(ranges, metaData,
+ range => mailboxManager.moveMessages(range, mailboxId, targetId, session))
+ }
+
+ private def updateByRange(ranges: List[MessageRange],
+ metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+ operation: Consumer[MessageRange]): SMono[Seq[EmailUpdateResult]] = {
+
+ SFlux.fromIterable(ranges)
+ .concatMap(range => {
+ val messageIds = metaData.filter(entry => entry._2.exists(composedId => range.includes(composedId.getComposedMessageId.getUid)))
+ .keys
+ .toSeq
+ SMono.fromCallable[Seq[EmailUpdateResult]](() => {
+ operation.accept(range)
+ messageIds.map(EmailUpdateSuccess)
+ })
+ .onErrorResume(e => SMono.just(messageIds.map(id => EmailUpdateFailure(EmailSet.asUnparsed(id), e))))
+ .subscribeOn(Schedulers.elastic())
+ })
+ .reduce(Seq(), _ ++ _)
+ }
+
+ private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
+ metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
+ session: MailboxSession): SMono[Seq[EmailUpdateResult]] =
+ SFlux.fromIterable(validUpdates)
+ .concatMap[EmailUpdateResult]({
+ case (messageId, updatePatch) =>
+ updateSingleMessage(messageId, updatePatch, metaData.get(messageId).toList.flatten, session)
+ })
+ .collectSeq()
+
+ private def updateSingleMessage(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[EmailUpdateResult] = {
+ val mailboxIds: MailboxIds = MailboxIds(storedMetaData.map(metaData => metaData.getComposedMessageId.getMailboxId))
+ val originFlags: Flags = storedMetaData
+ .foldLeft[Flags](new Flags())((flags: Flags, m: ComposedMessageIdWithMetaData) => {
+ flags.add(m.getFlags)
+ flags
+ })
+
+ if (mailboxIds.value.isEmpty) {
+ SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), MessageNotFoundExeception(messageId)))
+ } else {
+ updateFlags(messageId, update, mailboxIds, originFlags, session)
+ .flatMap {
+ case failure: EmailUpdateFailure => SMono.just[EmailUpdateResult](failure)
+ case _: EmailUpdateSuccess => updateMailboxIds(messageId, update, mailboxIds, session)
+ }
+ .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e)))
+ .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+ }
+ }
+
+ private def updateMailboxIds(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, session: MailboxSession): SMono[EmailUpdateResult] = {
+ val targetIds = update.mailboxIdsTransformation.apply(mailboxIds)
+ if (targetIds.equals(mailboxIds)) {
+ SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
+ } else {
+ SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, targetIds.value.asJava, session))
+ .subscribeOn(Schedulers.elastic())
+ .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+ .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e)))
+ .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+ }
+ }
+
+ private def updateFlags(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, originalFlags: Flags, session: MailboxSession): SMono[EmailUpdateResult] = {
+ val newFlags = update.keywordsTransformation
+ .apply(LENIENT_KEYWORDS_FACTORY.fromFlags(originalFlags).get)
+ .asFlagsWithRecentAndDeletedFrom(originalFlags)
+
+ if (newFlags.equals(originalFlags)) {
+ SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
+ } else {
+ SMono.fromCallable(() =>
+ messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
+ .subscribeOn(Schedulers.elastic())
+ .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org