You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/26 01:34:45 UTC
[james-project] branch master updated: JAMES-3793 Prevent Bytes concat upon reading FULL messages (#1152)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new dfe62ea345 JAMES-3793 Prevent Bytes concat upon reading FULL messages (#1152)
dfe62ea345 is described below
commit dfe62ea345458c0fc23cb42be6cfcda66caaa115
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Fri Aug 26 08:34:40 2022 +0700
JAMES-3793 Prevent Bytes concat upon reading FULL messages (#1152)
Concatenating the header and body byte arrays consumes double the memory
and can easily be avoided with a sequenced input stream. This change
will help reduce the memory pressure when reading messages.
---
.../mailbox/model/HeaderAndBodyByteContent.java | 50 ++++++++++++++++++++++
.../cassandra/mail/CassandraMessageDAOV3.java | 37 ++++++++--------
2 files changed, 70 insertions(+), 17 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
new file mode 100644
index 0000000000..b929fc26f2
--- /dev/null
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/HeaderAndBodyByteContent.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.model;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+
+public final class HeaderAndBodyByteContent implements Content {
+
+ private final byte[] headers;
+ private final byte[] body;
+
+ private final long size;
+
+ public HeaderAndBodyByteContent(byte[] headers, byte[] body) {
+ this.headers = headers;
+ this.body = body;
+ size = (long) headers.length + (long) body.length;
+ }
+
+ @Override
+ public long size() {
+ return size;
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return new SequenceInputStream(
+ new ByteArrayInputStream(headers),
+ new ByteArrayInputStream(body));
+ }
+}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index 1f876757ee..05d9cccedc 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -83,6 +83,8 @@ import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.ByteContent;
import org.apache.james.mailbox.model.Cid;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.Content;
+import org.apache.james.mailbox.model.HeaderAndBodyByteContent;
import org.apache.james.mailbox.model.MessageAttachmentMetadata;
import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
@@ -101,7 +103,6 @@ import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
-import com.google.common.primitives.Bytes;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -344,17 +345,18 @@ public class CassandraMessageDAOV3 {
BlobId bodyId = retrieveBlobId(BODY_CONTENT, row);
int bodyStartOctet = row.getInt(BODY_START_OCTET);
- return buildContentRetriever(fetchType, headerId, bodyId, bodyStartOctet).map(content ->
- new MessageRepresentation(
- cassandraMessageId,
- Date.from(row.getInstant(INTERNAL_DATE_LOWERCASE)),
- row.getLong(FULL_CONTENT_OCTETS_LOWERCASE),
- row.getInt(BODY_START_OCTET_LOWERCASE),
- new ByteContent(content),
- getProperties(row),
- getAttachments(row).collect(ImmutableList.toImmutableList()),
- headerId,
- bodyId));
+ return buildContentRetriever(fetchType, headerId, bodyId, bodyStartOctet)
+ .map(content ->
+ new MessageRepresentation(
+ cassandraMessageId,
+ Date.from(row.getInstant(INTERNAL_DATE_LOWERCASE)),
+ row.getLong(FULL_CONTENT_OCTETS_LOWERCASE),
+ row.getInt(BODY_START_OCTET_LOWERCASE),
+ content,
+ getProperties(row),
+ getAttachments(row).collect(ImmutableList.toImmutableList()),
+ headerId,
+ bodyId));
}
private Properties getProperties(Row row) {
@@ -398,22 +400,23 @@ public class CassandraMessageDAOV3 {
.setUuid(MESSAGE_ID, messageId.get()));
}
- private Mono<byte[]> buildContentRetriever(FetchType fetchType, BlobId headerId, BlobId bodyId, int bodyStartOctet) {
+ private Mono<Content> buildContentRetriever(FetchType fetchType, BlobId headerId, BlobId bodyId, int bodyStartOctet) {
switch (fetchType) {
case FULL:
return getFullContent(headerId, bodyId);
case HEADERS:
- return getContent(headerId, SIZE_BASED);
+ return getContent(headerId, SIZE_BASED)
+ .map(ByteContent::new);
case METADATA:
- return Mono.just(EMPTY_BYTE_ARRAY);
+ return Mono.just(new ByteContent(EMPTY_BYTE_ARRAY));
default:
throw new RuntimeException("Unknown FetchType " + fetchType);
}
}
- private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) {
+ private Mono<Content> getFullContent(BlobId headerId, BlobId bodyId) {
return getContent(headerId, SIZE_BASED)
- .zipWith(getContent(bodyId, LOW_COST), Bytes::concat);
+ .zipWith(getContent(bodyId, LOW_COST), HeaderAndBodyByteContent::new);
}
private Mono<byte[]> getContent(BlobId blobId, BlobStore.StoragePolicy storagePolicy) {
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org