You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/26 01:34:45 UTC

[james-project] branch master updated: JAMES-3793 Prevent Bytes concat upon reading FULL messages (#1152)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new dfe62ea345 JAMES-3793 Prevent Bytes concat upon reading FULL messages (#1152)
dfe62ea345 is described below

commit dfe62ea345458c0fc23cb42be6cfcda66caaa115
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Fri Aug 26 08:34:40 2022 +0700

    JAMES-3793 Prevent Bytes concat upon reading FULL messages (#1152)
    
    This consumes double the memory and can easily be avoided with a
    sequenced input stream. This change will help reduce memory pressure
    when reading messages.
---
 .../mailbox/model/HeaderAndBodyByteContent.java    | 50 ++++++++++++++++++++++
 .../cassandra/mail/CassandraMessageDAOV3.java      | 37 ++++++++--------
 2 files changed, 70 insertions(+), 17 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
new file mode 100644
index 0000000000..b929fc26f2
--- /dev/null
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.model;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+
+public final class HeaderAndBodyByteContent implements Content {
+
+    private final byte[] headers;
+    private final byte[] body;
+
+    private final long size;
+
+    public HeaderAndBodyByteContent(byte[] headers, byte[] body) {
+        this.headers = headers;
+        this.body = body;
+        size = (long) headers.length + (long) body.length;
+    }
+
+    @Override
+    public long size() {
+        return size;
+    }
+
+    @Override
+    public InputStream getInputStream() {
+        return new SequenceInputStream(
+            new ByteArrayInputStream(headers),
+            new ByteArrayInputStream(body));
+    }
+}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index 1f876757ee..05d9cccedc 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -83,6 +83,8 @@ import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.ByteContent;
 import org.apache.james.mailbox.model.Cid;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.Content;
+import org.apache.james.mailbox.model.HeaderAndBodyByteContent;
 import org.apache.james.mailbox.model.MessageAttachmentMetadata;
 import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
@@ -101,7 +103,6 @@ import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
 import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteSource;
-import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -344,17 +345,18 @@ public class CassandraMessageDAOV3 {
         BlobId bodyId = retrieveBlobId(BODY_CONTENT, row);
         int bodyStartOctet = row.getInt(BODY_START_OCTET);
 
-        return buildContentRetriever(fetchType, headerId, bodyId, bodyStartOctet).map(content ->
-            new MessageRepresentation(
-                cassandraMessageId,
-                Date.from(row.getInstant(INTERNAL_DATE_LOWERCASE)),
-                row.getLong(FULL_CONTENT_OCTETS_LOWERCASE),
-                row.getInt(BODY_START_OCTET_LOWERCASE),
-                new ByteContent(content),
-                getProperties(row),
-                getAttachments(row).collect(ImmutableList.toImmutableList()),
-                headerId,
-                bodyId));
+        return buildContentRetriever(fetchType, headerId, bodyId, bodyStartOctet)
+            .map(content ->
+                new MessageRepresentation(
+                    cassandraMessageId,
+                    Date.from(row.getInstant(INTERNAL_DATE_LOWERCASE)),
+                    row.getLong(FULL_CONTENT_OCTETS_LOWERCASE),
+                    row.getInt(BODY_START_OCTET_LOWERCASE),
+                    content,
+                    getProperties(row),
+                    getAttachments(row).collect(ImmutableList.toImmutableList()),
+                    headerId,
+                    bodyId));
     }
 
     private Properties getProperties(Row row) {
@@ -398,22 +400,23 @@ public class CassandraMessageDAOV3 {
             .setUuid(MESSAGE_ID, messageId.get()));
     }
 
-    private Mono<byte[]> buildContentRetriever(FetchType fetchType, BlobId headerId, BlobId bodyId, int bodyStartOctet) {
+    private Mono<Content> buildContentRetriever(FetchType fetchType, BlobId headerId, BlobId bodyId, int bodyStartOctet) {
         switch (fetchType) {
             case FULL:
                 return getFullContent(headerId, bodyId);
             case HEADERS:
-                return getContent(headerId, SIZE_BASED);
+                return getContent(headerId, SIZE_BASED)
+                    .map(ByteContent::new);
             case METADATA:
-                return Mono.just(EMPTY_BYTE_ARRAY);
+                return Mono.just(new ByteContent(EMPTY_BYTE_ARRAY));
             default:
                 throw new RuntimeException("Unknown FetchType " + fetchType);
         }
     }
 
-    private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) {
+    private Mono<Content> getFullContent(BlobId headerId, BlobId bodyId) {
         return getContent(headerId, SIZE_BASED)
-            .zipWith(getContent(bodyId, LOW_COST), Bytes::concat);
+            .zipWith(getContent(bodyId, LOW_COST), HeaderAndBodyByteContent::new);
     }
 
     private Mono<byte[]> getContent(BlobId blobId, BlobStore.StoragePolicy storagePolicy) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org