You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/10/26 05:20:18 UTC
[james-project] 05/08: JAMES-3432 Upload: Attachment
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7f6003f5ddc5540c56622f99579fb59f6af5106b
Author: duc91 <vd...@linagora.com>
AuthorDate: Thu Oct 22 18:11:47 2020 +0700
JAMES-3432 Upload: Attachment
---
.../james/jmap/rfc8621/RFC8621MethodsModule.java | 5 +-
.../rfc8621/distributed/DistributedUploadTest.java | 53 ++++++++
.../james/jmap/rfc8621/contract/Fixture.scala | 2 +
.../jmap/rfc8621/contract/UploadContract.scala | 131 ++++++++++++++++++++
.../apache/james/jmap/json/UploadSerializer.scala | 15 +++
.../apache/james/jmap/routes/DownloadRoutes.scala | 39 ++++--
.../apache/james/jmap/routes/UploadRoutes.scala | 133 +++++++++++++++++++++
7 files changed, 368 insertions(+), 10 deletions(-)
diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
index 535f1d5..13eeaed 100644
--- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
+++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
@@ -47,6 +47,7 @@ import org.apache.james.jmap.method.VacationResponseSetMethod;
import org.apache.james.jmap.method.ZoneIdProvider;
import org.apache.james.jmap.model.JmapRfc8621Configuration;
import org.apache.james.jmap.routes.DownloadRoutes;
+import org.apache.james.jmap.routes.UploadRoutes;
import org.apache.james.jmap.routes.JMAPApiRoutes;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.utils.PropertiesProvider;
@@ -80,8 +81,8 @@ public class RFC8621MethodsModule extends AbstractModule {
}
@ProvidesIntoSet
- JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, JMAPApiRoutes jmapApiRoutes, DownloadRoutes downloadRoutes) {
- return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes);
+ JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, JMAPApiRoutes jmapApiRoutes, DownloadRoutes downloadRoutes, UploadRoutes uploadRoutes) {
+ return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes);
}
@Provides
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedUploadTest.java b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedUploadTest.java
new file mode 100644
index 0000000..9d432db
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedUploadTest.java
@@ -0,0 +1,53 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.distributed;
+
+import org.apache.james.CassandraExtension;
+import org.apache.james.CassandraRabbitMQJamesConfiguration;
+import org.apache.james.CassandraRabbitMQJamesServerMain;
+import org.apache.james.DockerElasticSearchExtension;
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.jmap.rfc8621.contract.UploadContract;
+import org.apache.james.modules.AwsS3BlobStoreExtension;
+import org.apache.james.modules.RabbitMQExtension;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.blobstore.BlobStoreConfiguration;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class DistributedUploadTest implements UploadContract {
+ @RegisterExtension
+ static JamesServerExtension testExtension = new JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
+ CassandraRabbitMQJamesConfiguration.builder()
+ .workingDirectory(tmpDir)
+ .configurationFromClasspath()
+ .blobStore(BlobStoreConfiguration.builder()
+ .s3()
+ .disableCache()
+ .deduplication())
+ .build())
+ .extension(new DockerElasticSearchExtension())
+ .extension(new CassandraExtension())
+ .extension(new RabbitMQExtension())
+ .extension(new AwsS3BlobStoreExtension())
+ .server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration)
+ .overrideWith(new TestJMAPServerModule()))
+ .build();
+}
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala
index 4b70b04..6b2a9fb 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala
@@ -38,6 +38,7 @@ import org.apache.james.mime4j.dom.Message
object Fixture {
val ACCOUNT_ID: String = "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6"
+ val ALICE_ACCOUNT_ID: String = "2bd806c97f0e00af1a1fc3328fa763a9269723c8db8fac4f93af71db186d6e90"
def createTestMessage: Message = Message.Builder
.of
@@ -140,6 +141,7 @@ object Fixture {
|}""".stripMargin
val ACCEPT_RFC8621_VERSION_HEADER: String = "application/json; jmapVersion=rfc-8621"
+ val RFC8621_VERSION_HEADER: String = "jmapVersion=rfc-8621"
val USER: Username = Username.fromLocalPartWithDomain("user", DOMAIN)
val USER_PASSWORD: String = "user"
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala
new file mode 100644
index 0000000..9605f44
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala
@@ -0,0 +1,131 @@
+package org.apache.james.jmap.rfc8621.contract
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.nio.charset.StandardCharsets
+import io.netty.handler.codec.http.HttpHeaderNames.ACCEPT
+import io.restassured.RestAssured.{`given`, requestSpecification}
+import io.restassured.http.ContentType
+import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
+import org.apache.commons.io.IOUtils
+import org.apache.http.HttpStatus.{SC_CREATED, SC_NOT_FOUND, SC_OK, SC_UNAUTHORIZED}
+import org.apache.james.GuiceJamesServer
+import org.apache.james.jmap.http.UserCredential
+import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, BOB, BOB_PASSWORD, DOMAIN, RFC8621_VERSION_HEADER, authScheme, baseRequestSpecBuilder}
+import org.apache.james.jmap.rfc8621.contract.UploadContract.{BIG_INPUT_STREAM, VALID_INPUT_STREAM}
+import org.apache.james.utils.DataProbeImpl
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import play.api.libs.json.{JsString, Json}
+
+object UploadContract {
+ private val BIG_INPUT_STREAM: InputStream = new ByteArrayInputStream("123456789\r\n".repeat(10025).getBytes)
+ private val VALID_INPUT_STREAM: InputStream = new ByteArrayInputStream("123456789\r\n".repeat(1).getBytes)
+}
+
+trait UploadContract {
+ @BeforeEach
+ def setUp(server: GuiceJamesServer): Unit = {
+ server.getProbe(classOf[DataProbeImpl])
+ .fluent
+ .addDomain(DOMAIN.asString)
+ .addUser(BOB.asString, BOB_PASSWORD)
+
+ requestSpecification = baseRequestSpecBuilder(server)
+ .setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD)))
+ .build
+ }
+
+ @Test
+ def shouldUploadFileAndOnlyOwnerCanAccess(): Unit = {
+ val uploadResponse: String = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(VALID_INPUT_STREAM)
+ .when
+ .post(s"/upload/$ACCOUNT_ID/")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .body
+ .asString
+
+ val blobId: String = Json.parse(uploadResponse).\("blobId").get.asInstanceOf[JsString].value
+
+ val downloadResponse: String = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .when
+ .get(s"/download/$ACCOUNT_ID/$blobId")
+ .`then`
+ .statusCode(SC_OK)
+ .extract
+ .body
+ .asString
+
+ val expectedResponse: String = IOUtils.toString(VALID_INPUT_STREAM, StandardCharsets.UTF_8)
+
+ assertThat(new ByteArrayInputStream(downloadResponse.getBytes(StandardCharsets.UTF_8)))
+ .hasContent(expectedResponse)
+ }
+
+ @Test
+ def shouldRejectWhenUploadFileTooBig(): Unit = {
+ val response: String = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(BIG_INPUT_STREAM)
+ .when
+ .post(s"/upload/$ACCOUNT_ID/")
+ .`then`
+ .statusCode(SC_OK)
+ .extract
+ .body
+ .asString
+
+ // fixme: dont know we limit size or not?
+ assertThatJson(response)
+ .isEqualTo("Should be error")
+ }
+
+ @Test
+ def uploadShouldRejectWhenUnauthenticated(): Unit = {
+ `given`
+ .auth()
+ .none()
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(VALID_INPUT_STREAM)
+ .when
+ .post(s"/upload/$ACCOUNT_ID/")
+ .`then`
+ .statusCode(SC_UNAUTHORIZED)
+ }
+
+ @Test
+ def uploadShouldSucceedButExpiredWhenDownload(): Unit = {
+ val uploadResponse: String = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(VALID_INPUT_STREAM)
+ .when
+ .post(s"/upload/$ACCOUNT_ID/")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .body
+ .asString
+
+ val blobId: String = Json.parse(uploadResponse).\("blobId").get.asInstanceOf[JsString].value
+
+ // fixme: dont know how to delete file with existing attachment api
+ `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .when
+ .get(s"/download/$ACCOUNT_ID/$blobId")
+ .`then`
+ .statusCode(SC_NOT_FOUND)
+ }
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala
new file mode 100644
index 0000000..a7f8b25
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala
@@ -0,0 +1,15 @@
+package org.apache.james.jmap.json
+
+import org.apache.james.jmap.mail.BlobId
+import org.apache.james.jmap.routes.UploadResponse
+import org.apache.james.mailbox.model.ContentType
+import play.api.libs.json.{JsString, JsValue, Json, Writes}
+
+class UploadSerializer {
+
+ private implicit val blobIdWrites: Writes[BlobId] = Json.valueWrites[BlobId]
+ private implicit val contentTypeWrites: Writes[ContentType] = contentType => JsString(contentType.asString())
+ private implicit val uploadResponseWrites: Writes[UploadResponse] = Json.writes[UploadResponse]
+
+ def serialize(uploadResponse: UploadResponse): JsValue = Json.toJson(uploadResponse)(uploadResponseWrites)
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
index 03af576..f7c580e 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
@@ -1,4 +1,4 @@
-/** **************************************************************
+/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
@@ -6,16 +6,16 @@
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
- * ***************************************************************/
+ ****************************************************************/
package org.apache.james.jmap.routes
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
@@ -38,8 +38,8 @@ import org.apache.james.jmap.mail.Email.Size
import org.apache.james.jmap.mail.{BlobId, EmailBodyPart, PartId}
import org.apache.james.jmap.routes.DownloadRoutes.{BUFFER_SIZE, LOGGER}
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
-import org.apache.james.mailbox.model.{ContentType, FetchGroup, MessageId, MessageResult}
-import org.apache.james.mailbox.{MailboxSession, MessageIdManager}
+import org.apache.james.mailbox.model.{AttachmentId, AttachmentMetadata, ContentType, FetchGroup, MessageId, MessageResult}
+import org.apache.james.mailbox.{AttachmentManager, MailboxSession, MessageIdManager}
import org.apache.james.mime4j.codec.EncoderUtil
import org.apache.james.mime4j.codec.EncoderUtil.Usage
import org.apache.james.mime4j.message.DefaultMessageWriter
@@ -94,6 +94,16 @@ case class MessageBlob(blobId: BlobId, message: MessageResult) extends Blob {
override def content: InputStream = message.getFullContent.getInputStream
}
+case class AttachmentBlob(attachmentMetadata: AttachmentMetadata, fileContent: InputStream) extends Blob {
+ override def size: Try[Size] = Success(UploadRoutes.sanitizeSize(attachmentMetadata.getSize))
+
+ override def contentType: ContentType = attachmentMetadata.getType
+
+ override def content: InputStream = fileContent
+
+ override def blobId: BlobId = BlobId.of(attachmentMetadata.getAttachmentId.getId).get
+}
+
case class EmailBodyPartBlob(blobId: BlobId, part: EmailBodyPart) extends Blob {
override def size: Try[Size] = Success(part.size)
@@ -120,6 +130,17 @@ class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
}
}
+class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager) extends BlobResolver {
+ override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult =
+ AttachmentId.from(org.apache.james.mailbox.model.BlobId.fromString(blobId.value.value)) match {
+ case attachmentId: AttachmentId => Applicable(
+ SMono.fromCallable(() => attachmentManager.getAttachment(attachmentId, mailboxSession))
+ .map((attachmentMetadata: AttachmentMetadata) => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession)))
+ )
+ case _ => NonApplicable()
+ }
+}
+
class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
val messageIdManager: MessageIdManager) extends BlobResolver {
private def asMessageAndPartId(blobId: BlobId): Try[(MessageId, PartId)] = {
@@ -153,11 +174,13 @@ class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
}
class BlobResolvers @Inject()(val messageBlobResolver: MessageBlobResolver,
- val messagePartBlobResolver: MessagePartBlobResolver) {
+ val messagePartBlobResolver: MessagePartBlobResolver,
+ val attachmentBlobResolver: AttachmentBlobResolver) {
def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[Blob] =
messageBlobResolver
.resolve(blobId, mailboxSession).asOption
.orElse(messagePartBlobResolver.resolve(blobId, mailboxSession).asOption)
+ .orElse(attachmentBlobResolver.resolve(blobId, mailboxSession).asOption)
.getOrElse(SMono.raiseError(BlobNotFoundException(blobId)))
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
new file mode 100644
index 0000000..f06fa51
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
@@ -0,0 +1,133 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.routes
+
+import java.io.InputStream
+import java.time.ZonedDateTime
+import java.util.stream
+import java.util.stream.Stream
+
+import eu.timepit.refined.api.Refined
+import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
+import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, CREATED}
+import io.netty.handler.codec.http.HttpMethod
+import javax.inject.{Inject, Named}
+import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
+import org.apache.james.jmap.http.Authenticator
+import org.apache.james.jmap.http.rfc8621.InjectionKeys
+import org.apache.james.jmap.mail.Email.Size
+import org.apache.james.jmap.routes.UploadRoutes.{LOGGER, fromAttachment}
+import org.apache.james.mailbox.{AttachmentManager, MailboxSession}
+import org.apache.james.mailbox.model.{AttachmentMetadata, ContentType}
+import org.apache.james.util.ReactorUtils
+import org.slf4j.{Logger, LoggerFactory}
+import reactor.core.publisher.Mono
+import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
+import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
+import eu.timepit.refined.auto._
+import eu.timepit.refined.numeric.NonNegative
+import eu.timepit.refined.refineV
+import org.apache.james.jmap.exceptions.UnauthorizedException
+import org.apache.james.jmap.json.UploadSerializer
+import org.apache.james.jmap.mail.BlobId
+
+object UploadRoutes {
+ val LOGGER: Logger = LoggerFactory.getLogger(classOf[DownloadRoutes])
+
+ type Size = Long Refined NonNegative
+ val Zero: Size = 0L
+
+ def sanitizeSize(value: Long): Size = {
+ val size: Either[String, Size] = refineV[NonNegative](value)
+ size.fold(e => {
+ LOGGER.error(s"Encountered an invalid Email size: $e")
+ Zero
+ },
+ refinedValue => refinedValue)
+ }
+
+ def fromAttachment(attachmentMetadata: AttachmentMetadata): UploadResponse =
+ UploadResponse(
+ blobId = BlobId.of(attachmentMetadata.getAttachmentId.getId).get,
+ `type` = ContentType.of(attachmentMetadata.getType.asString),
+ size = sanitizeSize(attachmentMetadata.getSize),
+ expires = None)
+}
+
+case class UploadResponse(blobId: BlobId,
+ `type`: ContentType,
+ size: Size,
+ expires: Option[ZonedDateTime])
+
+class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
+ val attachmentManager: AttachmentManager,
+ val serializer: UploadSerializer) extends JMAPRoutes {
+
+ class CancelledUploadException extends RuntimeException {
+
+ }
+
+ private val accountIdParam: String = "accountId"
+ private val uploadURI = s"/upload/{$accountIdParam}/"
+
+ override def routes(): stream.Stream[JMAPRoute] = Stream.of(
+ JMAPRoute.builder
+ .endpoint(new Endpoint(HttpMethod.POST, uploadURI))
+ .action(this.post)
+ .corsHeaders,
+ JMAPRoute.builder
+ .endpoint(new Endpoint(HttpMethod.OPTIONS, uploadURI))
+ .action(JMAPRoutes.CORS_CONTROL)
+ .noCorsHeaders)
+
+ def post(request: HttpServerRequest, response: HttpServerResponse): Mono[Void] = {
+ request.requestHeaders.get(CONTENT_TYPE) match {
+ case contentType => SMono.fromPublisher(
+ authenticator.authenticate(request))
+ .flatMap(session => post(request, response, ContentType.of(contentType), session))
+ .onErrorResume {
+ case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e))
+ case e: Throwable => SMono.fromPublisher(handleInternalError(response, LOGGER, e))
+ }
+ .asJava().`then`()
+ case _ => response.status(BAD_REQUEST).send
+ }
+ }
+
+ def post(request: HttpServerRequest, response: HttpServerResponse, contentType: ContentType, session: MailboxSession): SMono[Void] = {
+ SMono.fromCallable(() => ReactorUtils.toInputStream(request.receive.asByteBuffer))
+ .flatMap(content => handle(contentType, content, session, response))
+ .subscribeOn(Schedulers.elastic())
+ }
+
+ def handle(contentType: ContentType, content: InputStream, mailboxSession: MailboxSession, response: HttpServerResponse): SMono[Void] =
+ uploadContent(contentType, content, mailboxSession)
+ .flatMap(uploadResponse => SMono.fromPublisher(response
+ .header(CONTENT_TYPE, uploadResponse.`type`.asString())
+ .status(CREATED)
+ .sendString(SMono.just(serializer.serialize(uploadResponse).toString()))))
+
+ def uploadContent(contentType: ContentType, inputStream: InputStream, session: MailboxSession): SMono[UploadResponse] =
+ SMono
+ .fromPublisher(attachmentManager.storeAttachment(contentType, inputStream, session))
+ .map(fromAttachment)
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org