You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/03/18 03:03:41 UTC
[james-project] 08/15: JAMES-3078 UploadRoutes
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2e5b3ba15d8548c92c55bdaea8cba373fb7b139a
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed Mar 11 13:45:46 2020 +0700
JAMES-3078 UploadRoutes
---
.../apache/james/mailbox/AttachmentManager.java | 3 +-
.../cassandra/mail/CassandraAttachmentMapper.java | 7 +-
.../inmemory/mail/InMemoryAttachmentMapper.java | 10 +-
.../mailbox/store/StoreAttachmentManager.java | 5 +-
.../james/mailbox/store/StoreMessageManager.java | 1 +
.../james/mailbox/store/mail/AttachmentMapper.java | 3 +-
.../store/mail/model/AttachmentMapperTest.java | 18 +--
.../org/apache/james/jmap/http/UploadRoutes.java | 160 +++++++++++++++++++++
8 files changed, 188 insertions(+), 19 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
index aabc4ed..83e0094 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
@@ -27,6 +27,7 @@ import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.Attachment;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.MessageId;
+import org.reactivestreams.Publisher;
public interface AttachmentManager {
@@ -36,7 +37,7 @@ public interface AttachmentManager {
List<Attachment> getAttachments(List<AttachmentId> attachmentIds, MailboxSession mailboxSession) throws MailboxException;
- void storeAttachment(Attachment attachment, MailboxSession mailboxSession) throws MailboxException;
+ Publisher<Void> storeAttachment(Attachment attachment, MailboxSession mailboxSession);
void storeAttachmentsForMessage(Collection<Attachment> attachments, MessageId ownerMessageId, MailboxSession mailboxSession) throws MailboxException;
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index adf1c3f..a2c4bad 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -110,12 +110,11 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
}
@Override
- public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException {
- ownerDAO.addOwner(attachment.getAttachmentId(), owner)
+ public Mono<Void> storeAttachmentForOwner(Attachment attachment, Username owner) {
+ return ownerDAO.addOwner(attachment.getAttachmentId(), owner)
.then(Mono.from(blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST)))
.map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
- .flatMap(attachmentDAOV2::storeAttachment)
- .block();
+ .flatMap(attachmentDAOV2::storeAttachment);
}
@Override
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java
index 54d040c..3267824 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java
@@ -38,6 +38,8 @@ import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
+import reactor.core.publisher.Mono;
+
public class InMemoryAttachmentMapper implements AttachmentMapper {
private static final int INITIAL_SIZE = 128;
@@ -73,9 +75,11 @@ public class InMemoryAttachmentMapper implements AttachmentMapper {
}
@Override
- public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException {
- attachmentsById.put(attachment.getAttachmentId(), attachment);
- ownersByAttachmentId.put(attachment.getAttachmentId(), owner);
+ public Mono<Void> storeAttachmentForOwner(Attachment attachment, Username owner) {
+ return Mono.fromRunnable(() -> {
+ attachmentsById.put(attachment.getAttachmentId(), attachment);
+ ownersByAttachmentId.put(attachment.getAttachmentId(), owner);
+ });
}
@Override
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
index 2c59619..f32d60b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
@@ -34,6 +34,7 @@ import org.apache.james.mailbox.model.Attachment;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.mail.AttachmentMapperFactory;
+import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,8 +75,8 @@ public class StoreAttachmentManager implements AttachmentManager {
}
@Override
- public void storeAttachment(Attachment attachment, MailboxSession mailboxSession) throws MailboxException {
- attachmentMapperFactory.getAttachmentMapper(mailboxSession)
+ public Publisher<Void> storeAttachment(Attachment attachment, MailboxSession mailboxSession) {
+ return attachmentMapperFactory.getAttachmentMapper(mailboxSession)
.storeAttachmentForOwner(attachment, mailboxSession.getUser());
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 2736d37..f97e5f0 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -465,6 +465,7 @@ public class StoreMessageManager implements MessageManager {
.addMetaData(message.metaData())
.build(),
new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
.block();
return new ComposedMessageId(mailbox.getMailboxId(), data.getMessageId(), data.getUid());
}, MailboxPathLocker.LockType.Write);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
index 8b411b3..ced1530 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
@@ -28,6 +28,7 @@ import org.apache.james.mailbox.model.Attachment;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.transaction.Mapper;
+import org.reactivestreams.Publisher;
public interface AttachmentMapper extends Mapper {
@@ -35,7 +36,7 @@ public interface AttachmentMapper extends Mapper {
List<Attachment> getAttachments(Collection<AttachmentId> attachmentIds);
- void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException;
+ Publisher<Void> storeAttachmentForOwner(Attachment attachment, Username owner);
void storeAttachmentsForMessage(Collection<Attachment> attachments, MessageId ownerMessageId) throws MailboxException;
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java
index 97469eb..ad21cf6 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java
@@ -37,6 +37,8 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Mono;
+
public abstract class AttachmentMapperTest {
private static final AttachmentId UNKNOWN_ATTACHMENT_ID = AttachmentId.from("unknown");
private static final Username OWNER = Username.of("owner");
@@ -73,7 +75,7 @@ public abstract class AttachmentMapperTest {
.type("content")
.build();
AttachmentId attachmentId = expected.getAttachmentId();
- attachmentMapper.storeAttachmentForOwner(expected, OWNER);
+ Mono.from(attachmentMapper.storeAttachmentForOwner(expected, OWNER)).block();
//When
Attachment attachment = attachmentMapper.getAttachment(attachmentId);
//Then
@@ -116,21 +118,21 @@ public abstract class AttachmentMapperTest {
}
@Test
- void getAttachmentsShouldReturnTheAttachmentsWhenSome() throws Exception {
+ void getAttachmentsShouldReturnTheAttachmentsWhenSome() {
//Given
Attachment expected = Attachment.builder()
.bytes("payload".getBytes(StandardCharsets.UTF_8))
.type("content")
.build();
AttachmentId attachmentId = expected.getAttachmentId();
- attachmentMapper.storeAttachmentForOwner(expected, OWNER);
+ Mono.from(attachmentMapper.storeAttachmentForOwner(expected, OWNER)).block();
Attachment expected2 = Attachment.builder()
.bytes("payload2".getBytes(StandardCharsets.UTF_8))
.type("content")
.build();
AttachmentId attachmentId2 = expected2.getAttachmentId();
- attachmentMapper.storeAttachmentForOwner(expected2, OWNER);
+ Mono.from(attachmentMapper.storeAttachmentForOwner(expected2, OWNER)).block();
//When
List<Attachment> attachments = attachmentMapper.getAttachments(ImmutableList.of(attachmentId, attachmentId2));
@@ -153,7 +155,7 @@ public abstract class AttachmentMapperTest {
.type("content")
.build();
AttachmentId attachmentId = attachment.getAttachmentId();
- attachmentMapper.storeAttachmentForOwner(attachment, OWNER);
+ Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, OWNER)).block();
//When
Collection<MessageId> messageIds = attachmentMapper.getRelatedMessageIds(attachmentId);
@@ -270,7 +272,7 @@ public abstract class AttachmentMapperTest {
.build();
AttachmentId attachmentId = attachment.getAttachmentId();
- attachmentMapper.storeAttachmentForOwner(attachment, OWNER);
+ Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, OWNER)).block();
//When
Collection<Username> expectedOwners = ImmutableList.of(OWNER);
@@ -305,8 +307,8 @@ public abstract class AttachmentMapperTest {
.build();
AttachmentId attachmentId = attachment.getAttachmentId();
- attachmentMapper.storeAttachmentForOwner(attachment, OWNER);
- attachmentMapper.storeAttachmentForOwner(attachment, ADDITIONAL_OWNER);
+ Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, OWNER)).block();
+ Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, ADDITIONAL_OWNER)).block();
//When
Collection<Username> expectedOwners = ImmutableList.of(OWNER, ADDITIONAL_OWNER);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UploadRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UploadRoutes.java
new file mode 100644
index 0000000..7589cbb
--- /dev/null
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UploadRoutes.java
@@ -0,0 +1,160 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.jmap.http;
+
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
+import static org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE_UTF8;
+import static org.apache.james.jmap.http.JMAPUrls.UPLOAD;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.inject.Inject;
+
+import org.apache.james.jmap.JMAPRoutes;
+import org.apache.james.jmap.draft.exceptions.BadRequestException;
+import org.apache.james.jmap.draft.exceptions.InternalErrorException;
+import org.apache.james.jmap.draft.exceptions.UnauthorizedException;
+import org.apache.james.jmap.draft.model.UploadResponse;
+import org.apache.james.mailbox.AttachmentManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.Attachment;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.util.ReactorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.netty.http.server.HttpServerRequest;
+import reactor.netty.http.server.HttpServerResponse;
+import reactor.netty.http.server.HttpServerRoutes;
+
+public class UploadRoutes implements JMAPRoutes {
+ private static final Logger LOGGER = LoggerFactory.getLogger(UploadRoutes.class);
+
+ static class CancelledUploadException extends RuntimeException {
+
+ }
+
+ private final MetricFactory metricFactory;
+ private final AuthenticationReactiveFilter authenticationReactiveFilter;
+ private final AttachmentManager attachmentManager;
+ private final ObjectMapper objectMapper;
+
+ @Inject
+ private UploadRoutes(MetricFactory metricFactory, AuthenticationReactiveFilter authenticationReactiveFilter, AttachmentManager attachmentManager, ObjectMapper objectMapper) {
+ this.metricFactory = metricFactory;
+ this.authenticationReactiveFilter = authenticationReactiveFilter;
+ this.attachmentManager = attachmentManager;
+ this.objectMapper = objectMapper;
+ }
+
+ @Override
+ public Logger logger() {
+ return LOGGER;
+ }
+
+ @Override
+ public HttpServerRoutes define(HttpServerRoutes builder) {
+ return builder.post(UPLOAD, this::post)
+ .options(UPLOAD, CORS_CONTROL);
+ }
+
+ private Mono<Void> post(HttpServerRequest request, HttpServerResponse response) {
+ String contentType = request.requestHeaders().get(CONTENT_TYPE);
+ if (Strings.isNullOrEmpty(contentType)) {
+ return response.status(BAD_REQUEST).send();
+ } else {
+ return authenticationReactiveFilter.authenticate(request)
+ .flatMap(session -> post(request, response, contentType, session))
+ .onErrorResume(CancelledUploadException.class, e -> handleCanceledUpload(response, e))
+ .onErrorResume(BadRequestException.class, e -> handleBadRequest(response, e))
+ .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, e))
+ .onErrorResume(InternalErrorException.class, e -> handleInternalError(response, e))
+ .subscribeOn(Schedulers.elastic());
+ }
+ }
+
+ private Mono<Void> post(HttpServerRequest request, HttpServerResponse response, String contentType, MailboxSession session) {
+ InputStream content = ReactorUtils.toInputStream(request.receive().asByteBuffer());
+ return Mono.from(metricFactory.runPublishingTimerMetric("JMAP-upload-post",
+ handle(contentType, content, session, response)));
+ }
+
+ private Mono<Void> handle(String contentType, InputStream content, MailboxSession mailboxSession, HttpServerResponse response) {
+ return uploadContent(contentType, content, mailboxSession)
+ .flatMap(storedContent -> {
+ try {
+ return response.header(CONTENT_TYPE, JSON_CONTENT_TYPE_UTF8)
+ .status(CREATED)
+ .sendString(Mono.just(objectMapper.writeValueAsString(storedContent)))
+ .then();
+ } catch (JsonProcessingException e) {
+ throw new InternalErrorException("Error serializing upload response", e);
+ }
+ });
+ }
+
+ private Mono<UploadResponse> uploadContent(String contentType, InputStream inputStream, MailboxSession session) {
+ return toBytesArray(inputStream)
+ .map(bytes -> Attachment.builder()
+ .bytes(bytes)
+ .type(contentType)
+ .build())
+ .flatMap(attachment -> Mono.from(attachmentManager.storeAttachment(attachment, session))
+ .thenReturn(UploadResponse.builder()
+ .blobId(attachment.getAttachmentId().getId())
+ .type(attachment.getType())
+ .size(attachment.getSize())
+ .build()));
+ }
+
+ private Mono<byte[]> toBytesArray(InputStream inputStream) {
+ return Mono.fromCallable(() -> {
+ try {
+ return ByteStreams.toByteArray(inputStream);
+ } catch (IOException e) {
+ if (e instanceof EOFException) {
+ throw new CancelledUploadException();
+ } else {
+ throw new InternalErrorException("Error while uploading content", e);
+ }
+ }
+ });
+ }
+
+ private Mono<Void> handleCanceledUpload(HttpServerResponse response, CancelledUploadException e) {
+ LOGGER.info("An upload has been canceled before the end", e);
+ return response.send();
+ }
+
+ private Mono<Void> handleBadRequest(HttpServerResponse response, BadRequestException e) {
+ LOGGER.warn("Invalid authentication request received.", e);
+ return response.status(BAD_REQUEST).send();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org