You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/27 02:19:36 UTC

[james-project] branch master updated (5e5b4d2 -> 16bc091)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 5e5b4d2  [Refactoring] use new toArray method on Collection that takes a function
     new 411e330  JAMES-2997 MessageParser & MIMEMessageConverter should preserve charset
     new 0e8c980  JAMES-2997 MessageParser should preserve the full ContentType header
     new b9121b7  JAMES-3140: Implement CacheBlobStore
     new 3c48182  JAMES-3140: Implement CacheBlobStoreTest
     new 2be0d98  JAMES-3140: fix import order
     new 7baf7c4  JAMES-3140 Marking pushbackInputStream is not required
     new 3066696  JAMES-3140 Correct usage of push back input stream
     new 1964ee0  JAMES-3140: rebase master, change to CachedBlobStore, CachedBlobStoreTest
     new 7ed1ab1  JAMES-3140 Size buffer to the threshold
     new f39fe01  JAMES-3140 CachedBlobStore should rely on backend for defaultBucket
     new 86cb617  JAMES-3140: Implement CacheBlobStore
     new f1d4273  JAMES-3140: remove unessesary check BucketName in CachedBlobStore read
     new 71c17bf  JAMES-3140 DumbBlobStoreCacheContract EIGHT_KILOBYTES fix
     new 16bc091  JAMES-3140: remove +1 initiation size for pushbackInputStream

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../store/mail/model/impl/MessageParser.java       |   7 +-
 .../store/mail/model/impl/MessageParserTest.java   |  28 ++-
 .../resources/eml/{calendar.eml => charset.eml}    |   2 +-
 .../resources/eml/{calendar.eml => charset2.eml}   |  15 +-
 .../blob/api/BucketDumbBlobStoreContract.java      |   3 +-
 .../apache/james/blob/cassandra/BlobTables.java    |   2 +-
 .../james/blob/cassandra/CassandraBucketDAO.java   |   6 +-
 .../blob/cassandra/CassandraDefaultBucketDAO.java  |   2 +-
 .../blob/cassandra/CassandraDumbBlobStore.java     |  10 +-
 ...DumbBlobStoreCache.java => BlobStoreCache.java} |   2 +-
 .../blob/cassandra/cache/CachedBlobStore.java      | 158 ++++++++++++++++
 ...heModule.java => CassandraBlobCacheModule.java} |  12 +-
 ...toreCache.java => CassandraBlobStoreCache.java} |  10 +-
 ...heContract.java => BlobStoreCacheContract.java} |  19 +-
 .../blob/cassandra/cache/CachedBlobStoreTest.java  | 201 +++++++++++++++++++++
 ...eTest.java => CassandraBlobStoreCacheTest.java} |  10 +-
 .../methods/integration/SetMessagesMethodTest.java |  96 ++++++++--
 .../test/resources/cucumber/GetMessages.feature    |  52 +++---
 .../jmap/draft/methods/MIMEMessageConverter.java   |  20 +-
 .../draft/methods/MIMEMessageConverterTest.java    |  48 +++++
 20 files changed, 612 insertions(+), 91 deletions(-)
 copy mailbox/store/src/test/resources/eml/{calendar.eml => charset.eml} (95%)
 copy mailbox/store/src/test/resources/eml/{calendar.eml => charset2.eml} (66%)
 rename server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/{DumbBlobStoreCache.java => BlobStoreCache.java} (97%)
 create mode 100644 server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
 rename server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/{CassandraDumbBlobCacheModule.java => CassandraBlobCacheModule.java} (83%)
 rename server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/{CassandraDumbBlobStoreCache.java => CassandraBlobStoreCache.java} (92%)
 rename server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/{DumbBlobStoreCacheContract.java => BlobStoreCacheContract.java} (87%)
 create mode 100644 server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
 rename server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/{CassandraDumbBlobStoreCacheTest.java => CassandraBlobStoreCacheTest.java} (88%)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 09/14: JAMES-3140 Size buffer to the threshold

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7ed1ab15e19a06491c2c74a8c766aac61fe20aef
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Apr 23 11:46:27 2020 +0700

    JAMES-3140 Size buffer to the threshold
    
    Array copy is still needed to resize the returned bytes array to the
    actual data size, avoiding padding with zeros
---
 .../apache/james/blob/cassandra/cache/CachedBlobStore.java   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 421386f..2639c64 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -30,6 +30,7 @@ import java.util.Optional;
 import javax.inject.Inject;
 import javax.inject.Named;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BucketName;
@@ -136,11 +137,11 @@ public class CachedBlobStore implements BlobStore {
     }
 
     private Optional<byte[]> fullyReadSmallStream(PushbackInputStream pushbackInputStream) throws IOException {
-        int sizeToRead = sizeThresholdInBytes + 1;
-        byte[] bytes = new byte[sizeToRead];
-        int readByteCount = pushbackInputStream.read(bytes, 0, sizeToRead);
+        byte[] bytes = new byte[sizeThresholdInBytes];
+        int readByteCount = IOUtils.read(pushbackInputStream, bytes);
+        int extraByte = pushbackInputStream.read();
         try {
-            if (readByteCount > sizeThresholdInBytes) {
+            if (extraByte >= 0) {
                 return Optional.empty();
             }
             if (readByteCount < 0) {
@@ -148,6 +149,9 @@ public class CachedBlobStore implements BlobStore {
             }
             return Optional.of(Arrays.copyOf(bytes, readByteCount));
         } finally {
+            if (extraByte >= 0) {
+                pushbackInputStream.unread(extraByte);
+            }
             if (readByteCount > 0) {
                 pushbackInputStream.unread(bytes, 0, readByteCount);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/14: JAMES-2997 MessageParser should preserve the full ContentType header

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0e8c9803e329861f9b0dcdebb1eb9806c2322c70
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 22 19:03:31 2020 +0700

    JAMES-2997 MessageParser should preserve the full ContentType header
---
 .../store/mail/model/impl/MessageParser.java       | 26 +----------
 .../store/mail/model/impl/MessageParserTest.java   | 10 +++--
 .../methods/integration/SetMessagesMethodTest.java |  4 +-
 .../test/resources/cucumber/GetMessages.feature    | 52 +++++++++++-----------
 4 files changed, 36 insertions(+), 56 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/MessageParser.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/MessageParser.java
index 53649fb..b8fc5ba 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/MessageParser.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/MessageParser.java
@@ -19,14 +19,11 @@
 
 package org.apache.james.mailbox.store.mail.model.impl;
 
-import static org.apache.james.mime4j.dom.field.ContentTypeField.PARAM_CHARSET;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Stream;
 
@@ -41,7 +38,6 @@ import org.apache.james.mime4j.dom.field.ContentDispositionField;
 import org.apache.james.mime4j.dom.field.ContentIdField;
 import org.apache.james.mime4j.dom.field.ContentTypeField;
 import org.apache.james.mime4j.dom.field.ParsedField;
-import org.apache.james.mime4j.field.Fields;
 import org.apache.james.mime4j.message.DefaultMessageBuilder;
 import org.apache.james.mime4j.message.DefaultMessageWriter;
 import org.apache.james.mime4j.stream.Field;
@@ -126,7 +122,8 @@ public class MessageParser {
     private ParsedAttachment retrieveAttachment(Entity entity) throws IOException {
         Optional<ContentTypeField> contentTypeField = getContentTypeField(entity);
         Optional<ContentDispositionField> contentDispositionField = getContentDispositionField(entity);
-        Optional<String> contentType = contentType(contentTypeField);
+        Optional<String> contentType = contentTypeField.map(ContentTypeField::getBody)
+            .filter(string -> !string.isEmpty());
         Optional<String> name = name(contentTypeField, contentDispositionField);
         Optional<Cid> cid = cid(readHeader(entity, CONTENT_ID, ContentIdField.class));
         boolean isInline = isInline(readHeader(entity, CONTENT_DISPOSITION, ContentDispositionField.class)) && cid.isPresent();
@@ -159,25 +156,6 @@ public class MessageParser {
         return Optional.of((U) field);
     }
 
-    private Optional<String> contentType(Optional<ContentTypeField> contentTypeField) {
-        return contentTypeField.map(this::contentTypePreserveCharset);
-    }
-
-    private String contentTypePreserveCharset(ContentTypeField contentTypeField) {
-        Map<String, String> params = contentTypeField.getParameters()
-            .entrySet()
-            .stream()
-            .filter(param -> param.getKey().equals(PARAM_CHARSET))
-            .collect(Guavate.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
-
-        try {
-            return Fields.contentType(contentTypeField.getMimeType(), params)
-                .getBody();
-        } catch (IllegalArgumentException e) {
-            return contentTypeField.getMimeType();
-        }
-    }
-
     private Optional<String> name(Optional<ContentTypeField> contentTypeField, Optional<ContentDispositionField> contentDispositionField) {
         return contentTypeField
             .map(field -> Optional.ofNullable(field.getParameter("name")))
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/impl/MessageParserTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/impl/MessageParserTest.java
index 8fe2009..3f29cee 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/impl/MessageParserTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/impl/MessageParserTest.java
@@ -106,7 +106,7 @@ class MessageParserTest {
         List<ParsedAttachment> attachments = testee.retrieveAttachments(ClassLoader.getSystemResourceAsStream("eml/oneAttachmentAndSomeTextInlined.eml"));
 
         assertThat(attachments).hasSize(1);
-        assertThat(attachments.get(0).getContentType()).isEqualTo("application/octet-stream");
+        assertThat(attachments.get(0).getContentType()).isEqualTo("application/octet-stream;\tname=\"exploits_of_a_mom.png\"");
     }
 
     @Test
@@ -270,7 +270,7 @@ class MessageParserTest {
 
         assertThat(attachments).hasSize(1)
             .first()
-            .satisfies(attachment -> assertThat(attachment.getContentType()).isEqualTo("text/calendar; charset=iso-8859-1"));
+            .satisfies(attachment -> assertThat(attachment.getContentType()).isEqualTo("text/calendar; charset=\"iso-8859-1\"; method=COUNTER"));
     }
 
     @Test
@@ -280,7 +280,8 @@ class MessageParserTest {
 
         assertThat(attachments).hasSize(2)
             .extracting(ParsedAttachment::getContentType)
-            .containsOnly("text/calendar; charset=iso-8859-1", "text/calendar; charset=iso-4444-5");
+            .containsOnly("text/calendar; charset=\"iso-8859-1\"; method=COUNTER",
+                "text/calendar; charset=\"iso-4444-5\"; method=COUNTER");
     }
 
     @Test
@@ -290,7 +291,8 @@ class MessageParserTest {
 
         assertThat(attachments)
             .hasSize(1)
-            .allMatch(messageAttachment -> messageAttachment.getContentType().equals("text/calendar; charset=utf-8"));
+            .extracting(ParsedAttachment::getContentType)
+            .containsExactly("text/calendar; charset=\"utf-8\"; method=COUNTER");
     }
 
     @Test
diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
index c65bcda..f17efc6 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
@@ -5444,7 +5444,7 @@ public abstract class SetMessagesMethodTest {
             .body(ARGUMENTS + ".notCreated", aMapWithSize(0))
             .body(ARGUMENTS + ".created", aMapWithSize(1))
             .body(createdPath + ".attachments", hasSize(1))
-            .body(singleAttachment + ".type", equalTo("text/html; charset=UTF-8"))
+            .body(singleAttachment + ".type", equalTo("text/html; charset=UTF-8; name=\"=?US-ASCII?Q?nonIndexableAttachment.html?=\""))
             .body(singleAttachment + ".size", equalTo((int) uploadedAttachment.getSize()));
     }
 
@@ -5754,7 +5754,7 @@ public abstract class SetMessagesMethodTest {
             .body(NAME, equalTo("messages"))
             .body(ARGUMENTS + ".list", hasSize(1))
             .body(message + ".attachments", hasSize(1))
-            .body(firstAttachment + ".type", equalTo("text/calendar; charset=UTF-8"))
+            .body(firstAttachment + ".type", equalTo("text/calendar; method=REPLY; charset=UTF-8"))
             .body(firstAttachment + ".blobId", not(isEmptyOrNullString()));
     }
 
diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/resources/cucumber/GetMessages.feature b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/resources/cucumber/GetMessages.feature
index d65a73e..273ae57 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/resources/cucumber/GetMessages.feature
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/resources/cucumber/GetMessages.feature
@@ -165,13 +165,13 @@ Feature: GetMessages method
     And the list of attachments of the message contains 2 attachments
     And the first attachment is:
       |key      | value                                                             |
-      |type     |"image/jpeg"                                                       |
+      |type     |"image/jpeg; name="4037_014.jpg""                                |
       |size     |846                                                                |
       |cid      |null                                                               |
       |isInline |false                                                              |
     And the second attachment is:
       |key      | value                                                             |
-      |type     |"image/jpeg"                                                       |
+      |type     |"image/jpeg; name="4037_015.jpg""                                |
       |size     |597                                                                |
       |cid      |"part1.37A15C92.A7C3488D@linagora.com"                             |
       |isInline |true                                                               |
@@ -274,10 +274,10 @@ Feature: GetMessages method
     And the hasAttachment of the message is "true"
     And the list of attachments of the message contains 1 attachments
     And the first attachment is:
-      |key      | value                                     |
-      |type     |"application/pdf"                          |
-      |cid      |null                                       |
-      |isInline |false                                      |
+      |key      | value                                                                  |
+      |type     |"application/pdf;	x-unix-mode=0644;	name="deromaCollection-628.pdf"" |
+      |cid      |null                                                                    |
+      |isInline |false                                                                   |
 
   Scenario: Retrieving message with inline attachment and blank CID should convert that inlined attachment to normal attachment
     Given "alice@domain.tld" has a message "m1" in "INBOX" mailbox with inline attachment and blank CID
@@ -287,10 +287,10 @@ Feature: GetMessages method
     And the hasAttachment of the message is "true"
     And the list of attachments of the message contains 1 attachments
     And the first attachment is:
-        |key      | value            |
-        |type     |"application/pdf" |
-        |cid      |null              |
-        |isInline |false             |
+        |key      | value                                                                    |
+        |type     |"application/pdf;	x-unix-mode=0644;	name="deromaCollection-628.pdf"" |
+        |cid      |null                                                                      |
+        |isInline |false                                                                     |
 
   Scenario: Preview should be computed even when HTML body contains many tags without content
     Given "alice@domain.tld" has a message "m1" in "INBOX" mailbox with HTML body with many empty tags
@@ -403,11 +403,11 @@ Feature: GetMessages method
     And the hasAttachment of the message is "true"
     And the list of attachments of the message contains 1 attachments
     And the first attachment is:
-      |key      | value                        |
-      |type     |"application/octet-stream"    |
-      |cid      |null                          |
-      |name     |"encrypted.asc"               |
-      |isInline |false                         |
+      |key      | value                                           |
+      |type     |"application/octet-stream; name="encrypted.asc"" |
+      |cid      |null                                             |
+      |name     |"encrypted.asc"                                  |
+      |isInline |false                                            |
 
   Scenario: Retrieving message should be possible when message with inlined image but without content disposition
     Given "alice@domain.tld" has a message "m1" in the "INBOX" mailbox with inlined image without content disposition
@@ -418,7 +418,7 @@ Feature: GetMessages method
     And the list of attachments of the message contains 1 attachments
     And the first attachment is:
       |key      | value                                           |
-      |type     |"image/png"                                      |
+      |type     |"image/png; name=vlc.png"                        |
       |cid      |"14672787885774e5c4d4cee471352039@linagora.com"  |
       |name     |"vlc.png"                                        |
       |isInline |false                                            |
@@ -431,11 +431,11 @@ Feature: GetMessages method
     And the hasAttachment of the message is "true"
     And the list of attachments of the message contains 1 attachments
     And the first attachment is:
-    |key      | value                        |
-    |type     |"image/jpeg"                  |
-    |cid      |null                          |
-    |name     |"IMG_6112.JPG"                |
-    |isInline |false                         |
+    |key      | value                                                                                  |
+    |type     |"image/jpeg;	name=IMG_6112.JPG;	x-apple-part-url=B11616AF-86EB-47AF-863A-176A823498DB" |
+    |cid      |null                                                                                    |
+    |name     |"IMG_6112.JPG"                                                                          |
+    |isInline |false                                                                                   |
 
   Scenario: Header only text calendar should be read as normal calendar attachment by JMAP
     Given "alice@domain.tld" receives a SMTP message specified in file "eml/ics_in_header.eml" as message "m1"
@@ -445,8 +445,8 @@ Feature: GetMessages method
     And the hasAttachment of the message is "true"
     And the list of attachments of the message contains 1 attachments
     And the first attachment is:
-    |key      | value                         |
-    |type     |"text/calendar; charset=UTF-8" |
-    |size     |1096                           |
-    |name     |"event.ics"                    |
-    |isInline |false                          |
+    |key      | value                                                       |
+    |type     |"text/calendar; method=REPLY; charset=UTF-8; name=event.ics" |
+    |size     |1096                                                         |
+    |name     |"event.ics"                                                  |
+    |isInline |false                                                        |


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 07/14: JAMES-3140 Correct usage of push back input stream

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3066696d675258f3970f85f15b36f64f6aefbd34
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Apr 21 17:28:21 2020 +0700

    JAMES-3140 Correct usage of push back input stream
---
 .../james/blob/cassandra/cache/CacheBlobStore.java | 67 ++++++++++++----------
 .../cassandra/cache/BlobStoreCacheContract.java    | 12 ++++
 .../blob/cassandra/cache/CacheBlobStoreTest.java   | 42 +++++++++++++-
 3 files changed, 88 insertions(+), 33 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
index 227ddca..fdddc45 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
@@ -24,11 +24,12 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PushbackInputStream;
+import java.util.Arrays;
+import java.util.Optional;
 
 import javax.inject.Inject;
 import javax.inject.Named;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BucketName;
@@ -67,8 +68,7 @@ public class CacheBlobStore implements BlobStore {
             .filter(defaultBucket::equals)
             .flatMap(ignored ->
                 Mono.from(cache.read(blobId))
-                    .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes)))
-            )
+                    .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes))))
             .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
             .blockOptional()
             .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
@@ -97,18 +97,23 @@ public class CacheBlobStore implements BlobStore {
     public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
         Preconditions.checkNotNull(inputStream, "InputStream must not be null");
 
+        if (isAbleToCache(bucketName, storagePolicy)) {
+            return saveInCache(bucketName, inputStream, storagePolicy);
+        }
+
+        return backend.save(bucketName, inputStream, storagePolicy);
+    }
+
+    private Publisher<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
         PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes + 1);
-        return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))
-            .flatMap(blobId ->
-                Mono.fromCallable(() -> isALargeStream(pushbackInputStream))
-                    .flatMap(largeStream -> {
-                        if (!largeStream) {
-                            return Mono.from(saveInCache(bucketName, blobId, pushbackInputStream, storagePolicy))
-                                .thenReturn(blobId);
-                        }
-                        return Mono.just(blobId);
-                    })
-            );
+
+        return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
+            .flatMap(Mono::justOrEmpty)
+            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
+            .flatMap(bytes -> Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))
+                .flatMap(blobId -> Mono.from(cache.cache(blobId, bytes))
+                    .thenReturn(blobId)))
+            .switchIfEmpty(Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy)));
     }
 
     @Override
@@ -130,23 +135,23 @@ public class CacheBlobStore implements BlobStore {
         return Mono.from(backend.deleteBucket(bucketName));
     }
 
-    private Mono<Void> saveInCache(BucketName bucketName, BlobId blobId, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
-        return Mono.fromCallable(() -> copyBytesFromStream(pushbackInputStream))
-            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
-            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
-    }
-
-    private byte[] copyBytesFromStream(PushbackInputStream pushbackInputStream) throws IOException {
-        byte[] bytes = new byte[pushbackInputStream.available()];
-        int read = IOUtils.read(pushbackInputStream, bytes);
-        pushbackInputStream.unread(read);
-        return bytes;
-    }
-
-    private boolean isALargeStream(PushbackInputStream pushbackInputStream) throws IOException {
-        long skip = pushbackInputStream.skip(sizeThresholdInBytes + 1);
-        pushbackInputStream.unread(Math.toIntExact(skip));
-        return skip >= sizeThresholdInBytes;
+    private Optional<byte[]> fullyReadSmallStream(PushbackInputStream pushbackInputStream) throws IOException {
+        int sizeToRead = sizeThresholdInBytes + 1;
+        byte[] bytes = new byte[sizeToRead];
+        int readByteCount = pushbackInputStream.read(bytes, 0, sizeToRead);
+        try {
+            if (readByteCount > sizeThresholdInBytes) {
+                return Optional.empty();
+            }
+            if (readByteCount < 0) {
+                return Optional.of(new byte[] {});
+            }
+            return Optional.of(Arrays.copyOf(bytes, readByteCount));
+        } finally {
+            if (readByteCount > 0) {
+                pushbackInputStream.unread(bytes, 0, readByteCount);
+            }
+        }
     }
 
     /**
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
index 6819aca..8d22d3f 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.awaitility.Awaitility.await;
 
+import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
@@ -107,4 +108,15 @@ public interface BlobStoreCacheContract {
         await().atMost(Duration.TWO_SECONDS.plus(Duration.FIVE_HUNDRED_MILLISECONDS)).await().untilAsserted(()
             -> assertThat(Mono.from(testee().read(blobId)).blockOptional()).isEmpty());
     }
+
+    @Test
+    default void readShouldReturnEmptyCachedByteArray() {
+        BlobId blobId = blobIdFactory().randomId();
+        byte[] emptyByteArray = new byte[] {};
+
+        Mono.from(testee().cache(blobId, emptyByteArray)).block();
+
+        assertThat(new ByteArrayInputStream(Mono.from(testee().read(blobId)).block()))
+            .hasSameContentAs(new ByteArrayInputStream(emptyByteArray));
+    }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
index 1c5ff09..a7808ab 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
@@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -45,12 +46,15 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.google.common.base.Strings;
+
 import reactor.core.publisher.Mono;
 
 public class CacheBlobStoreTest implements BlobStoreContract {
 
     private static final BucketName DEFAULT_BUCKERNAME = DEFAULT;
     private static final BucketName TEST_BUCKERNAME = BucketName.of("test");
+    byte[] APPROXIMATELY_FIVE_KILOBYTES = Strings.repeat("0123456789\n", 500).getBytes(StandardCharsets.UTF_8);
 
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
@@ -139,16 +143,50 @@ public class CacheBlobStoreTest implements BlobStoreContract {
     }
 
     @Test
-    public void shouldNotCacheWhenEmptyStream() {
+    public void shouldCacheWhenEmptyStream() {
         BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
-            assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
             assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
         });
     }
 
     @Test
+    public void shouldNotCacheWhenEmptyByteArray() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EMPTY_BYTEARRAY, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+        });
+    }
+
+    @Test
+    public void shouldCacheWhenFiveKilobytesSteam() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+            assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+        });
+    }
+
+    @Test
+    public void shouldCacheWhenFiveKilobytesBytes() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+            assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+        });
+    }
+
+    @Test
     public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() {
         BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 06/14: JAMES-3140 Marking pushbackInputStream is not required

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7baf7c4bd9acd1187125c9e855a00520431dc28d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Apr 21 16:49:17 2020 +0700

    JAMES-3140 Marking pushbackInputStream is not required
---
 .../java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java     | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
index 0acc95e..227ddca 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
@@ -144,7 +144,6 @@ public class CacheBlobStore implements BlobStore {
     }
 
     private boolean isALargeStream(PushbackInputStream pushbackInputStream) throws IOException {
-        pushbackInputStream.mark(0);
         long skip = pushbackInputStream.skip(sizeThresholdInBytes + 1);
         pushbackInputStream.unread(Math.toIntExact(skip));
         return skip >= sizeThresholdInBytes;
@@ -154,7 +153,7 @@ public class CacheBlobStore implements BlobStore {
      * bytes: byte[] from PushbackInputStream.If PushbackInputStream is empty bytes.length == 1
      */
     private boolean isAbleToCache(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
-        return isAbleToCache(bucketName, storagePolicy) && bytes.length <= sizeThresholdInBytes && bytes.length > 1;
+        return isAbleToCache(bucketName, storagePolicy) && bytes.length <= sizeThresholdInBytes;
     }
 
     private boolean isAbleToCache(BucketName bucketName, StoragePolicy storagePolicy) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 12/14: JAMES-3140: remove unessesary check BucketName in CachedBlobStore read

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f1d4273b8d59095ca725de9c5f9d6cfa23bf4125
Author: ducnv <du...@gmail.com>
AuthorDate: Fri Apr 24 14:55:03 2020 +0700

    JAMES-3140: remove unessesary check BucketName in CachedBlobStore read
---
 .../java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java     | 2 --
 1 file changed, 2 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index ad73648..51e8617 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -56,8 +56,6 @@ public class CachedBlobStore implements BlobStore {
 
     @Override
     public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
-        Preconditions.checkNotNull(bucketName, "bucketName should not be null");
-
         return Mono.just(bucketName)
             .filter(backend.getDefaultBucketName()::equals)
             .flatMap(ignored -> Mono.from(cache.read(blobId))


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/14: JAMES-2997 MessageParser & MIMEMessageConverter should preserve charset

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 411e330e8c7a84d8c46396e20acab6722fe96cd0
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Apr 19 18:50:42 2020 +0700

    JAMES-2997 MessageParser & MIMEMessageConverter should preserve charset
    
    Previous implementations where not preserving charset upon:
     - composing a message via JMAP
     - storing an attachment related to a mailbox
    
    Note that JMAP attachment upload was preserving charset.
    
    **master** is also affected by this issue but the problem was hidden as
    SetMessages was returning the expected attachment metadata and not the
    actual one.
---
 .../store/mail/model/impl/MessageParser.java       | 21 ++++-
 .../store/mail/model/impl/MessageParserTest.java   | 24 +++++-
 mailbox/store/src/test/resources/eml/charset.eml   | 39 +++++++++
 mailbox/store/src/test/resources/eml/charset2.eml  | 52 ++++++++++++
 .../methods/integration/SetMessagesMethodTest.java | 96 +++++++++++++++++++---
 .../test/resources/cucumber/GetMessages.feature    | 10 +--
 .../jmap/draft/methods/MIMEMessageConverter.java   | 20 ++---
 .../draft/methods/MIMEMessageConverterTest.java    | 48 +++++++++++
 8 files changed, 277 insertions(+), 33 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/MessageParser.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/MessageParser.java
index 33d3406..53649fb 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/MessageParser.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/MessageParser.java
@@ -19,11 +19,14 @@
 
 package org.apache.james.mailbox.store.mail.model.impl;
 
+import static org.apache.james.mime4j.dom.field.ContentTypeField.PARAM_CHARSET;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Stream;
 
@@ -38,6 +41,7 @@ import org.apache.james.mime4j.dom.field.ContentDispositionField;
 import org.apache.james.mime4j.dom.field.ContentIdField;
 import org.apache.james.mime4j.dom.field.ContentTypeField;
 import org.apache.james.mime4j.dom.field.ParsedField;
+import org.apache.james.mime4j.field.Fields;
 import org.apache.james.mime4j.message.DefaultMessageBuilder;
 import org.apache.james.mime4j.message.DefaultMessageWriter;
 import org.apache.james.mime4j.stream.Field;
@@ -156,7 +160,22 @@ public class MessageParser {
     }
 
     private Optional<String> contentType(Optional<ContentTypeField> contentTypeField) {
-        return contentTypeField.map(ContentTypeField::getMimeType);
+        return contentTypeField.map(this::contentTypePreserveCharset);
+    }
+
+    private String contentTypePreserveCharset(ContentTypeField contentTypeField) {
+        Map<String, String> params = contentTypeField.getParameters()
+            .entrySet()
+            .stream()
+            .filter(param -> param.getKey().equals(PARAM_CHARSET))
+            .collect(Guavate.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        try {
+            return Fields.contentType(contentTypeField.getMimeType(), params)
+                .getBody();
+        } catch (IllegalArgumentException e) {
+            return contentTypeField.getMimeType();
+        }
     }
 
     private Optional<String> name(Optional<ContentTypeField> contentTypeField, Optional<ContentDispositionField> contentDispositionField) {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/impl/MessageParserTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/impl/MessageParserTest.java
index 721b8b7..8fe2009 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/impl/MessageParserTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/impl/MessageParserTest.java
@@ -264,13 +264,33 @@ class MessageParserTest {
     }
 
     @Test
+    void getAttachmentsShouldRetrieveCharset() throws Exception {
+        List<ParsedAttachment> attachments = testee.retrieveAttachments(
+            ClassLoader.getSystemResourceAsStream("eml/charset.eml"));
+
+        assertThat(attachments).hasSize(1)
+            .first()
+            .satisfies(attachment -> assertThat(attachment.getContentType()).isEqualTo("text/calendar; charset=iso-8859-1"));
+    }
+
+    @Test
+    void getAttachmentsShouldRetrieveAllPartsCharset() throws Exception {
+        List<ParsedAttachment> attachments = testee.retrieveAttachments(
+            ClassLoader.getSystemResourceAsStream("eml/charset2.eml"));
+
+        assertThat(attachments).hasSize(2)
+            .extracting(ParsedAttachment::getContentType)
+            .containsOnly("text/calendar; charset=iso-8859-1", "text/calendar; charset=iso-4444-5");
+    }
+
+    @Test
     void getAttachmentsShouldConsiderICSAsAttachments() throws Exception {
         List<ParsedAttachment> attachments = testee.retrieveAttachments(
             ClassLoader.getSystemResourceAsStream("eml/calendar.eml"));
 
         assertThat(attachments)
             .hasSize(1)
-            .allMatch(messageAttachment -> messageAttachment.getContentType().equals("text/calendar"));
+            .allMatch(messageAttachment -> messageAttachment.getContentType().equals("text/calendar; charset=utf-8"));
     }
 
     @Test
@@ -306,6 +326,6 @@ class MessageParserTest {
 
         List<ParsedAttachment> result = testee.retrieveAttachments(new ByteArrayInputStream(DefaultMessageWriter.asBytes(message)));
         assertThat(result).hasSize(1)
-            .allMatch(attachment -> attachment.getContentType().equals(MDN.DISPOSITION_CONTENT_TYPE));
+            .allMatch(attachment -> attachment.getContentType().equals("message/disposition-notification; charset=UTF-8"));
     }
 }
diff --git a/mailbox/store/src/test/resources/eml/charset.eml b/mailbox/store/src/test/resources/eml/charset.eml
new file mode 100644
index 0000000..957b4f1
--- /dev/null
+++ b/mailbox/store/src/test/resources/eml/charset.eml
@@ -0,0 +1,39 @@
+From: Test 1 <te...@linagora.com>
+To: Test 2 <te...@linagora.com>
+Subject: New Time Proposed: New event from Test: lole
+Date: Tue, 13 Mar 2018 14:36:08 +0000
+Message-ID: <AM...@AM5P190MB0354.EURP190.PROD.OUTLOOK.COM>
+Accept-Language: en-US
+Content-Language: en-US
+Content-Type: multipart/alternative;
+	boundary="_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_"
+MIME-Version: 1.0
+
+--_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_
+Content-Type: text/plain; charset="iso-8859-1"
+Content-Transfer-Encoding: quoted-printable
+
+Will be better for me, thx
+
+--_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_
+Content-Type: text/html; charset="iso-8859-1"
+Content-Transfer-Encoding: quoted-printable
+
+<html>
+ <p>Will be better for me, thx</p>
+</html>
+
+--_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_
+Content-Type: text/calendar; charset="iso-8859-1"; method=COUNTER
+Content-Transfer-Encoding: base64
+
+QkVHSU46VkNBTEVOREFSDQpNRVRIT0Q6Q09VTlRFUg0KUFJPRElEOk1pY3Jvc29mdCBFeGNoYW5n
+ZSBTZXJ2ZXIgMjAxMA0KVkVSU0lPTjoyLjANCkJFR0lOOlZUSU1FWk9ORQ0KVFpJRDpFdXJvcGUv
+QmVybGluDQpCRUdJTjpTVEFOREFSRA0KRFRTVEFSVDoxNjAxMDEwMVQwMzAwMDANClRaT0ZGU0VU
+RlJPTTorMDIwMA0KVFpPRkZTRVRUTzorMDEwMA0KUlJVTEU6RlJFUT1ZRUFSTFk7SU5URVJWQUw9
+MTtCWURBWT0tMVNVO0JZTU9OVEg9MTANCkVORDpTVEFOREFSRA0KQkVHSU46REFZTElHSFQNCkRU
+U0VRVUVOQ0U6MA0KRFRTVEFNUDoyMDE4MDMxM1QxNDM2MDdaDQpDT01NRU5UO0xBTkdVQUdFPWVu
+LVVTOldpbGwgYmUgYmV0dGVyIGZvciBtZVwsIHRoeFxuDQpFTkQ6VkVWRU5UDQpFTkQ6VkNBTEVO
+REFSDQo=
+
+--_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_--
\ No newline at end of file
diff --git a/mailbox/store/src/test/resources/eml/charset2.eml b/mailbox/store/src/test/resources/eml/charset2.eml
new file mode 100644
index 0000000..7919d8c
--- /dev/null
+++ b/mailbox/store/src/test/resources/eml/charset2.eml
@@ -0,0 +1,52 @@
+From: Test 1 <te...@linagora.com>
+To: Test 2 <te...@linagora.com>
+Subject: New Time Proposed: New event from Test: lole
+Date: Tue, 13 Mar 2018 14:36:08 +0000
+Message-ID: <AM...@AM5P190MB0354.EURP190.PROD.OUTLOOK.COM>
+Accept-Language: en-US
+Content-Language: en-US
+Content-Type: multipart/alternative;
+	boundary="_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_"
+MIME-Version: 1.0
+
+--_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_
+Content-Type: text/plain; charset="iso-8859-1"
+Content-Transfer-Encoding: quoted-printable
+
+Will be better for me, thx
+
+--_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_
+Content-Type: text/html; charset="iso-8859-1"
+Content-Transfer-Encoding: quoted-printable
+
+<html>
+ <p>Will be better for me, thx</p>
+</html>
+
+--_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_
+Content-Type: text/calendar; charset="iso-8859-1"; method=COUNTER
+Content-Transfer-Encoding: base64
+
+QkVHSU46VkNBTEVOREFSDQpNRVRIT0Q6Q09VTlRFUg0KUFJPRElEOk1pY3Jvc29mdCBFeGNoYW5n
+ZSBTZXJ2ZXIgMjAxMA0KVkVSU0lPTjoyLjANCkJFR0lOOlZUSU1FWk9ORQ0KVFpJRDpFdXJvcGUv
+QmVybGluDQpCRUdJTjpTVEFOREFSRA0KRFRTVEFSVDoxNjAxMDEwMVQwMzAwMDANClRaT0ZGU0VU
+RlJPTTorMDIwMA0KVFpPRkZTRVRUTzorMDEwMA0KUlJVTEU6RlJFUT1ZRUFSTFk7SU5URVJWQUw9
+MTtCWURBWT0tMVNVO0JZTU9OVEg9MTANCkVORDpTVEFOREFSRA0KQkVHSU46REFZTElHSFQNCkRU
+U0VRVUVOQ0U6MA0KRFRTVEFNUDoyMDE4MDMxM1QxNDM2MDdaDQpDT01NRU5UO0xBTkdVQUdFPWVu
+LVVTOldpbGwgYmUgYmV0dGVyIGZvciBtZVwsIHRoeFxuDQpFTkQ6VkVWRU5UDQpFTkQ6VkNBTEVO
+REFSDQo=
+
+--_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_
+Content-Type: text/calendar; charset="iso-4444-5"; method=COUNTER
+Content-Transfer-Encoding: base64
+
+QkVHSU46VkNBTEVOREFSDQpNRVRIT0Q6Q09VTlRFUg0KUFJPRElEOk1pY3Jvc29mdCBFeGNoYW5n
+ZSBTZXJ2ZXIgMjAxMA0KVkVSU0lPTjoyLjANCkJFR0lOOlZUSU1FWk9ORQ0KVFpJRDpFdXJvcGUv
+QmVybGluDQpCRUdJTjpTVEFOREFSRA0KRFRTVEFSVDoxNjAxMDEwMVQwMzAwMDANClRaT0ZGU0VU
+RlJPTTorMDIwMA0KVFpPRkZTRVRUTzorMDEwMA0KUlJVTEU6RlJFUT1ZRUFSTFk7SU5URVJWQUw9
+MTtCWURBWT0tMVNVO0JZTU9OVEg9MTANCkVORDpTVEFOREFSRA0KQkVHSU46REFZTElHSFQNCkRU
+U0VRVUVOQ0U6MA0KRFRTVEFNUDoyMDE4MDMxM1QxNDM2MDdaDQpDT01NRU5UO0xBTkdVQUdFPWVu
+LVVTOldpbGwgYmUgYmV0dGVyIGZvciBtZVwsIHRoeFxuDQpFTkQ6VkVWRU5UDQpFTkQ6VkNBTEVO
+REFSDQo=
+
+--_000_AM5P190MB03542A58E344F2C68475A951AFD20AM5P190MB0354EURP_--
\ No newline at end of file
diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
index bf18630..c65bcda 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesMethodTest.java
@@ -153,6 +153,7 @@ public abstract class SetMessagesMethodTest {
     private static final String NOT_UPDATED = ARGUMENTS + ".notUpdated";
     private static final int BIG_MESSAGE_SIZE = 20 * 1024 * 1024;
     public static final String OCTET_CONTENT_TYPE = "application/octet-stream";
+    public static final String OCTET_CONTENT_TYPE_UTF8 = "application/octet-stream; charset=UTF-8";
 
     private AccessToken bobAccessToken;
 
@@ -4016,12 +4017,81 @@ public abstract class SetMessagesMethodTest {
                 createdPath + ".attachments[0].inlinedWithCid", createdPath + ".attachments[1].inlinedWithCid")
             .inPath(createdPath + ".attachments")
             .isEqualTo("[{" +
-                "  \"type\":\"application/octet-stream\"," +
+                "  \"type\":\"application/octet-stream; charset=UTF-8\"," +
                 "  \"size\":" + bytes1.length() + "," +
                 "  \"cid\":null," +
                 "  \"isInline\":false" +
                 "}, {" +
-                "  \"type\":\"application/octet-stream\"," +
+                "  \"type\":\"application/octet-stream; charset=UTF-8\"," +
+                "  \"size\":" + bytes2.length() + "," +
+                "  \"cid\":\"123456789\"," +
+                "  \"isInline\":true" +
+                "}]");
+    }
+
+    @Test
+    public void setMessagesShouldPreserveCharsetOfAttachment() throws Exception {
+        String bytes1 = "attachment";
+        String bytes2 = "attachment2";
+        AttachmentMetadata uploadedAttachment1 = uploadAttachment(OCTET_CONTENT_TYPE_UTF8, bytes1.getBytes(StandardCharsets.UTF_8));
+        AttachmentMetadata uploadedAttachment2 = uploadAttachment(OCTET_CONTENT_TYPE_UTF8, bytes2.getBytes(StandardCharsets.UTF_8));
+
+        String messageCreationId = "creationId";
+        String fromAddress = USERNAME.asString();
+        String outboxId = getOutboxId(accessToken);
+        String requestBody = "[" +
+            "  [" +
+            "    \"setMessages\"," +
+            "    {" +
+            "      \"create\": { \"" + messageCreationId  + "\" : {" +
+            "        \"from\": { \"name\": \"Me\", \"email\": \"" + fromAddress + "\"}," +
+            "        \"to\": [{ \"name\": \"BOB\", \"email\": \"someone@example.com\"}]," +
+            "        \"subject\": \"Message with two attachments\"," +
+            "        \"textBody\": \"Test body\"," +
+            "        \"mailboxIds\": [\"" + outboxId + "\"], " +
+            "        \"attachments\": [" +
+            "               {\"blobId\" : \"" + uploadedAttachment1.getAttachmentId().getId() + "\", " +
+            "               \"type\" : \"" + uploadedAttachment1.getType() + "\", " +
+            "               \"size\" : " + uploadedAttachment1.getSize() + "}," +
+            "               {\"blobId\" : \"" + uploadedAttachment2.getAttachmentId().getId() + "\", " +
+            "               \"type\" : \"" + uploadedAttachment2.getType() + "\", " +
+            "               \"size\" : " + uploadedAttachment2.getSize() + ", " +
+            "               \"cid\" : \"123456789\", " +
+            "               \"isInline\" : true }" +
+            "           ]" +
+            "      }}" +
+            "    }," +
+            "    \"#0\"" +
+            "  ]" +
+            "]";
+
+        String createdPath = ARGUMENTS + ".created[\"" + messageCreationId + "\"]";
+
+        String json = given()
+            .header("Authorization", accessToken.asString())
+            .body(requestBody)
+        .when()
+            .post("/jmap")
+        .then()
+            .statusCode(200)
+            .body(NAME, equalTo("messagesSet"))
+            .body(ARGUMENTS + ".notCreated", aMapWithSize(0))
+            .body(ARGUMENTS + ".created", aMapWithSize(1))
+            .body(createdPath + ".attachments", hasSize(2))
+            .extract().asString();
+
+        assertThatJson(json)
+            .withOptions(new Options(Option.TREATING_NULL_AS_ABSENT, Option.IGNORING_ARRAY_ORDER, Option.IGNORING_EXTRA_FIELDS))
+            .whenIgnoringPaths(createdPath + ".attachments[0].blobId", createdPath + ".attachments[1].blobId",
+                createdPath + ".attachments[0].inlinedWithCid", createdPath + ".attachments[1].inlinedWithCid")
+            .inPath(createdPath + ".attachments")
+            .isEqualTo("[{" +
+                "  \"type\":\"application/octet-stream; charset=UTF-8\"," +
+                "  \"size\":" + bytes1.length() + "," +
+                "  \"cid\":null," +
+                "  \"isInline\":false" +
+                "}, {" +
+                "  \"type\":\"application/octet-stream; charset=UTF-8\"," +
                 "  \"size\":" + bytes2.length() + "," +
                 "  \"cid\":\"123456789\"," +
                 "  \"isInline\":true" +
@@ -4302,7 +4372,7 @@ public abstract class SetMessagesMethodTest {
             .body(NAME, equalTo("messages"))
             .body(ARGUMENTS + ".list", hasSize(1))
             .body(firstMessage + ".attachments", hasSize(1))
-            .body(firstAttachment + ".type", equalTo(OCTET_CONTENT_TYPE))
+            .body(firstAttachment + ".type", equalTo(OCTET_CONTENT_TYPE_UTF8))
             .body(firstAttachment + ".size", equalTo(rawBytes.length))
             .body(firstAttachment + ".cid", equalTo("123456789"))
             .body(firstAttachment + ".isInline", equalTo(true))
@@ -4375,7 +4445,7 @@ public abstract class SetMessagesMethodTest {
             .body(NAME, equalTo("messages"))
             .body(ARGUMENTS + ".list", hasSize(1))
             .body(firstMessage + ".attachments", hasSize(1))
-            .body(firstAttachment + ".type", equalTo(OCTET_CONTENT_TYPE))
+            .body(firstAttachment + ".type", equalTo(OCTET_CONTENT_TYPE_UTF8))
             .body(firstAttachment + ".size", equalTo(rawBytes.length))
             .body(firstAttachment + ".cid", equalTo("123456789"))
             .body(firstAttachment + ".isInline", equalTo(true))
@@ -4472,7 +4542,7 @@ public abstract class SetMessagesMethodTest {
             .body(firstMessage + ".textBody", equalTo("Test body, plain text version"))
             .body(firstMessage + ".htmlBody", equalTo("Test <b>body</b>, HTML version"))
             .body(firstMessage + ".attachments", hasSize(1))
-            .body(firstAttachment + ".type", equalTo("text/html"))
+            .body(firstAttachment + ".type", equalTo("text/html; charset=UTF-8"))
             .body(firstAttachment + ".size", equalTo(text.length()))
             .extract()
             .jsonPath()
@@ -4547,7 +4617,7 @@ public abstract class SetMessagesMethodTest {
             .body(firstMessage + ".textBody", equalTo("Test body, plain text version"))
             .body(firstMessage + ".htmlBody", isEmptyOrNullString())
             .body(firstMessage + ".attachments", hasSize(1))
-            .body(firstAttachment + ".type", equalTo("text/html"))
+            .body(firstAttachment + ".type", equalTo("text/html; charset=UTF-8"))
             .body(firstAttachment + ".size", equalTo((int) uploadedAttachment.getSize()))
             .extract()
             .jsonPath()
@@ -4632,7 +4702,7 @@ public abstract class SetMessagesMethodTest {
             .body(firstMessage + ".textBody", isEmptyOrNullString())
             .body(firstMessage + ".htmlBody", isEmptyOrNullString())
             .body(firstMessage + ".attachments", hasSize(1))
-            .body(firstAttachment + ".type", equalTo("text/plain"))
+            .body(firstAttachment + ".type", equalTo("text/plain; charset=UTF-8"))
             .body(firstAttachment + ".size", equalTo((int) uploadedAttachment.getSize()))
             .extract()
             .jsonPath()
@@ -5374,7 +5444,7 @@ public abstract class SetMessagesMethodTest {
             .body(ARGUMENTS + ".notCreated", aMapWithSize(0))
             .body(ARGUMENTS + ".created", aMapWithSize(1))
             .body(createdPath + ".attachments", hasSize(1))
-            .body(singleAttachment + ".type", equalTo("text/html"))
+            .body(singleAttachment + ".type", equalTo("text/html; charset=UTF-8"))
             .body(singleAttachment + ".size", equalTo((int) uploadedAttachment.getSize()));
     }
 
@@ -5493,9 +5563,9 @@ public abstract class SetMessagesMethodTest {
     @Test
     public void setMessagesShouldReturnAttachmentsWhenMessageHasInlinedAttachmentButNoCid() throws Exception {
         String bytes = "attachment";
-        AttachmentMetadata uploadedAttachment1 = uploadAttachment(OCTET_CONTENT_TYPE, bytes.getBytes(StandardCharsets.UTF_8));
+        AttachmentMetadata uploadedAttachment1 = uploadAttachment(OCTET_CONTENT_TYPE_UTF8, bytes.getBytes(StandardCharsets.UTF_8));
         String bytes2 = "attachment2";
-        AttachmentMetadata uploadedAttachment2 = uploadAttachment(OCTET_CONTENT_TYPE, bytes2.getBytes(StandardCharsets.UTF_8));
+        AttachmentMetadata uploadedAttachment2 = uploadAttachment(OCTET_CONTENT_TYPE_UTF8, bytes2.getBytes(StandardCharsets.UTF_8));
 
         String messageCreationId = "creationId";
         String fromAddress = USERNAME.asString();
@@ -5545,11 +5615,11 @@ public abstract class SetMessagesMethodTest {
             .withOptions(new Options(Option.TREATING_NULL_AS_ABSENT, Option.IGNORING_ARRAY_ORDER, Option.IGNORING_EXTRA_FIELDS))
             .inPath(createdPath + ".attachments")
             .isEqualTo("[{" +
-                "  \"type\":\"application/octet-stream\"," +
+                "  \"type\":\"application/octet-stream; charset=UTF-8\"," +
                 "  \"size\":" + bytes2.length() + "," +
                 "  \"isInline\":false" +
                 "}, {" +
-                "  \"type\":\"application/octet-stream\"," +
+                "  \"type\":\"application/octet-stream; charset=UTF-8\"," +
                 "  \"size\":" + bytes.length() + "," +
                 "  \"isInline\":false" + // See JAMES-2258 inline should be false in case of no Content-ID for inlined attachment
                 // Stored attachment will not be considered as having an inlined attachment.
@@ -5684,7 +5754,7 @@ public abstract class SetMessagesMethodTest {
             .body(NAME, equalTo("messages"))
             .body(ARGUMENTS + ".list", hasSize(1))
             .body(message + ".attachments", hasSize(1))
-            .body(firstAttachment + ".type", equalTo("text/calendar"))
+            .body(firstAttachment + ".type", equalTo("text/calendar; charset=UTF-8"))
             .body(firstAttachment + ".blobId", not(isEmptyOrNullString()));
     }
 
diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/resources/cucumber/GetMessages.feature b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/resources/cucumber/GetMessages.feature
index d58c6a1..d65a73e 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/resources/cucumber/GetMessages.feature
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/resources/cucumber/GetMessages.feature
@@ -445,8 +445,8 @@ Feature: GetMessages method
     And the hasAttachment of the message is "true"
     And the list of attachments of the message contains 1 attachments
     And the first attachment is:
-    |key      | value                        |
-    |type     |"text/calendar"               |
-    |size     |1096                          |
-    |name     |"event.ics"                   |
-    |isInline |false                         |
+    |key      | value                         |
+    |type     |"text/calendar; charset=UTF-8" |
+    |size     |1096                           |
+    |name     |"event.ics"                    |
+    |isInline |false                          |
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MIMEMessageConverter.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MIMEMessageConverter.java
index d5690f8..4864f05 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MIMEMessageConverter.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/MIMEMessageConverter.java
@@ -69,7 +69,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.io.ByteStreams;
 import com.google.common.net.MediaType;
 
@@ -329,25 +328,22 @@ public class MIMEMessageConverter {
     }
 
     private ContentTypeField contentTypeField(MessageAttachmentMetadata att) {
-        Builder<String, String> parameters = ImmutableMap.builder();
-        if (att.getName().isPresent()) {
-            parameters.put("name", encode(att.getName().get()));
-        }
         String type = att.getAttachment().getType();
-        if (type.contains(FIELD_PARAMETERS_SEPARATOR)) {
-            return Fields.contentType(contentTypeWithoutParameters(type), parameters.build());
+        ContentTypeField typeAsField = Fields.contentType(type);
+        if (att.getName().isPresent()) {
+            return Fields.contentType(typeAsField.getMimeType(),
+                ImmutableMap.<String, String>builder()
+                    .putAll(typeAsField.getParameters())
+                    .put("name", encode(att.getName().get()))
+                    .build());
         }
-        return Fields.contentType(type, parameters.build());
+        return typeAsField;
     }
 
     private String encode(String name) {
         return EncoderUtil.encodeEncodedWord(name, Usage.TEXT_TOKEN);
     }
 
-    private String contentTypeWithoutParameters(String type) {
-        return Splitter.on(FIELD_PARAMETERS_SEPARATOR).splitToList(type).get(0);
-    }
-
     private ContentDispositionField contentDispositionField(boolean isInline) {
         if (isInline) {
             return Fields.contentDisposition("inline");
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/MIMEMessageConverterTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/MIMEMessageConverterTest.java
index efca590..6cff89c 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/MIMEMessageConverterTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/MIMEMessageConverterTest.java
@@ -646,6 +646,54 @@ class MIMEMessageConverterTest {
         }
 
         @Test
+        void convertToMimeShouldPreservePartCharset() throws Exception {
+            // Given
+            MIMEMessageConverter sut = new MIMEMessageConverter(attachmentContentLoader);
+
+            CreationMessage testMessage = CreationMessage.builder()
+                .mailboxId("dead-bada55")
+                .subject("subject")
+                .from(DraftEmailer.builder().name("sender").build())
+                .htmlBody("Hello <b>all<b>!")
+                .build();
+
+            String expectedCID = "cid";
+            String expectedMimeType = "text/calendar; charset=\"iso-8859-1\"";
+            String text = "123456";
+            TextBody expectedBody = new BasicBodyFactory().textBody(text.getBytes(), StandardCharsets.UTF_8);
+            AttachmentId blodId = AttachmentId.from("blodId");
+            MessageAttachmentMetadata attachment = MessageAttachmentMetadata.builder()
+                .attachment(AttachmentMetadata.builder()
+                    .attachmentId(blodId)
+                    .size(text.getBytes().length)
+                    .type(expectedMimeType)
+                    .build())
+                .cid(Cid.from(expectedCID))
+                .isInline(true)
+                .build();
+            when(attachmentContentLoader.load(attachment.getAttachment(), session))
+                .thenReturn(new ByteArrayInputStream(text.getBytes()));
+
+            // When
+            Message result = sut.convertToMime(new ValueWithId.CreationMessageEntry(
+                CreationMessageId.of("user|mailbox|1"), testMessage), ImmutableList.of(attachment), session);
+            Multipart typedResult = (Multipart)result.getBody();
+
+            assertThat(typedResult.getBodyParts())
+                .hasSize(1)
+                .extracting(entity -> (Multipart) entity.getBody())
+                .flatExtracting(Multipart::getBodyParts)
+                .anySatisfy(part -> {
+                    assertThat(part.getBody()).isEqualToComparingOnlyGivenFields(expectedBody, "content");
+                    assertThat(part.getDispositionType()).isEqualTo("inline");
+                    assertThat(part.getMimeType()).isEqualTo("text/calendar");
+                    assertThat(part.getCharset()).isEqualTo("iso-8859-1");
+                    assertThat(part.getHeader().getField("Content-ID").getBody()).isEqualTo(expectedCID);
+                    assertThat(part.getContentTransferEncoding()).isEqualTo("base64");
+                });
+        }
+
+        @Test
         void convertToMimeShouldAddAttachmentAndMultipartAlternativeWhenOneAttachementAndTextAndHtmlBody() throws Exception {
             // Given
             MIMEMessageConverter sut = new MIMEMessageConverter(attachmentContentLoader);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 14/14: JAMES-3140: remove +1 initiation size for pushbackInputStream

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 16bc09187fac5ce8d6e216e95fbd74a881228d43
Author: ducnv <du...@gmail.com>
AuthorDate: Fri Apr 24 17:26:25 2020 +0700

    JAMES-3140: remove +1 initiation size for pushbackInputStream
---
 .../java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 51e8617..3d4abad 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -96,7 +96,7 @@ public class CachedBlobStore implements BlobStore {
     }
 
     private Publisher<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
-        PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes + 1);
+        PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes);
 
         return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream))
             .flatMap(Mono::justOrEmpty)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 08/14: JAMES-3140: rebase master, change to CachedBlobStore, CachedBlobStoreTest

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1964ee0de01aa2fdf15e0213555ca9e786466c31
Author: ducnv <du...@gmail.com>
AuthorDate: Tue Apr 21 17:55:27 2020 +0700

    JAMES-3140: rebase master, change to CachedBlobStore, CachedBlobStoreTest
---
 .../cassandra/cache/{CacheBlobStore.java => CachedBlobStore.java} | 8 ++++----
 .../cache/{CacheBlobStoreTest.java => CachedBlobStoreTest.java}   | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
similarity index 96%
rename from server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
rename to server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index fdddc45..421386f 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -41,7 +41,7 @@ import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Mono;
 
-public class CacheBlobStore implements BlobStore {
+public class CachedBlobStore implements BlobStore {
 
     private static final String DEFAULT_BUCKET = "cassandraDefault";
 
@@ -51,9 +51,9 @@ public class CacheBlobStore implements BlobStore {
     private final BucketName defaultBucket;
 
     @Inject
-    public CacheBlobStore(BlobStoreCache cache, BlobStore backend,
-                          CassandraCacheConfiguration cacheConfiguration,
-                          @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
+    public CachedBlobStore(BlobStoreCache cache, BlobStore backend,
+                           CassandraCacheConfiguration cacheConfiguration,
+                           @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
         this.cache = cache;
         this.backend = backend;
         this.sizeThresholdInBytes = cacheConfiguration.getSizeThresholdInBytes();
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
similarity index 98%
rename from server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
rename to server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
index a7808ab..1ca51df 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
@@ -50,7 +50,7 @@ import com.google.common.base.Strings;
 
 import reactor.core.publisher.Mono;
 
-public class CacheBlobStoreTest implements BlobStoreContract {
+public class CachedBlobStoreTest implements BlobStoreContract {
 
     private static final BucketName DEFAULT_BUCKERNAME = DEFAULT;
     private static final BucketName TEST_BUCKERNAME = BucketName.of("test");
@@ -71,7 +71,7 @@ public class CacheBlobStoreTest implements BlobStoreContract {
             .sizeThresholdInBytes(EIGHT_KILOBYTES.length + 1)
             .build();
         cache = new CassandraBlobStoreCache(cassandra.getConf(), cacheConfig);
-        testee = new CacheBlobStore(cache, backend, cacheConfig, DEFAULT);
+        testee = new CachedBlobStore(cache, backend, cacheConfig, DEFAULT);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/14: JAMES-3140: fix import order

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2be0d98be23da17d24421f243a69172951fe16cd
Author: ducnv <du...@gmail.com>
AuthorDate: Tue Apr 21 09:27:30 2020 +0700

    JAMES-3140: fix import order
---
 .../java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java    | 2 --
 .../org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java  | 2 +-
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
index 240a514..b6b7ace 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
@@ -21,7 +21,6 @@ package org.apache.james.blob.cassandra;
 
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.NoSuchElementException;
 
@@ -41,7 +40,6 @@ import org.apache.james.util.ReactorUtils;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.io.ByteSource;
 
 import reactor.core.publisher.Flux;
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java
index ce3237e..fe6217e 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java
@@ -27,10 +27,10 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
-import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.ID;
 import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.DATA;
 import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TABLE_NAME;
 import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TTL_FOR_ROW;
+import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.ID;
 
 import java.nio.ByteBuffer;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 13/14: JAMES-3140 DumbBlobStoreCacheContract EIGHT_KILOBYTES fix

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 71c17bf2745fd9d60f6d30ed94836be9b3453673
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 24 16:03:58 2020 +0700

    JAMES-3140 DumbBlobStoreCacheContract EIGHT_KILOBYTES fix
---
 .../org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
index 8d22d3f..c151ef3 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
@@ -36,7 +36,7 @@ import reactor.core.publisher.Mono;
 
 public interface BlobStoreCacheContract {
 
-    byte[] EIGHT_KILOBYTES = Strings.repeat("01234567\n", 1024).getBytes(StandardCharsets.UTF_8);
+    byte[] EIGHT_KILOBYTES = Strings.repeat("0123456\n", 1024).getBytes(StandardCharsets.UTF_8);
 
     BlobStoreCache testee();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 10/14: JAMES-3140 CachedBlobStore should rely on backend for defaultBucket

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f39fe01acc113112d5327c7150fd1b48be2db9d1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Apr 23 11:48:39 2020 +0700

    JAMES-3140 CachedBlobStore should rely on backend for defaultBucket
---
 .../james/blob/cassandra/cache/CachedBlobStore.java   | 19 ++++++-------------
 .../blob/cassandra/cache/CachedBlobStoreTest.java     |  2 +-
 2 files changed, 7 insertions(+), 14 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 2639c64..d8fc908 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -28,7 +28,6 @@ import java.util.Arrays;
 import java.util.Optional;
 
 import javax.inject.Inject;
-import javax.inject.Named;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
@@ -43,22 +42,16 @@ import com.google.common.base.Preconditions;
 import reactor.core.publisher.Mono;
 
 public class CachedBlobStore implements BlobStore {
-
-    private static final String DEFAULT_BUCKET = "cassandraDefault";
-
     private final BlobStoreCache cache;
     private final BlobStore backend;
     private final Integer sizeThresholdInBytes;
-    private final BucketName defaultBucket;
 
     @Inject
     public CachedBlobStore(BlobStoreCache cache, BlobStore backend,
-                           CassandraCacheConfiguration cacheConfiguration,
-                           @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
+                           CassandraCacheConfiguration cacheConfiguration) {
         this.cache = cache;
         this.backend = backend;
         this.sizeThresholdInBytes = cacheConfiguration.getSizeThresholdInBytes();
-        this.defaultBucket = defaultBucket;
     }
 
     @Override
@@ -66,7 +59,7 @@ public class CachedBlobStore implements BlobStore {
         Preconditions.checkNotNull(bucketName, "bucketName should not be null");
 
         return Mono.just(bucketName)
-            .filter(defaultBucket::equals)
+            .filter(backend.getDefaultBucketName()::equals)
             .flatMap(ignored ->
                 Mono.from(cache.read(blobId))
                     .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes))))
@@ -78,7 +71,7 @@ public class CachedBlobStore implements BlobStore {
     @Override
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         return Mono.just(bucketName)
-            .filter(defaultBucket::equals)
+            .filter(backend.getDefaultBucketName()::equals)
             .flatMap(ignored -> Mono.from(cache.read(blobId)))
             .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId)));
     }
@@ -119,14 +112,14 @@ public class CachedBlobStore implements BlobStore {
 
     @Override
     public BucketName getDefaultBucketName() {
-        return defaultBucket;
+        return backend.getDefaultBucketName();
     }
 
     @Override
     public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
         return Mono.from(backend.delete(bucketName, blobId))
             .then(Mono.just(bucketName)
-                .filter(defaultBucket::equals)
+                .filter(backend.getDefaultBucketName()::equals)
                 .flatMap(ignored -> Mono.from(cache.remove(blobId)))
                 .then());
     }
@@ -166,6 +159,6 @@ public class CachedBlobStore implements BlobStore {
     }
 
     private boolean isAbleToCache(BucketName bucketName, StoragePolicy storagePolicy) {
-        return defaultBucket.equals(bucketName) && !storagePolicy.equals(LOW_COST);
+        return backend.getDefaultBucketName().equals(bucketName) && !storagePolicy.equals(LOW_COST);
     }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
index 1ca51df..debea77 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
@@ -71,7 +71,7 @@ public class CachedBlobStoreTest implements BlobStoreContract {
             .sizeThresholdInBytes(EIGHT_KILOBYTES.length + 1)
             .build();
         cache = new CassandraBlobStoreCache(cassandra.getConf(), cacheConfig);
-        testee = new CachedBlobStore(cache, backend, cacheConfig, DEFAULT);
+        testee = new CachedBlobStore(cache, backend, cacheConfig);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/14: JAMES-3140: Implement CacheBlobStore

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b9121b72fde6e203af8b515ba30ae429aed51ba4
Author: ducnv <du...@gmail.com>
AuthorDate: Tue Apr 14 17:38:33 2020 +0700

    JAMES-3140: Implement CacheBlobStore
---
 .../blob/api/BucketDumbBlobStoreContract.java      |  9 +--
 .../james/blob/cassandra/CassandraBlobModule.java  | 14 ++++
 .../james/blob/cassandra/CassandraBucketDAO.java   |  6 +-
 .../blob/cassandra/CassandraDefaultBucketDAO.java  |  2 +-
 .../blob/cassandra/CassandraDumbBlobStore.java     |  2 +-
 .../blob/cassandra/cache/CachedDumbBlobStore.java  | 80 ++++++++++++++++++++++
 6 files changed, 104 insertions(+), 9 deletions(-)

diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
index d560587..54d2384 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
@@ -20,6 +20,7 @@
 package org.apache.james.blob.api;
 
 import static org.apache.james.blob.api.DumbBlobStoreFixture.CUSTOM_BUCKET_NAME;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.ELEVEN_KILOBYTES;
 import static org.apache.james.blob.api.DumbBlobStoreFixture.OTHER_TEST_BLOB_ID;
 import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_BYTEARRAY;
 import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_STRING;
@@ -53,7 +54,7 @@ public interface BucketDumbBlobStoreContract {
     default void deleteBucketShouldDeleteExistingBucketWithItsData() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
         Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block();
 
         assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read())
@@ -117,16 +118,16 @@ public interface BucketDumbBlobStoreContract {
     default void readStreamShouldThrowWhenBucketDoesNotExist() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
         assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).read())
             .isInstanceOf(ObjectNotFoundException.class);
     }
 
     @Test
-    default void readBytesShouldThrowWhenBucketDoesNotExist() {
+    default void readBytesShouldThrowWhenBucketDoesNotExistWithBigData() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
 
         assertThatThrownBy(() -> Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, TEST_BLOB_ID)).block())
             .isInstanceOf(ObjectNotFoundException.class);
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
index 1975c1a..934812d 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.cassandra;
 
+import static com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.HOURS;
 import static org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts.DATA;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -28,6 +29,7 @@ import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts;
 import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable;
 
 import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 
 public interface CassandraBlobModule {
     CassandraModule MODULE = CassandraModule
@@ -65,5 +67,17 @@ public interface CassandraBlobModule {
             .addPartitionKey(BucketBlobParts.ID, DataType.text())
             .addClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, DataType.cint()))
 
+        .table(BlobTables.DumbBlobCache.TABLE_NAME)
+        .options(options -> options
+            .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
+                .compactionWindowSize(1)
+                .compactionWindowUnit(HOURS))
+            .readRepairChance(0.0))
+        .comment("Write through cache for small blobs stored in a slower blob store implementation which is object storage" +
+            "Messages` headers and bodies are stored as blobparts.")
+        .statement(statement -> statement
+            .addPartitionKey(BlobTables.DumbBlobCache.ID, DataType.text())
+            .addColumn(BlobTables.DumbBlobCache.DATA, DataType.blob()))
+
         .build();
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index f6d124f..0287f37 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -46,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-class CassandraBucketDAO {
+public class CassandraBucketDAO {
     private final BlobId.Factory blobIdFactory;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement insert;
@@ -59,7 +59,7 @@ class CassandraBucketDAO {
 
     @Inject
     @VisibleForTesting
-    CassandraBucketDAO(BlobId.Factory blobIdFactory, Session session) {
+    public CassandraBucketDAO(BlobId.Factory blobIdFactory, Session session) {
         this.blobIdFactory = blobIdFactory;
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.insert = prepareInsert(session);
@@ -168,7 +168,7 @@ class CassandraBucketDAO {
                 .setString(BucketBlobParts.ID, blobId.asString()));
     }
 
-    Flux<Pair<BucketName, BlobId>> listAll() {
+    public Flux<Pair<BucketName, BlobId>> listAll() {
         return cassandraAsyncExecutor.executeRows(listAll.bind())
             .map(row -> Pair.of(BucketName.of(row.getString(BUCKET)), blobIdFactory.from(row.getString(ID))));
     }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
index d564066..af99a4b 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
@@ -53,7 +53,7 @@ public class CassandraDefaultBucketDAO {
 
     @Inject
     @VisibleForTesting
-    CassandraDefaultBucketDAO(Session session) {
+    public CassandraDefaultBucketDAO(Session session) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.insert = prepareInsert(session);
         this.select = prepareSelect(session);
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
index 94e2bde..2e57325 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
@@ -56,7 +56,7 @@ public class CassandraDumbBlobStore implements DumbBlobStore {
     private final BucketName defaultBucket;
 
     @Inject
-    CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
+    public CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
                            CassandraBucketDAO bucketDAO,
                            CassandraConfiguration cassandraConfiguration,
                            @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
new file mode 100644
index 0000000..53f3df2
--- /dev/null
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
@@ -0,0 +1,80 @@
+package org.apache.james.blob.cassandra.cache;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.DumbBlobStore;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.api.ObjectStoreIOException;
+import org.reactivestreams.Publisher;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Mono;
+
+public class CachedDumbBlobStore implements DumbBlobStore {
+
+    private final DumbBlobStoreCache cache;
+    private final DumbBlobStore backend;
+
+    @Inject
+    public CachedDumbBlobStore(DumbBlobStoreCache cache, DumbBlobStore backend) {
+        this.cache = cache;
+        this.backend = backend;
+    }
+
+    @Override
+    public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
+        Preconditions.checkNotNull(bucketName, "bucketName should not be null");
+
+        return Mono.from(cache.read(blobId))
+            .map(bytes -> (InputStream) new ByteArrayInputStream(bytes))
+            .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
+            .blockOptional()
+            .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
+    }
+
+    @Override
+    public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+        return Mono.from(cache.read(blobId))
+            .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId)));
+    }
+
+    @Override
+    public Publisher<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+        return Mono.from(cache.cache(blobId, data))
+            .then(Mono.from(backend.save(bucketName, blobId, data)));
+    }
+
+    @Override
+    public Publisher<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
+        return Mono.fromCallable(() -> inputStream)
+            .map(stream -> cache.cache(blobId, stream))
+            .then(Mono.from(backend.save(bucketName, blobId, inputStream)));
+    }
+
+    @Override
+    public Publisher<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
+        return Mono.from(backend.save(bucketName, blobId, content))
+            .then(Mono.using(content::openBufferedStream,
+                inputStream -> Mono.from(cache.cache(blobId, inputStream)),
+                Throwing.consumer(InputStream::close).sneakyThrow()));
+    }
+
+    @Override
+    public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
+        return Mono.from(backend.delete(bucketName, blobId))
+            .then(Mono.from(cache.remove(blobId)));
+    }
+
+    @Override
+    public Publisher<Void> deleteBucket(BucketName bucketName) {
+        return Mono.from(backend.deleteBucket(bucketName));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/14: JAMES-3140: Implement CacheBlobStoreTest

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3c4818289ca8777528284ad9470322538b4f2e95
Author: ducnv <du...@gmail.com>
AuthorDate: Tue Apr 14 17:38:58 2020 +0700

    JAMES-3140: Implement CacheBlobStoreTest
---
 .../blob/api/BucketDumbBlobStoreContract.java      |   6 +-
 .../apache/james/blob/cassandra/BlobTables.java    |   2 +-
 .../james/blob/cassandra/CassandraBlobModule.java  |  14 --
 .../blob/cassandra/CassandraDumbBlobStore.java     |  10 +-
 ...DumbBlobStoreCache.java => BlobStoreCache.java} |   2 +-
 .../james/blob/cassandra/cache/CacheBlobStore.java | 163 +++++++++++++++++++++
 .../blob/cassandra/cache/CachedDumbBlobStore.java  |  80 ----------
 ...heModule.java => CassandraBlobCacheModule.java} |  12 +-
 ...toreCache.java => CassandraBlobStoreCache.java} |  10 +-
 ...heContract.java => BlobStoreCacheContract.java} |   5 +-
 .../blob/cassandra/cache/CacheBlobStoreTest.java   | 163 +++++++++++++++++++++
 ...eTest.java => CassandraBlobStoreCacheTest.java} |  10 +-
 12 files changed, 357 insertions(+), 120 deletions(-)

diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
index 54d2384..8695118 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
@@ -54,7 +54,7 @@ public interface BucketDumbBlobStoreContract {
     default void deleteBucketShouldDeleteExistingBucketWithItsData() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
         Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block();
 
         assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read())
@@ -118,7 +118,7 @@ public interface BucketDumbBlobStoreContract {
     default void readStreamShouldThrowWhenBucketDoesNotExist() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
         assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).read())
             .isInstanceOf(ObjectNotFoundException.class);
     }
@@ -127,7 +127,7 @@ public interface BucketDumbBlobStoreContract {
     default void readBytesShouldThrowWhenBucketDoesNotExistWithBigData() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
 
         assertThatThrownBy(() -> Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, TEST_BLOB_ID)).block())
             .isInstanceOf(ObjectNotFoundException.class);
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
index 92e6756..09a6823 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
@@ -49,7 +49,7 @@ public interface BlobTables {
         String DATA = "data";
     }
 
-    interface DumbBlobCache {
+    interface BlobStoreCache {
         String TABLE_NAME = "blob_cache";
         String ID = "id";
         String DATA = "data";
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
index 934812d..1975c1a 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.blob.cassandra;
 
-import static com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.HOURS;
 import static org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts.DATA;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -29,7 +28,6 @@ import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts;
 import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable;
 
 import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 
 public interface CassandraBlobModule {
     CassandraModule MODULE = CassandraModule
@@ -67,17 +65,5 @@ public interface CassandraBlobModule {
             .addPartitionKey(BucketBlobParts.ID, DataType.text())
             .addClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, DataType.cint()))
 
-        .table(BlobTables.DumbBlobCache.TABLE_NAME)
-        .options(options -> options
-            .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
-                .compactionWindowSize(1)
-                .compactionWindowUnit(HOURS))
-            .readRepairChance(0.0))
-        .comment("Write through cache for small blobs stored in a slower blob store implementation which is object storage" +
-            "Messages` headers and bodies are stored as blobparts.")
-        .statement(statement -> statement
-            .addPartitionKey(BlobTables.DumbBlobCache.ID, DataType.text())
-            .addColumn(BlobTables.DumbBlobCache.DATA, DataType.blob()))
-
         .build();
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
index 2e57325..240a514 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
@@ -21,6 +21,7 @@ package org.apache.james.blob.cassandra;
 
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.NoSuchElementException;
 
@@ -38,7 +39,9 @@ import org.apache.james.blob.cassandra.utils.DataChunker;
 import org.apache.james.util.ReactorUtils;
 
 import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.io.ByteSource;
 
 import reactor.core.publisher.Flux;
@@ -56,10 +59,11 @@ public class CassandraDumbBlobStore implements DumbBlobStore {
     private final BucketName defaultBucket;
 
     @Inject
+    @VisibleForTesting
     public CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
-                           CassandraBucketDAO bucketDAO,
-                           CassandraConfiguration cassandraConfiguration,
-                           @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
+                                  CassandraBucketDAO bucketDAO,
+                                  CassandraConfiguration cassandraConfiguration,
+                                  @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
         this.defaultBucketDAO = defaultBucketDAO;
         this.bucketDAO = bucketDAO;
         this.configuration = cassandraConfiguration;
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCache.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/BlobStoreCache.java
similarity index 97%
rename from server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCache.java
rename to server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/BlobStoreCache.java
index de60a33..9aec0b0 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCache.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/BlobStoreCache.java
@@ -21,7 +21,7 @@ package org.apache.james.blob.cassandra.cache;
 import org.apache.james.blob.api.BlobId;
 import org.reactivestreams.Publisher;
 
-public interface DumbBlobStoreCache {
+public interface BlobStoreCache {
     Publisher<Void> cache(BlobId blobId, byte[] data);
 
     Publisher<byte[]> read(BlobId blobId);
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
new file mode 100644
index 0000000..0acc95e
--- /dev/null
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
@@ -0,0 +1,163 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.blob.cassandra.cache;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.api.ObjectStoreIOException;
+import org.reactivestreams.Publisher;
+
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Mono;
+
+public class CacheBlobStore implements BlobStore {
+
+    private static final String DEFAULT_BUCKET = "cassandraDefault";
+
+    private final BlobStoreCache cache;
+    private final BlobStore backend;
+    private final Integer sizeThresholdInBytes;
+    private final BucketName defaultBucket;
+
+    @Inject
+    public CacheBlobStore(BlobStoreCache cache, BlobStore backend,
+                          CassandraCacheConfiguration cacheConfiguration,
+                          @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
+        this.cache = cache;
+        this.backend = backend;
+        this.sizeThresholdInBytes = cacheConfiguration.getSizeThresholdInBytes();
+        this.defaultBucket = defaultBucket;
+    }
+
+    @Override
+    public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
+        Preconditions.checkNotNull(bucketName, "bucketName should not be null");
+
+        return Mono.just(bucketName)
+            .filter(defaultBucket::equals)
+            .flatMap(ignored ->
+                Mono.from(cache.read(blobId))
+                    .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes)))
+            )
+            .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
+            .blockOptional()
+            .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
+    }
+
+    @Override
+    public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+        return Mono.just(bucketName)
+            .filter(defaultBucket::equals)
+            .flatMap(ignored -> Mono.from(cache.read(blobId)))
+            .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId)));
+    }
+
+    @Override
+    public Mono<BlobId> save(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
+        return Mono.from(backend.save(bucketName, bytes, storagePolicy))
+            .flatMap(blobId -> {
+                if (isAbleToCache(bucketName, bytes, storagePolicy)) {
+                    return Mono.from(cache.cache(blobId, bytes)).thenReturn(blobId);
+                }
+                return Mono.just(blobId);
+            });
+    }
+
+    @Override
+    public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) {
+        Preconditions.checkNotNull(inputStream, "InputStream must not be null");
+
+        PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes + 1);
+        return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))
+            .flatMap(blobId ->
+                Mono.fromCallable(() -> isALargeStream(pushbackInputStream))
+                    .flatMap(largeStream -> {
+                        if (!largeStream) {
+                            return Mono.from(saveInCache(bucketName, blobId, pushbackInputStream, storagePolicy))
+                                .thenReturn(blobId);
+                        }
+                        return Mono.just(blobId);
+                    })
+            );
+    }
+
+    @Override
+    public BucketName getDefaultBucketName() {
+        return defaultBucket;
+    }
+
+    @Override
+    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+        return Mono.from(backend.delete(bucketName, blobId))
+            .then(Mono.just(bucketName)
+                .filter(defaultBucket::equals)
+                .flatMap(ignored -> Mono.from(cache.remove(blobId)))
+                .then());
+    }
+
+    @Override
+    public Publisher<Void> deleteBucket(BucketName bucketName) {
+        return Mono.from(backend.deleteBucket(bucketName));
+    }
+
+    private Mono<Void> saveInCache(BucketName bucketName, BlobId blobId, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
+        return Mono.fromCallable(() -> copyBytesFromStream(pushbackInputStream))
+            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
+            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
+    }
+
+    private byte[] copyBytesFromStream(PushbackInputStream pushbackInputStream) throws IOException {
+        byte[] bytes = new byte[pushbackInputStream.available()];
+        int read = IOUtils.read(pushbackInputStream, bytes);
+        pushbackInputStream.unread(read);
+        return bytes;
+    }
+
+    private boolean isALargeStream(PushbackInputStream pushbackInputStream) throws IOException {
+        pushbackInputStream.mark(0);
+        long skip = pushbackInputStream.skip(sizeThresholdInBytes + 1);
+        pushbackInputStream.unread(Math.toIntExact(skip));
+        return skip >= sizeThresholdInBytes;
+    }
+
+    /**
+     * bytes: byte[] from PushbackInputStream.If PushbackInputStream is empty bytes.length == 1
+     */
+    private boolean isAbleToCache(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
+        return isAbleToCache(bucketName, storagePolicy) && bytes.length <= sizeThresholdInBytes && bytes.length > 1;
+    }
+
+    private boolean isAbleToCache(BucketName bucketName, StoragePolicy storagePolicy) {
+        return defaultBucket.equals(bucketName) && !storagePolicy.equals(LOW_COST);
+    }
+}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
deleted file mode 100644
index 53f3df2..0000000
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package org.apache.james.blob.cassandra.cache;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
-import javax.inject.Inject;
-
-import org.apache.james.blob.api.BlobId;
-import org.apache.james.blob.api.BucketName;
-import org.apache.james.blob.api.DumbBlobStore;
-import org.apache.james.blob.api.ObjectNotFoundException;
-import org.apache.james.blob.api.ObjectStoreIOException;
-import org.reactivestreams.Publisher;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteSource;
-
-import reactor.core.publisher.Mono;
-
-public class CachedDumbBlobStore implements DumbBlobStore {
-
-    private final DumbBlobStoreCache cache;
-    private final DumbBlobStore backend;
-
-    @Inject
-    public CachedDumbBlobStore(DumbBlobStoreCache cache, DumbBlobStore backend) {
-        this.cache = cache;
-        this.backend = backend;
-    }
-
-    @Override
-    public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
-        Preconditions.checkNotNull(bucketName, "bucketName should not be null");
-
-        return Mono.from(cache.read(blobId))
-            .map(bytes -> (InputStream) new ByteArrayInputStream(bytes))
-            .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
-            .blockOptional()
-            .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
-    }
-
-    @Override
-    public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
-        return Mono.from(cache.read(blobId))
-            .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId)));
-    }
-
-    @Override
-    public Publisher<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
-        return Mono.from(cache.cache(blobId, data))
-            .then(Mono.from(backend.save(bucketName, blobId, data)));
-    }
-
-    @Override
-    public Publisher<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
-        return Mono.fromCallable(() -> inputStream)
-            .map(stream -> cache.cache(blobId, stream))
-            .then(Mono.from(backend.save(bucketName, blobId, inputStream)));
-    }
-
-    @Override
-    public Publisher<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
-        return Mono.from(backend.save(bucketName, blobId, content))
-            .then(Mono.using(content::openBufferedStream,
-                inputStream -> Mono.from(cache.cache(blobId, inputStream)),
-                Throwing.consumer(InputStream::close).sneakyThrow()));
-    }
-
-    @Override
-    public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
-        return Mono.from(backend.delete(bucketName, blobId))
-            .then(Mono.from(cache.remove(blobId)));
-    }
-
-    @Override
-    public Publisher<Void> deleteBucket(BucketName bucketName) {
-        return Mono.from(backend.deleteBucket(bucketName));
-    }
-}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobCacheModule.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobCacheModule.java
similarity index 83%
rename from server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobCacheModule.java
rename to server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobCacheModule.java
index 6d4734e..3f8fa12 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobCacheModule.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobCacheModule.java
@@ -20,20 +20,22 @@
 package org.apache.james.blob.cassandra.cache;
 
 import static com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.HOURS;
+import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.DATA;
+import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.ID;
+import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TABLE_NAME;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.blob.cassandra.BlobTables;
 
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 
-public interface CassandraDumbBlobCacheModule {
+public interface CassandraBlobCacheModule {
 
     double NO_READ_REPAIR = 0d;
 
     CassandraModule MODULE = CassandraModule
         .builder()
-        .table(BlobTables.DumbBlobCache.TABLE_NAME)
+        .table(TABLE_NAME)
         .options(options -> options
             .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
                 .compactionWindowSize(1)
@@ -41,7 +43,7 @@ public interface CassandraDumbBlobCacheModule {
             .readRepairChance(NO_READ_REPAIR))
         .comment("Write through cache for small blobs stored in a slower blob store implementation.")
         .statement(statement -> statement
-            .addPartitionKey(BlobTables.DumbBlobCache.ID, DataType.text())
-            .addColumn(BlobTables.DumbBlobCache.DATA, DataType.blob()))
+            .addPartitionKey(ID, DataType.text())
+            .addColumn(DATA, DataType.blob()))
         .build();
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCache.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java
similarity index 92%
rename from server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCache.java
rename to server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java
index 74bd73b..ce3237e 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCache.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java
@@ -28,9 +28,9 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
 import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.ID;
-import static org.apache.james.blob.cassandra.BlobTables.DumbBlobCache.DATA;
-import static org.apache.james.blob.cassandra.BlobTables.DumbBlobCache.TABLE_NAME;
-import static org.apache.james.blob.cassandra.BlobTables.DumbBlobCache.TTL_FOR_ROW;
+import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.DATA;
+import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TABLE_NAME;
+import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TTL_FOR_ROW;
 
 import java.nio.ByteBuffer;
 
@@ -46,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import reactor.core.publisher.Mono;
 
-public class CassandraDumbBlobStoreCache implements DumbBlobStoreCache {
+public class CassandraBlobStoreCache implements BlobStoreCache {
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement insertStatement;
@@ -58,7 +58,7 @@ public class CassandraDumbBlobStoreCache implements DumbBlobStoreCache {
 
     @Inject
     @VisibleForTesting
-    CassandraDumbBlobStoreCache(Session session, CassandraCacheConfiguration cacheConfiguration) {
+    CassandraBlobStoreCache(Session session, CassandraCacheConfiguration cacheConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.insertStatement = prepareInsert(session);
         this.selectStatement = prepareSelect(session);
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCacheContract.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
similarity index 98%
rename from server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCacheContract.java
rename to server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
index e4101f6..6819aca 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCacheContract.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
@@ -33,11 +33,11 @@ import com.google.common.base.Strings;
 
 import reactor.core.publisher.Mono;
 
-public interface DumbBlobStoreCacheContract {
+public interface BlobStoreCacheContract {
 
     byte[] EIGHT_KILOBYTES = Strings.repeat("01234567\n", 1024).getBytes(StandardCharsets.UTF_8);
 
-    DumbBlobStoreCache testee();
+    BlobStoreCache testee();
 
     BlobId.Factory blobIdFactory();
 
@@ -64,7 +64,6 @@ public interface DumbBlobStoreCacheContract {
     default void shouldReturnEmptyWhenReadWithTimeOut() {
         BlobId blobId = blobIdFactory().randomId();
         Mono.from(testee().cache(blobId, EIGHT_KILOBYTES)).block();
-
     }
 
     @Test
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
new file mode 100644
index 0000000..1c5ff09
--- /dev/null
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
@@ -0,0 +1,163 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.blob.cassandra.cache;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.HIGH_PERFORMANCE;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED;
+import static org.apache.james.blob.api.BucketName.DEFAULT;
+import static org.apache.james.blob.cassandra.cache.BlobStoreCacheContract.EIGHT_KILOBYTES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreContract;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.cassandra.CassandraBlobModule;
+import org.apache.james.blob.cassandra.CassandraBlobStore;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Mono;
+
+public class CacheBlobStoreTest implements BlobStoreContract {
+
+    private static final BucketName DEFAULT_BUCKERNAME = DEFAULT;
+    private static final BucketName TEST_BUCKERNAME = BucketName.of("test");
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+        CassandraModule.aggregateModules(CassandraBlobModule.MODULE, CassandraBlobCacheModule.MODULE));
+
+    private BlobStore testee;
+    private BlobStore backend;
+    private BlobStoreCache cache;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        backend = CassandraBlobStore.forTesting(cassandra.getConf());
+        CassandraCacheConfiguration cacheConfig = new CassandraCacheConfiguration.Builder()
+            .sizeThresholdInBytes(EIGHT_KILOBYTES.length + 1)
+            .build();
+        cache = new CassandraBlobStoreCache(cassandra.getConf(), cacheConfig);
+        testee = new CacheBlobStore(cache, backend, cacheConfig, DEFAULT);
+    }
+
+    @Override
+    public BlobStore testee() {
+        return testee;
+    }
+
+    @Override
+    public BlobId.Factory blobIdFactory() {
+        return new HashBlobId.Factory();
+    }
+
+    @Test
+    public void shouldCacheWhenDefaultBucketName() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+
+        byte[] actual = Mono.from(cache.read(blobId)).block();
+        assertThat(actual).containsExactly(EIGHT_KILOBYTES);
+    }
+
+    @Test
+    public void shouldNotCacheWhenNotDefaultBucketName() {
+        BlobId blobId = Mono.from(testee().save(TEST_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(ignored -> {
+            assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(Mono.from(backend.readBytes(TEST_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenDefaultBucketNameAndBigByteDataAndSizeBase() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, TWELVE_MEGABYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(ignored -> {
+            assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(TWELVE_MEGABYTES);
+        });
+    }
+
+    @Test
+    public void shouldSavedBothInCacheAndBackendWhenSizeBase() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldSavedBothInCacheAndBackendWhenHighPerformance() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, HIGH_PERFORMANCE)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenLowCost() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, LOW_COST)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenEmptyStream() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+        });
+    }
+
+    @Test
+    public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(ignored -> {
+            assertThatCode(Mono.from(testee().delete(DEFAULT_BUCKERNAME, blobId))::block)
+                .doesNotThrowAnyException();
+            assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThatThrownBy(() -> Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block())
+                .isInstanceOf(ObjectNotFoundException.class);
+        });
+    }
+}
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCacheTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCacheTest.java
similarity index 88%
rename from server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCacheTest.java
rename to server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCacheTest.java
index 5b88799..efdc705 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCacheTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCacheTest.java
@@ -27,16 +27,16 @@ import org.apache.james.blob.api.HashBlobId;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-public class CassandraDumbBlobStoreCacheTest implements DumbBlobStoreCacheContract {
+public class CassandraBlobStoreCacheTest implements BlobStoreCacheContract {
 
     @RegisterExtension
-    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraDumbBlobCacheModule.MODULE);
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraBlobCacheModule.MODULE);
 
     private final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(50);
     private final int DEFAULT_THRESHOLD_IN_BYTES = EIGHT_KILOBYTES.length;
     private final Duration _2_SEC_TTL = Duration.ofSeconds(2);
 
-    private DumbBlobStoreCache testee;
+    private BlobStoreCache testee;
     private HashBlobId.Factory blobIdFactory;
 
     @BeforeEach
@@ -47,11 +47,11 @@ public class CassandraDumbBlobStoreCacheTest implements DumbBlobStoreCacheContra
             .timeOut(DEFAULT_READ_TIMEOUT)
             .ttl(_2_SEC_TTL)
             .build();
-        testee = new CassandraDumbBlobStoreCache(cassandra.getConf(), cacheConfiguration);
+        testee = new CassandraBlobStoreCache(cassandra.getConf(), cacheConfiguration);
     }
 
     @Override
-    public DumbBlobStoreCache testee() {
+    public BlobStoreCache testee() {
         return testee;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 11/14: JAMES-3140: Implement CacheBlobStore

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 86cb617b3e7eb0d3daa069b80bebab724fa0457e
Author: ducnv <du...@gmail.com>
AuthorDate: Fri Apr 24 13:51:16 2020 +0700

    JAMES-3140: Implement CacheBlobStore
---
 .../java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index d8fc908..ad73648 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -60,8 +60,7 @@ public class CachedBlobStore implements BlobStore {
 
         return Mono.just(bucketName)
             .filter(backend.getDefaultBucketName()::equals)
-            .flatMap(ignored ->
-                Mono.from(cache.read(blobId))
+            .flatMap(ignored -> Mono.from(cache.read(blobId))
                     .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes))))
             .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
             .blockOptional()
@@ -151,9 +150,6 @@ public class CachedBlobStore implements BlobStore {
         }
     }
 
-    /**
-     * bytes: byte[] from PushbackInputStream.If PushbackInputStream is empty bytes.length == 1
-     */
     private boolean isAbleToCache(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) {
         return isAbleToCache(bucketName, storagePolicy) && bytes.length <= sizeThresholdInBytes;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org