You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/11/12 04:42:28 UTC

[james-project] 11/12: JAMES-3432 UploadRoutes error handling and schedulers

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6736651799a5dd6350770753cd853057c9c2ed8c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Nov 11 11:58:27 2020 +0700

    JAMES-3432 UploadRoutes error handling and schedulers
    
    Required to pass on Distributed James
---
 .../apache/james/jmap/routes/UploadRoutes.scala    | 33 ++++++++++------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
index 9bb9f78..2f084c6 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
@@ -19,7 +19,7 @@
 
 package org.apache.james.jmap.routes
 
-import java.io.{IOException, InputStream, UncheckedIOException}
+import java.io.InputStream
 import java.util.stream
 import java.util.stream.Stream
 
@@ -51,6 +51,8 @@ import reactor.core.scala.publisher.SMono
 import reactor.core.scheduler.Schedulers
 import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
 
+case class TooBigUploadException() extends RuntimeException
+
 object UploadRoutes {
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[DownloadRoutes])
 
@@ -96,23 +98,23 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A
   def post(request: HttpServerRequest, response: HttpServerResponse): Mono[Void] = {
     request.requestHeaders.get(CONTENT_TYPE) match {
       case contentType => SMono.fromPublisher(
-          authenticator.authenticate(request))
-          .flatMap(session => post(request, response, ContentType.of(contentType), session))
-          .onErrorResume {
-            case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e))
-            case e: Throwable => SMono.fromPublisher(handleInternalError(response, LOGGER, e))
-          }
-          .asJava().`then`()
+        authenticator.authenticate(request))
+        .flatMap(session => post(request, response, ContentType.of(contentType), session))
+        .onErrorResume {
+          case e: TooBigUploadException => SMono.fromPublisher(response.status(BAD_REQUEST).sendString(SMono.just("Attempt to upload exceed max size")).`then`())
+          case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e))
+          case e: Throwable => SMono.fromPublisher(handleInternalError(response, LOGGER, e))
+        }
+        .asJava()
+        .subscribeOn(Schedulers.elastic())
+        .`then`()
       case _ => response.status(BAD_REQUEST).send
     }
   }
 
-  private def handleUncheckedIOException(response: HttpServerResponse, e: UncheckedIOException) =
-    response.status(BAD_REQUEST).sendString(SMono.just(e.getMessage))
-
   def post(request: HttpServerRequest, response: HttpServerResponse, contentType: ContentType, session: MailboxSession): SMono[Void] = {
     Id.validate(request.param(accountIdParam)) match {
-      case Right(id: Id) => {
+      case Right(id: Id) =>
         val targetAccountId: AccountId = AccountId(id)
         AccountId.from(session.getUser).map(accountId => accountId.equals(targetAccountId))
           .fold[SMono[Void]](
@@ -120,11 +122,9 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A
             value => if (value) {
               SMono.fromCallable(() => ReactorUtils.toInputStream(request.receive.asByteBuffer))
               .flatMap(content => handle(targetAccountId, contentType, content, session, response))
-              .subscribeOn(Schedulers.elastic())
             } else {
               SMono.raiseError(new UnauthorizedException("Attempt to upload in another account"))
             })
-      }
 
       case Left(throwable: Throwable) => SMono.raiseError(throwable)
     }
@@ -135,16 +135,13 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A
 
     SMono.fromCallable(() => new LimitedInputStream(content, maxSize) {
       override def raiseError(max: Long, count: Long): Unit = if (count > max) {
-        throw new IOException("Attempt to upload exceed max size")
+        throw TooBigUploadException()
       }})
       .flatMap(uploadContent(accountId, contentType, _, mailboxSession))
       .flatMap(uploadResponse => SMono.fromPublisher(response
               .header(CONTENT_TYPE, uploadResponse.`type`.asString())
               .status(CREATED)
               .sendString(SMono.just(serializer.serialize(uploadResponse).toString()))))
-      .onErrorResume {
-        case e: UncheckedIOException => SMono.fromPublisher(handleUncheckedIOException(response, e))
-      }
   }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org