You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/03/18 03:03:41 UTC

[james-project] 08/15: JAMES-3078 UploadRoutes

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2e5b3ba15d8548c92c55bdaea8cba373fb7b139a
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed Mar 11 13:45:46 2020 +0700

    JAMES-3078 UploadRoutes
---
 .../apache/james/mailbox/AttachmentManager.java    |   3 +-
 .../cassandra/mail/CassandraAttachmentMapper.java  |   7 +-
 .../inmemory/mail/InMemoryAttachmentMapper.java    |  10 +-
 .../mailbox/store/StoreAttachmentManager.java      |   5 +-
 .../james/mailbox/store/StoreMessageManager.java   |   1 +
 .../james/mailbox/store/mail/AttachmentMapper.java |   3 +-
 .../store/mail/model/AttachmentMapperTest.java     |  18 +--
 .../org/apache/james/jmap/http/UploadRoutes.java   | 160 +++++++++++++++++++++
 8 files changed, 188 insertions(+), 19 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
index aabc4ed..83e0094 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
@@ -27,6 +27,7 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.MessageId;
+import org.reactivestreams.Publisher;
 
 public interface AttachmentManager {
 
@@ -36,7 +37,7 @@ public interface AttachmentManager {
 
     List<Attachment> getAttachments(List<AttachmentId> attachmentIds, MailboxSession mailboxSession) throws MailboxException;
 
-    void storeAttachment(Attachment attachment, MailboxSession mailboxSession) throws MailboxException;
+    Publisher<Void> storeAttachment(Attachment attachment, MailboxSession mailboxSession);
 
     void storeAttachmentsForMessage(Collection<Attachment> attachments, MessageId ownerMessageId, MailboxSession mailboxSession) throws MailboxException;
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index adf1c3f..a2c4bad 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -110,12 +110,11 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
     }
 
     @Override
-    public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException {
-        ownerDAO.addOwner(attachment.getAttachmentId(), owner)
+    public Mono<Void> storeAttachmentForOwner(Attachment attachment, Username owner) {
+        return ownerDAO.addOwner(attachment.getAttachmentId(), owner)
             .then(Mono.from(blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST)))
             .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
-            .flatMap(attachmentDAOV2::storeAttachment)
-            .block();
+            .flatMap(attachmentDAOV2::storeAttachment);
     }
 
     @Override
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java
index 54d040c..3267824 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryAttachmentMapper.java
@@ -38,6 +38,8 @@ import com.google.common.collect.ImmutableList.Builder;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 
+import reactor.core.publisher.Mono;
+
 public class InMemoryAttachmentMapper implements AttachmentMapper {
     
     private static final int INITIAL_SIZE = 128;
@@ -73,9 +75,11 @@ public class InMemoryAttachmentMapper implements AttachmentMapper {
     }
 
     @Override
-    public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException {
-        attachmentsById.put(attachment.getAttachmentId(), attachment);
-        ownersByAttachmentId.put(attachment.getAttachmentId(), owner);
+    public Mono<Void> storeAttachmentForOwner(Attachment attachment, Username owner) {
+        return Mono.fromRunnable(() -> {
+            attachmentsById.put(attachment.getAttachmentId(), attachment);
+            ownersByAttachmentId.put(attachment.getAttachmentId(), owner);
+        });
     }
 
     @Override
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
index 2c59619..f32d60b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
@@ -34,6 +34,7 @@ import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.mail.AttachmentMapperFactory;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,8 +75,8 @@ public class StoreAttachmentManager implements AttachmentManager {
     }
 
     @Override
-    public void storeAttachment(Attachment attachment, MailboxSession mailboxSession) throws MailboxException {
-        attachmentMapperFactory.getAttachmentMapper(mailboxSession)
+    public Publisher<Void> storeAttachment(Attachment attachment, MailboxSession mailboxSession) {
+        return attachmentMapperFactory.getAttachmentMapper(mailboxSession)
             .storeAttachmentForOwner(attachment, mailboxSession.getUser());
     }
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 2736d37..f97e5f0 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -465,6 +465,7 @@ public class StoreMessageManager implements MessageManager {
                     .addMetaData(message.metaData())
                     .build(),
                     new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                    .subscribeOn(Schedulers.elastic())
                     .block();
                 return new ComposedMessageId(mailbox.getMailboxId(), data.getMessageId(), data.getUid());
             }, MailboxPathLocker.LockType.Write);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
index 8b411b3..ced1530 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
@@ -28,6 +28,7 @@ import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.transaction.Mapper;
+import org.reactivestreams.Publisher;
 
 public interface AttachmentMapper extends Mapper {
 
@@ -35,7 +36,7 @@ public interface AttachmentMapper extends Mapper {
 
     List<Attachment> getAttachments(Collection<AttachmentId> attachmentIds);
 
-    void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException;
+    Publisher<Void> storeAttachmentForOwner(Attachment attachment, Username owner);
 
     void storeAttachmentsForMessage(Collection<Attachment> attachments, MessageId ownerMessageId) throws MailboxException;
 
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java
index 97469eb..ad21cf6 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/AttachmentMapperTest.java
@@ -37,6 +37,8 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+
 public abstract class AttachmentMapperTest {
     private static final AttachmentId UNKNOWN_ATTACHMENT_ID = AttachmentId.from("unknown");
     private static final Username OWNER = Username.of("owner");
@@ -73,7 +75,7 @@ public abstract class AttachmentMapperTest {
                 .type("content")
                 .build();
         AttachmentId attachmentId = expected.getAttachmentId();
-        attachmentMapper.storeAttachmentForOwner(expected, OWNER);
+        Mono.from(attachmentMapper.storeAttachmentForOwner(expected, OWNER)).block();
         //When
         Attachment attachment = attachmentMapper.getAttachment(attachmentId);
         //Then
@@ -116,21 +118,21 @@ public abstract class AttachmentMapperTest {
     }
 
     @Test
-    void getAttachmentsShouldReturnTheAttachmentsWhenSome() throws Exception {
+    void getAttachmentsShouldReturnTheAttachmentsWhenSome() {
         //Given
         Attachment expected = Attachment.builder()
                 .bytes("payload".getBytes(StandardCharsets.UTF_8))
                 .type("content")
                 .build();
         AttachmentId attachmentId = expected.getAttachmentId();
-        attachmentMapper.storeAttachmentForOwner(expected, OWNER);
+        Mono.from(attachmentMapper.storeAttachmentForOwner(expected, OWNER)).block();
 
         Attachment expected2 = Attachment.builder()
                 .bytes("payload2".getBytes(StandardCharsets.UTF_8))
                 .type("content")
                 .build();
         AttachmentId attachmentId2 = expected2.getAttachmentId();
-        attachmentMapper.storeAttachmentForOwner(expected2, OWNER);
+        Mono.from(attachmentMapper.storeAttachmentForOwner(expected2, OWNER)).block();
 
         //When
         List<Attachment> attachments = attachmentMapper.getAttachments(ImmutableList.of(attachmentId, attachmentId2));
@@ -153,7 +155,7 @@ public abstract class AttachmentMapperTest {
                 .type("content")
                 .build();
         AttachmentId attachmentId = attachment.getAttachmentId();
-        attachmentMapper.storeAttachmentForOwner(attachment, OWNER);
+        Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, OWNER)).block();
         
         //When
         Collection<MessageId> messageIds = attachmentMapper.getRelatedMessageIds(attachmentId);
@@ -270,7 +272,7 @@ public abstract class AttachmentMapperTest {
             .build();
 
         AttachmentId attachmentId = attachment.getAttachmentId();
-        attachmentMapper.storeAttachmentForOwner(attachment, OWNER);
+        Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, OWNER)).block();
 
         //When
         Collection<Username> expectedOwners = ImmutableList.of(OWNER);
@@ -305,8 +307,8 @@ public abstract class AttachmentMapperTest {
             .build();
 
         AttachmentId attachmentId = attachment.getAttachmentId();
-        attachmentMapper.storeAttachmentForOwner(attachment, OWNER);
-        attachmentMapper.storeAttachmentForOwner(attachment, ADDITIONAL_OWNER);
+        Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, OWNER)).block();
+        Mono.from(attachmentMapper.storeAttachmentForOwner(attachment, ADDITIONAL_OWNER)).block();
 
         //When
         Collection<Username> expectedOwners = ImmutableList.of(OWNER, ADDITIONAL_OWNER);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UploadRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UploadRoutes.java
new file mode 100644
index 0000000..7589cbb
--- /dev/null
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/UploadRoutes.java
@@ -0,0 +1,160 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.jmap.http;
+
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
+import static org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE_UTF8;
+import static org.apache.james.jmap.http.JMAPUrls.UPLOAD;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.inject.Inject;
+
+import org.apache.james.jmap.JMAPRoutes;
+import org.apache.james.jmap.draft.exceptions.BadRequestException;
+import org.apache.james.jmap.draft.exceptions.InternalErrorException;
+import org.apache.james.jmap.draft.exceptions.UnauthorizedException;
+import org.apache.james.jmap.draft.model.UploadResponse;
+import org.apache.james.mailbox.AttachmentManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.model.Attachment;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.util.ReactorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.netty.http.server.HttpServerRequest;
+import reactor.netty.http.server.HttpServerResponse;
+import reactor.netty.http.server.HttpServerRoutes;
+
+public class UploadRoutes implements JMAPRoutes {
+    private static final Logger LOGGER = LoggerFactory.getLogger(UploadRoutes.class);
+
+    static class CancelledUploadException extends RuntimeException {
+
+    }
+
+    private final MetricFactory metricFactory;
+    private final AuthenticationReactiveFilter authenticationReactiveFilter;
+    private final AttachmentManager attachmentManager;
+    private final ObjectMapper objectMapper;
+
+    @Inject
+    private UploadRoutes(MetricFactory metricFactory, AuthenticationReactiveFilter authenticationReactiveFilter, AttachmentManager attachmentManager, ObjectMapper objectMapper) {
+        this.metricFactory = metricFactory;
+        this.authenticationReactiveFilter = authenticationReactiveFilter;
+        this.attachmentManager = attachmentManager;
+        this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public Logger logger() {
+        return LOGGER;
+    }
+
+    @Override
+    public HttpServerRoutes define(HttpServerRoutes builder) {
+        return builder.post(UPLOAD, this::post)
+            .options(UPLOAD, CORS_CONTROL);
+    }
+
+    private Mono<Void> post(HttpServerRequest request, HttpServerResponse response)  {
+        String contentType = request.requestHeaders().get(CONTENT_TYPE);
+        if (Strings.isNullOrEmpty(contentType)) {
+            return response.status(BAD_REQUEST).send();
+        } else {
+            return authenticationReactiveFilter.authenticate(request)
+                .flatMap(session -> post(request, response, contentType, session))
+                .onErrorResume(CancelledUploadException.class, e -> handleCanceledUpload(response, e))
+                .onErrorResume(BadRequestException.class, e -> handleBadRequest(response, e))
+                .onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, e))
+                .onErrorResume(InternalErrorException.class, e -> handleInternalError(response, e))
+                .subscribeOn(Schedulers.elastic());
+        }
+    }
+
+    private Mono<Void> post(HttpServerRequest request, HttpServerResponse response, String contentType, MailboxSession session) {
+        InputStream content = ReactorUtils.toInputStream(request.receive().asByteBuffer());
+        return Mono.from(metricFactory.runPublishingTimerMetric("JMAP-upload-post",
+            handle(contentType, content, session, response)));
+    }
+
+    private Mono<Void> handle(String contentType, InputStream content, MailboxSession mailboxSession, HttpServerResponse response) {
+        return uploadContent(contentType, content, mailboxSession)
+            .flatMap(storedContent -> {
+                try {
+                    return response.header(CONTENT_TYPE, JSON_CONTENT_TYPE_UTF8)
+                        .status(CREATED)
+                        .sendString(Mono.just(objectMapper.writeValueAsString(storedContent)))
+                        .then();
+                } catch (JsonProcessingException e) {
+                    throw new InternalErrorException("Error serializing upload response", e);
+                }
+            });
+    }
+
+    private Mono<UploadResponse> uploadContent(String contentType, InputStream inputStream, MailboxSession session) {
+        return toBytesArray(inputStream)
+            .map(bytes -> Attachment.builder()
+                .bytes(bytes)
+                .type(contentType)
+                .build())
+            .flatMap(attachment -> Mono.from(attachmentManager.storeAttachment(attachment, session))
+                .thenReturn(UploadResponse.builder()
+                    .blobId(attachment.getAttachmentId().getId())
+                    .type(attachment.getType())
+                    .size(attachment.getSize())
+                    .build()));
+    }
+
+    private Mono<byte[]> toBytesArray(InputStream inputStream) {
+        return Mono.fromCallable(() -> {
+            try {
+                return ByteStreams.toByteArray(inputStream);
+            } catch (IOException e) {
+                if (e instanceof EOFException) {
+                    throw new CancelledUploadException();
+                } else {
+                    throw new InternalErrorException("Error while uploading content", e);
+                }
+            }
+        });
+    }
+
+    private Mono<Void> handleCanceledUpload(HttpServerResponse response, CancelledUploadException e) {
+        LOGGER.info("An upload has been canceled before the end", e);
+        return response.send();
+    }
+
+    private Mono<Void> handleBadRequest(HttpServerResponse response, BadRequestException e) {
+        LOGGER.warn("Invalid authentication request received.", e);
+        return response.status(BAD_REQUEST).send();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org