You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/10/26 05:20:18 UTC

[james-project] 05/08: JAMES-3432 Upload: Attachment

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7f6003f5ddc5540c56622f99579fb59f6af5106b
Author: duc91 <vd...@linagora.com>
AuthorDate: Thu Oct 22 18:11:47 2020 +0700

    JAMES-3432 Upload: Attachment
---
 .../james/jmap/rfc8621/RFC8621MethodsModule.java   |   5 +-
 .../rfc8621/distributed/DistributedUploadTest.java |  53 ++++++++
 .../james/jmap/rfc8621/contract/Fixture.scala      |   2 +
 .../jmap/rfc8621/contract/UploadContract.scala     | 131 ++++++++++++++++++++
 .../apache/james/jmap/json/UploadSerializer.scala  |  15 +++
 .../apache/james/jmap/routes/DownloadRoutes.scala  |  39 ++++--
 .../apache/james/jmap/routes/UploadRoutes.scala    | 133 +++++++++++++++++++++
 7 files changed, 368 insertions(+), 10 deletions(-)

diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
index 535f1d5..13eeaed 100644
--- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
+++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
@@ -47,6 +47,7 @@ import org.apache.james.jmap.method.VacationResponseSetMethod;
 import org.apache.james.jmap.method.ZoneIdProvider;
 import org.apache.james.jmap.model.JmapRfc8621Configuration;
 import org.apache.james.jmap.routes.DownloadRoutes;
+import org.apache.james.jmap.routes.UploadRoutes;
 import org.apache.james.jmap.routes.JMAPApiRoutes;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.utils.PropertiesProvider;
@@ -80,8 +81,8 @@ public class RFC8621MethodsModule extends AbstractModule {
     }
 
     @ProvidesIntoSet
-    JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, JMAPApiRoutes jmapApiRoutes, DownloadRoutes downloadRoutes) {
-        return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes);
+    JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, JMAPApiRoutes jmapApiRoutes, DownloadRoutes downloadRoutes, UploadRoutes uploadRoutes) {
+        return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes);
     }
 
     @Provides
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedUploadTest.java b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedUploadTest.java
new file mode 100644
index 0000000..9d432db
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedUploadTest.java
@@ -0,0 +1,53 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.distributed;
+
+import org.apache.james.CassandraExtension;
+import org.apache.james.CassandraRabbitMQJamesConfiguration;
+import org.apache.james.CassandraRabbitMQJamesServerMain;
+import org.apache.james.DockerElasticSearchExtension;
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.jmap.rfc8621.contract.UploadContract;
+import org.apache.james.modules.AwsS3BlobStoreExtension;
+import org.apache.james.modules.RabbitMQExtension;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.blobstore.BlobStoreConfiguration;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Runs the shared {@link UploadContract} tests against a James server created by
+ * {@link CassandraRabbitMQJamesServerMain}, configured with an S3 blob store
+ * (cache disabled, deduplication enabled).
+ */
+public class DistributedUploadTest implements UploadContract {
+    // Server under test: registered together with the ElasticSearch, Cassandra,
+    // RabbitMQ and S3 extensions, and overridden with TestJMAPServerModule.
+    @RegisterExtension
+    static JamesServerExtension testExtension = new JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
+        CassandraRabbitMQJamesConfiguration.builder()
+            .workingDirectory(tmpDir)
+            .configurationFromClasspath()
+            .blobStore(BlobStoreConfiguration.builder()
+                .s3()
+                .disableCache()
+                .deduplication())
+            .build())
+        .extension(new DockerElasticSearchExtension())
+        .extension(new CassandraExtension())
+        .extension(new RabbitMQExtension())
+        .extension(new AwsS3BlobStoreExtension())
+        .server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration)
+            .overrideWith(new TestJMAPServerModule()))
+        .build();
+}
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala
index 4b70b04..6b2a9fb 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala
@@ -38,6 +38,7 @@ import org.apache.james.mime4j.dom.Message
 
 object Fixture {
   val ACCOUNT_ID: String = "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6"
+  val ALICE_ACCOUNT_ID: String = "2bd806c97f0e00af1a1fc3328fa763a9269723c8db8fac4f93af71db186d6e90"
 
   def createTestMessage: Message = Message.Builder
       .of
@@ -140,6 +141,7 @@ object Fixture {
       |}""".stripMargin
 
   val ACCEPT_RFC8621_VERSION_HEADER: String = "application/json; jmapVersion=rfc-8621"
+  val RFC8621_VERSION_HEADER: String = "jmapVersion=rfc-8621"
 
   val USER: Username = Username.fromLocalPartWithDomain("user", DOMAIN)
   val USER_PASSWORD: String = "user"
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala
new file mode 100644
index 0000000..9605f44
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala
@@ -0,0 +1,131 @@
+package org.apache.james.jmap.rfc8621.contract
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.nio.charset.StandardCharsets
+import io.netty.handler.codec.http.HttpHeaderNames.ACCEPT
+import io.restassured.RestAssured.{`given`, requestSpecification}
+import io.restassured.http.ContentType
+import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
+import org.apache.commons.io.IOUtils
+import org.apache.http.HttpStatus.{SC_CREATED, SC_NOT_FOUND, SC_OK, SC_UNAUTHORIZED}
+import org.apache.james.GuiceJamesServer
+import org.apache.james.jmap.http.UserCredential
+import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, BOB, BOB_PASSWORD, DOMAIN, RFC8621_VERSION_HEADER, authScheme, baseRequestSpecBuilder}
+import org.apache.james.jmap.rfc8621.contract.UploadContract.{BIG_INPUT_STREAM, VALID_INPUT_STREAM}
+import org.apache.james.utils.DataProbeImpl
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import play.api.libs.json.{JsString, Json}
+
+object UploadContract {
+  // An InputStream is stateful: once read it stays exhausted. These are factories (def)
+  // so that every use gets a fresh, unread stream instead of one shared instance.
+  // The charset is explicit so the payload bytes do not depend on the platform default.
+  private def BIG_INPUT_STREAM: InputStream = new ByteArrayInputStream("123456789\r\n".repeat(10025).getBytes(StandardCharsets.UTF_8))
+  private def VALID_INPUT_STREAM: InputStream = new ByteArrayInputStream("123456789\r\n".repeat(1).getBytes(StandardCharsets.UTF_8))
+}
+
+/**
+ * Contract tests for the upload endpoint (`POST /upload/{accountId}/`), to be mixed into
+ * a concrete test class that provides the James server.
+ */
+trait UploadContract {
+  /** Creates the test domain and the BOB user, and authenticates every request as BOB by default. */
+  @BeforeEach
+  def setUp(server: GuiceJamesServer): Unit = {
+    server.getProbe(classOf[DataProbeImpl])
+      .fluent
+      .addDomain(DOMAIN.asString)
+      .addUser(BOB.asString, BOB_PASSWORD)
+
+    requestSpecification = baseRequestSpecBuilder(server)
+      .setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD)))
+      .build
+  }
+
+  /**
+   * Uploads a small payload, then downloads it through the returned blobId and compares
+   * the downloaded body with the payload.
+   *
+   * NOTE(review): despite the name, no request is issued as another user, so the
+   * "only owner can access" part is not exercised here.
+   */
+  @Test
+  def shouldUploadFileAndOnlyOwnerCanAccess(): Unit = {
+    val uploadResponse: String = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(VALID_INPUT_STREAM)
+    .when
+      .post(s"/upload/$ACCOUNT_ID/")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .body
+      .asString
+
+    // The upload response is a JSON object; its "blobId" field is used for the download.
+    val blobId: String = Json.parse(uploadResponse).\("blobId").get.asInstanceOf[JsString].value
+
+    val downloadResponse: String = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+    .when
+      .get(s"/download/$ACCOUNT_ID/$blobId")
+    .`then`
+      .statusCode(SC_OK)
+      .extract
+      .body
+      .asString
+
+    val expectedResponse: String = IOUtils.toString(VALID_INPUT_STREAM, StandardCharsets.UTF_8)
+
+    assertThat(new ByteArrayInputStream(downloadResponse.getBytes(StandardCharsets.UTF_8)))
+      .hasContent(expectedResponse)
+  }
+
+  /**
+   * Uploads the big payload as binary content.
+   *
+   * NOTE(review): the assertions below (status 200 and a body equal to "Should be error")
+   * look like placeholders pending a decision on upload size limits -- confirm.
+   */
+  @Test
+  def shouldRejectWhenUploadFileTooBig(): Unit = {
+    val response: String = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(BIG_INPUT_STREAM)
+    .when
+      .post(s"/upload/$ACCOUNT_ID/")
+    .`then`
+      .statusCode(SC_OK)
+      .extract
+      .body
+      .asString
+
+    // FIXME: it is unclear whether the server enforces an upload size limit.
+    assertThatJson(response)
+      .isEqualTo("Should be error")
+  }
+
+  /** An upload sent without any authentication must be answered with 401 Unauthorized. */
+  @Test
+  def uploadShouldRejectWhenUnauthenticated(): Unit = {
+    `given`
+      .auth()
+      .none()
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(VALID_INPUT_STREAM)
+    .when
+      .post(s"/upload/$ACCOUNT_ID/")
+    .`then`
+      .statusCode(SC_UNAUTHORIZED)
+  }
+
+  /**
+   * Uploads a payload, then expects 404 Not Found when downloading it.
+   *
+   * NOTE(review): nothing deletes or expires the upload between the two requests
+   * (see the FIXME below), so the expected 404 is not yet backed by the test itself.
+   */
+  @Test
+  def uploadShouldSucceedButExpiredWhenDownload(): Unit = {
+    val uploadResponse: String = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(VALID_INPUT_STREAM)
+    .when
+      .post(s"/upload/$ACCOUNT_ID/")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .body
+      .asString
+
+    val blobId: String = Json.parse(uploadResponse).\("blobId").get.asInstanceOf[JsString].value
+
+    // FIXME: no known way to delete the uploaded file with the existing attachment API.
+    `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+    .when
+      .get(s"/download/$ACCOUNT_ID/$blobId")
+    .`then`
+      .statusCode(SC_NOT_FOUND)
+  }
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala
new file mode 100644
index 0000000..a7f8b25
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala
@@ -0,0 +1,15 @@
+package org.apache.james.jmap.json
+
+import org.apache.james.jmap.mail.BlobId
+import org.apache.james.jmap.routes.UploadResponse
+import org.apache.james.mailbox.model.ContentType
+import play.api.libs.json.{JsString, JsValue, Json, Writes}
+
+/** Serializes an [[UploadResponse]] to JSON using play-json `Writes` instances. */
+class UploadSerializer {
+
+  // Writers for the field types of UploadResponse; they must be in implicit scope
+  // for the Json.writes[UploadResponse] derivation below.
+  private implicit val blobIdWrites: Writes[BlobId] = Json.valueWrites[BlobId]
+  // A content type is written as a plain JSON string.
+  private implicit val contentTypeWrites: Writes[ContentType] = contentType => JsString(contentType.asString())
+  private implicit val uploadResponseWrites: Writes[UploadResponse] = Json.writes[UploadResponse]
+
+  /** Converts the given upload response into its JSON representation. */
+  def serialize(uploadResponse: UploadResponse): JsValue = Json.toJson(uploadResponse)(uploadResponseWrites)
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
index 03af576..f7c580e 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
@@ -1,4 +1,4 @@
-/** **************************************************************
+/****************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one   *
  * or more contributor license agreements.  See the NOTICE file *
  * distributed with this work for additional information        *
@@ -6,16 +6,16 @@
  * to you under the Apache License, Version 2.0 (the            *
  * "License"); you may not use this file except in compliance   *
  * with the License.  You may obtain a copy of the License at   *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0                 *
- * *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
  * Unless required by applicable law or agreed to in writing,   *
  * software distributed under the License is distributed on an  *
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
  * KIND, either express or implied.  See the License for the    *
  * specific language governing permissions and limitations      *
  * under the License.                                           *
- * ***************************************************************/
+ ****************************************************************/
 package org.apache.james.jmap.routes
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
@@ -38,8 +38,8 @@ import org.apache.james.jmap.mail.Email.Size
 import org.apache.james.jmap.mail.{BlobId, EmailBodyPart, PartId}
 import org.apache.james.jmap.routes.DownloadRoutes.{BUFFER_SIZE, LOGGER}
 import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
-import org.apache.james.mailbox.model.{ContentType, FetchGroup, MessageId, MessageResult}
-import org.apache.james.mailbox.{MailboxSession, MessageIdManager}
+import org.apache.james.mailbox.model.{AttachmentId, AttachmentMetadata, ContentType, FetchGroup, MessageId, MessageResult}
+import org.apache.james.mailbox.{AttachmentManager, MailboxSession, MessageIdManager}
 import org.apache.james.mime4j.codec.EncoderUtil
 import org.apache.james.mime4j.codec.EncoderUtil.Usage
 import org.apache.james.mime4j.message.DefaultMessageWriter
@@ -94,6 +94,16 @@ case class MessageBlob(blobId: BlobId, message: MessageResult) extends Blob {
   override def content: InputStream = message.getFullContent.getInputStream
 }
 
+/** A [[Blob]] backed by a stored attachment: its metadata plus a stream over its content. */
+case class AttachmentBlob(attachmentMetadata: AttachmentMetadata, fileContent: InputStream) extends Blob {
+  // Always a Success: the metadata size is passed through UploadRoutes.sanitizeSize.
+  override def size: Try[Size] = Success(UploadRoutes.sanitizeSize(attachmentMetadata.getSize))
+
+  override def contentType: ContentType = attachmentMetadata.getType
+
+  override def content: InputStream = fileContent
+
+  // NOTE(review): `.get` presumably throws when the attachment id is not a valid BlobId -- confirm.
+  override def blobId: BlobId = BlobId.of(attachmentMetadata.getAttachmentId.getId).get
+}
+
 case class EmailBodyPartBlob(blobId: BlobId, part: EmailBodyPart) extends Blob {
   override def size: Try[Size] = Success(part.size)
 
@@ -120,6 +130,17 @@ class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
   }
 }
 
+/**
+ * Resolves a blob id as an attachment: the id is converted to an [[AttachmentId]], then the
+ * attachment metadata and content are fetched through the [[AttachmentManager]].
+ */
+class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager) extends BlobResolver {
+  override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult =
+    AttachmentId.from(org.apache.james.mailbox.model.BlobId.fromString(blobId.value.value)) match {
+      // NOTE(review): this type pattern matches any non-null AttachmentId, so the
+      // NonApplicable branch below looks reachable only for a null result -- confirm intent.
+      case attachmentId: AttachmentId => Applicable(
+        // Metadata lookup and content loading are deferred until the Mono is subscribed.
+        SMono.fromCallable(() => attachmentManager.getAttachment(attachmentId, mailboxSession))
+          .map((attachmentMetadata: AttachmentMetadata) => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession)))
+      )
+      case _ => NonApplicable()
+    }
+}
+
 class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
                                         val messageIdManager: MessageIdManager) extends BlobResolver {
   private def asMessageAndPartId(blobId: BlobId): Try[(MessageId, PartId)] = {
@@ -153,11 +174,13 @@ class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
 }
 
 class BlobResolvers @Inject()(val messageBlobResolver: MessageBlobResolver,
-                    val messagePartBlobResolver: MessagePartBlobResolver) {
+                              val messagePartBlobResolver: MessagePartBlobResolver,
+                              val attachmentBlobResolver: AttachmentBlobResolver) {
   def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[Blob] =
     messageBlobResolver
       .resolve(blobId, mailboxSession).asOption
       .orElse(messagePartBlobResolver.resolve(blobId, mailboxSession).asOption)
+      .orElse(attachmentBlobResolver.resolve(blobId, mailboxSession).asOption)
       .getOrElse(SMono.raiseError(BlobNotFoundException(blobId)))
 }
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
new file mode 100644
index 0000000..f06fa51
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
@@ -0,0 +1,133 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.routes
+
+import java.io.InputStream
+import java.time.ZonedDateTime
+import java.util.stream
+import java.util.stream.Stream
+
+import eu.timepit.refined.api.Refined
+import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
+import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, CREATED}
+import io.netty.handler.codec.http.HttpMethod
+import javax.inject.{Inject, Named}
+import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
+import org.apache.james.jmap.http.Authenticator
+import org.apache.james.jmap.http.rfc8621.InjectionKeys
+import org.apache.james.jmap.mail.Email.Size
+import org.apache.james.jmap.routes.UploadRoutes.{LOGGER, fromAttachment}
+import org.apache.james.mailbox.{AttachmentManager, MailboxSession}
+import org.apache.james.mailbox.model.{AttachmentMetadata, ContentType}
+import org.apache.james.util.ReactorUtils
+import org.slf4j.{Logger, LoggerFactory}
+import reactor.core.publisher.Mono
+import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
+import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
+import eu.timepit.refined.auto._
+import eu.timepit.refined.numeric.NonNegative
+import eu.timepit.refined.refineV
+import org.apache.james.jmap.exceptions.UnauthorizedException
+import org.apache.james.jmap.json.UploadSerializer
+import org.apache.james.jmap.mail.BlobId
+
+/** Helpers shared by the upload routes: logger, size sanitizing and response mapping. */
+object UploadRoutes {
+  // Logger bound to the upload routes class, so log entries are attributed to this component.
+  val LOGGER: Logger = LoggerFactory.getLogger(classOf[UploadRoutes])
+
+  type Size = Long Refined NonNegative
+  val Zero: Size = 0L
+
+  /**
+   * Converts a raw size into a non-negative [[Size]].
+   *
+   * @param value the raw size
+   * @return the refined size, or [[Zero]] (after logging an error) when `value` is negative
+   */
+  def sanitizeSize(value: Long): Size = {
+    val size: Either[String, Size] = refineV[NonNegative](value)
+    size.fold(e => {
+      LOGGER.error(s"Encountered an invalid attachment size: $e")
+      Zero
+    },
+      refinedValue => refinedValue)
+  }
+
+  /**
+   * Builds the upload response describing a stored attachment.
+   * The blob id is the attachment id; no expiration date is set.
+   */
+  def fromAttachment(attachmentMetadata: AttachmentMetadata): UploadResponse =
+    UploadResponse(
+        blobId = BlobId.of(attachmentMetadata.getAttachmentId.getId).get,
+        `type` = ContentType.of(attachmentMetadata.getType.asString),
+        size = sanitizeSize(attachmentMetadata.getSize),
+        expires = None)
+}
+
+/**
+ * Body returned after an upload.
+ *
+ * @param blobId  identifier of the stored content
+ * @param `type`  content type of the stored content
+ * @param size    size of the stored content
+ * @param expires optional expiration date of the upload
+ */
+case class UploadResponse(blobId: BlobId,
+                          `type`: ContentType,
+                          size: Size,
+                          expires: Option[ZonedDateTime])
+
+/**
+ * JMAP routes accepting uploads on `POST /upload/{accountId}/` and storing the request body
+ * as an attachment through the [[AttachmentManager]].
+ *
+ * NOTE(review): the `accountId` path parameter is declared in the URI but never read here,
+ * so it is not checked against the authenticated session -- confirm whether that is intended.
+ */
+class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
+                             val attachmentManager: AttachmentManager,
+                             val serializer: UploadSerializer) extends JMAPRoutes {
+
+  class CancelledUploadException extends RuntimeException {
+
+  }
+
+  private val accountIdParam: String = "accountId"
+  private val uploadURI = s"/upload/{$accountIdParam}/"
+
+  /** Declares the POST upload route (with CORS headers) and its OPTIONS pre-flight route. */
+  override def routes(): stream.Stream[JMAPRoute] = Stream.of(
+    JMAPRoute.builder
+      .endpoint(new Endpoint(HttpMethod.POST, uploadURI))
+      .action(this.post)
+      .corsHeaders,
+    JMAPRoute.builder
+      .endpoint(new Endpoint(HttpMethod.OPTIONS, uploadURI))
+      .action(JMAPRoutes.CORS_CONTROL)
+      .noCorsHeaders)
+
+  /**
+   * Entry point of the upload route.
+   *
+   * Answers 400 Bad Request when the request carries no Content-Type header; otherwise
+   * authenticates the request and stores its body. Authentication failures and any other
+   * error are delegated to the `handleAuthenticationFailure` / `handleInternalError` handlers.
+   */
+  def post(request: HttpServerRequest, response: HttpServerResponse): Mono[Void] = {
+    // The header lookup yields null when the header is absent: wrap it in an Option so the
+    // missing-header case reaches the BAD_REQUEST branch instead of ContentType.of(null).
+    Option(request.requestHeaders.get(CONTENT_TYPE)) match {
+      case Some(contentType) => SMono.fromPublisher(
+          authenticator.authenticate(request))
+          .flatMap(session => post(request, response, ContentType.of(contentType), session))
+          .onErrorResume {
+            case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e))
+            case e: Throwable => SMono.fromPublisher(handleInternalError(response, LOGGER, e))
+          }
+          .asJava().`then`()
+      case None => response.status(BAD_REQUEST).send
+    }
+  }
+
+  /**
+   * Exposes the request body as an InputStream and stores it for the given session.
+   * Subscription happens on the elastic scheduler.
+   */
+  def post(request: HttpServerRequest, response: HttpServerResponse, contentType: ContentType, session: MailboxSession): SMono[Void] = {
+    SMono.fromCallable(() => ReactorUtils.toInputStream(request.receive.asByteBuffer))
+      .flatMap(content => handle(contentType, content, session, response))
+      .subscribeOn(Schedulers.elastic())
+  }
+
+  /** Stores the content, then answers 201 Created with the serialized [[UploadResponse]] as body. */
+  def handle(contentType: ContentType, content: InputStream, mailboxSession: MailboxSession, response: HttpServerResponse): SMono[Void] =
+    uploadContent(contentType, content, mailboxSession)
+      .flatMap(uploadResponse => SMono.fromPublisher(response
+            .header(CONTENT_TYPE, uploadResponse.`type`.asString())
+            .status(CREATED)
+            .sendString(SMono.just(serializer.serialize(uploadResponse).toString()))))
+
+  /** Stores the stream as an attachment and maps the resulting metadata to an [[UploadResponse]]. */
+  def uploadContent(contentType: ContentType, inputStream: InputStream, session: MailboxSession): SMono[UploadResponse] =
+    SMono
+      .fromPublisher(attachmentManager.storeAttachment(contentType, inputStream, session))
+      .map(fromAttachment)
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org