You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ad...@apache.org on 2019/07/04 09:29:47 UTC

[james-project] branch master updated (78813a8 -> 2548451)

This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 78813a8  JAMES-2771 Fix dirty provisionning for CI benchmarks
     new 862dd15  Avoid blob copy when saving byte arrays
     new ed91ea3  Position data length when possible
     new 30dc0b9  Allow Store.Impl to cary along a byte[] representation
     new 088123b  fixup! Avoid blob copy when saving byte arrays
     new 8591618  Merge remote-tracking branch 'btellier/improve-blob-store-efficiency-v4'
     new 6c20dff  JAMES-2816 Avoid copying the whole message when we only need to copy metadata
     new 55257ba  JAMES-2816 Use buffered streams to handle mails
     new 398594d  JAMES-2816 Fix Intelij warnings in SimpleMailboxMessage
     new 02fb91a  JAMES-2816 Refactor appendMessage method to make it more readable
     new 031775a  Merge remote-tracking branch 'mbaechler/optimize-append-message'
     new 87136c3  JAMES-2817 Rename subscribeWorkQueue to consumeWorkQueue and subscribe in start()
     new 37b0b39  JAMES-2817 Avoid C-like cast syntax in GroupRegistration
     new d3c5a61  JAMES-2817 WaitDelayGenerator jitter could remove time from next iteration
     new 2338e64  JAMES-2817 GroupRegistration deliver doesn't need to use subscribeWith
     new 2548451  Merge remote-tracking branch 'mbaechler/some-small-eventBus-fix'

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mailbox/events/ErrorHandlingContract.java      |   4 +-
 .../james/mailbox/events/GroupRegistration.java    |  25 +-
 .../james/mailbox/events/WaitDelayGenerator.java   |  24 +-
 .../mailbox/events/WaitDelayGeneratorTest.java     |   8 +-
 .../memory/src/test/resources/test.eml             |   0
 .../spamassassin/SpamAssassinListenerTest.java     |   4 +-
 .../james/mailbox/store/StoreMessageIdManager.java |   2 +-
 .../james/mailbox/store/StoreMessageManager.java   | 290 ++++++++++++---------
 .../james/mailbox/store/event/EventFactory.java    |   4 -
 .../mail/model/impl/SimpleMailboxMessage.java      |  10 +-
 .../mailbox/store/MessageIdManagerTestSystem.java  |   2 +-
 .../main/java/org/apache/james/blob/api/Store.java |  24 +-
 .../james/blob/objectstorage/AESPayloadCodec.java  |   5 +
 .../blob/objectstorage/DefaultPayloadCodec.java    |   9 +
 .../blob/objectstorage/ObjectStorageBlobsDAO.java  |  24 +-
 .../james/blob/objectstorage/PayloadCodec.java     |   2 +
 .../apache/james/blob/mail/MimeMessageStore.java   |   6 +-
 17 files changed, 259 insertions(+), 184 deletions(-)
 copy server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/service/CassandraMigrationService.java => mailbox/memory/src/test/resources/test.eml (100%)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 08/15: JAMES-2816 Refactor appendMessage method to make it more readable

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 02fb91af65229e1ccd4f714c55682b7abe4673f5
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Mon Jul 1 17:44:22 2019 +0200

    JAMES-2816 Refactor appendMessage method to make it more readable
---
 .../james/mailbox/store/StoreMessageManager.java   | 276 ++++++++++++---------
 1 file changed, 160 insertions(+), 116 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 5015d67..3cf9e06 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -133,6 +133,16 @@ public class StoreMessageManager implements MessageManager {
      */
     protected static final Flags MINIMAL_PERMANET_FLAGS;
 
+    private static class MediaType {
+        final String mediaType;
+        final String subType;
+
+        private MediaType(String mediaType, String subType) {
+            this.mediaType = mediaType;
+            this.subType = subType;
+        }
+    }
+
     static {
         MINIMAL_PERMANET_FLAGS = new Flags();
         MINIMAL_PERMANET_FLAGS.add(Flags.Flag.ANSWERED);
@@ -327,7 +337,6 @@ public class StoreMessageManager implements MessageManager {
 
     @Override
     public ComposedMessageId appendMessage(InputStream msgIn, Date internalDate, final MailboxSession mailboxSession, boolean isRecent, Flags flagsToBeSet) throws MailboxException {
-
         File file = null;
 
         if (!isWriteable(mailboxSession)) {
@@ -346,126 +355,20 @@ public class StoreMessageManager implements MessageManager {
                 // Disable line length... This should be handled by the smtp server
                 // component and not the parser itself
                 // https://issues.apache.org/jira/browse/IMAP-122
-
-                final MimeTokenStream parser = new MimeTokenStream(MimeConfig.PERMISSIVE, new DefaultBodyDescriptorBuilder());
-
-                parser.setRecursionMode(RecursionMode.M_NO_RECURSE);
-                parser.parse(bIn);
-                final HeaderImpl header = new HeaderImpl();
-
-                EntityState next = parser.next();
-                while (next != EntityState.T_BODY && next != EntityState.T_END_OF_STREAM && next != EntityState.T_START_MULTIPART) {
-                    if (next == EntityState.T_FIELD) {
-                        header.addField(parser.getField());
-                    }
-                    next = parser.next();
-                }
+                final MimeTokenStream parser = getParser(bIn);
+                final HeaderImpl header = readHeader(parser);
                 final MaximalBodyDescriptor descriptor = (MaximalBodyDescriptor) parser.getBodyDescriptor();
-                final PropertyBuilder propertyBuilder = new PropertyBuilder();
-                final String mediaType;
-                final String mediaTypeFromHeader = descriptor.getMediaType();
-                final String subType;
-                if (mediaTypeFromHeader == null) {
-                    mediaType = "text";
-                    subType = "plain";
-                } else {
-                    mediaType = mediaTypeFromHeader;
-                    subType = descriptor.getSubType();
-                }
-                propertyBuilder.setMediaType(mediaType);
-                propertyBuilder.setSubType(subType);
-                propertyBuilder.setContentID(descriptor.getContentId());
-                propertyBuilder.setContentDescription(descriptor.getContentDescription());
-                propertyBuilder.setContentLocation(descriptor.getContentLocation());
-                propertyBuilder.setContentMD5(descriptor.getContentMD5Raw());
-                propertyBuilder.setContentTransferEncoding(descriptor.getTransferEncoding());
-                propertyBuilder.setContentLanguage(descriptor.getContentLanguage());
-                propertyBuilder.setContentDispositionType(descriptor.getContentDispositionType());
-                propertyBuilder.setContentDispositionParameters(descriptor.getContentDispositionParameters());
-                propertyBuilder.setContentTypeParameters(descriptor.getContentTypeParameters());
-                // Add missing types
-                final String codeset = descriptor.getCharset();
-                if (codeset == null) {
-                    if ("TEXT".equalsIgnoreCase(mediaType)) {
-                        propertyBuilder.setCharset("us-ascii");
-                    }
-                } else {
-                    propertyBuilder.setCharset(codeset);
-                }
-
-                final String boundary = descriptor.getBoundary();
-                if (boundary != null) {
-                    propertyBuilder.setBoundary(boundary);
-                }
-                if ("text".equalsIgnoreCase(mediaType)) {
-                    final CountingInputStream bodyStream = new CountingInputStream(parser.getInputStream());
-                    bodyStream.readAll();
-                    long lines = bodyStream.getLineCount();
-                    bodyStream.close();
-                    next = parser.next();
-                    if (next == EntityState.T_EPILOGUE) {
-                        final CountingInputStream epilogueStream = new CountingInputStream(parser.getInputStream());
-                        epilogueStream.readAll();
-                        lines += epilogueStream.getLineCount();
-                        epilogueStream.close();
-
-                    }
-                    propertyBuilder.setTextualLineCount(lines);
-                }
-
-                final Flags flags;
-                if (flagsToBeSet == null) {
-                    flags = new Flags();
-                } else {
-                    flags = flagsToBeSet;
+                final MediaType mediaType = getMediaType(descriptor);
+                final PropertyBuilder propertyBuilder = getPropertyBuilder(descriptor, mediaType.mediaType, mediaType.subType);
+                setTextualLinesCount(parser, mediaType.mediaType, propertyBuilder);
+                final Flags flags = getFlags(mailboxSession, isRecent, flagsToBeSet);
 
-                    // Check if we need to trim the flags
-                    trimFlags(flags, mailboxSession);
-
-                }
-                if (isRecent) {
-                    flags.add(Flags.Flag.RECENT);
-                }
                 if (internalDate == null) {
                     internalDate = new Date();
                 }
-                byte[] discard = new byte[4096];
-                while (tmpMsgIn.read(discard) != -1) {
-                    // consume the rest of the stream so everything get copied to
-                    // the file now
-                    // via the TeeInputStream
-                }
-                bufferedOut.close();
-                int bodyStartOctet = (int) bIn.getBodyStartOffset();
-                if (bodyStartOctet == -1) {
-                    bodyStartOctet = 0;
-                }
-                try (SharedFileInputStream contentIn = new SharedFileInputStream(file)) {
-                    final int size = (int) file.length();
-
-                    final List<MessageAttachment> attachments = extractAttachments(contentIn);
-                    propertyBuilder.setHasAttachment(hasNonInlinedAttachment(attachments));
-
-                    final MailboxMessage message = createMessage(internalDate, size, bodyStartOctet, contentIn, flags, propertyBuilder, attachments);
-
-                    new QuotaChecker(quotaManager, quotaRootResolver, mailbox).tryAddition(1, size);
-
-                    return locker.executeWithLock(mailboxSession, getMailboxPath(), () -> {
-                        MessageMetaData data = appendMessageToStore(message, attachments, mailboxSession);
-
-                        Mailbox mailbox = getMailboxEntity();
-
-                        eventBus.dispatch(EventFactory.added()
-                            .randomEventId()
-                            .mailboxSession(mailboxSession)
-                            .mailbox(mailbox)
-                            .addMetaData(message.metaData())
-                            .build(),
-                            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
-                            .block();
-                        return new ComposedMessageId(mailbox.getMailboxId(), data.getMessageId(), data.getUid());
-                    }, true);
-                }
+                consumeStream(bufferedOut, tmpMsgIn);
+                int bodyStartOctet = getBodyStartOctet(bIn);
+                return createAndDispatchMessage(internalDate, mailboxSession, file, propertyBuilder, flags, bodyStartOctet);
             }
         } catch (IOException | MimeException e) {
             throw new MailboxException("Unable to parse message", e);
@@ -479,7 +382,148 @@ public class StoreMessageManager implements MessageManager {
                 }
             }
         }
+    }
+
+    private MimeTokenStream getParser(BodyOffsetInputStream bIn) {
+        final MimeTokenStream parser = new MimeTokenStream(MimeConfig.PERMISSIVE, new DefaultBodyDescriptorBuilder());
+
+        parser.setRecursionMode(RecursionMode.M_NO_RECURSE);
+        parser.parse(bIn);
+        return parser;
+    }
+
+    private MediaType getMediaType(MaximalBodyDescriptor descriptor) {
+        final String mediaTypeFromHeader = descriptor.getMediaType();
+        if (mediaTypeFromHeader == null) {
+            return new MediaType("text", "plain");
+        } else {
+            return new MediaType(mediaTypeFromHeader, descriptor.getSubType());
+        }
+    }
+
+    private HeaderImpl readHeader(MimeTokenStream parser) throws IOException, MimeException {
+        final HeaderImpl header = new HeaderImpl();
+
+        EntityState next = parser.next();
+        while (next != EntityState.T_BODY && next != EntityState.T_END_OF_STREAM && next != EntityState.T_START_MULTIPART) {
+            if (next == EntityState.T_FIELD) {
+                header.addField(parser.getField());
+            }
+            next = parser.next();
+        }
+        return header;
+    }
+
+    private Flags getFlags(MailboxSession mailboxSession, boolean isRecent, Flags flagsToBeSet) {
+        final Flags flags;
+        if (flagsToBeSet == null) {
+            flags = new Flags();
+        } else {
+            flags = flagsToBeSet;
+
+            // Check if we need to trim the flags
+            trimFlags(flags, mailboxSession);
+
+        }
+        if (isRecent) {
+            flags.add(Flag.RECENT);
+        }
+        return flags;
+    }
+
+    private void setTextualLinesCount(MimeTokenStream parser, String mediaType, PropertyBuilder propertyBuilder) throws IOException, MimeException {
+        EntityState next;
+        if ("text".equalsIgnoreCase(mediaType)) {
+            final CountingInputStream bodyStream = new CountingInputStream(parser.getInputStream());
+            bodyStream.readAll();
+            long lines = bodyStream.getLineCount();
+            bodyStream.close();
+            next = parser.next();
+            if (next == EntityState.T_EPILOGUE) {
+                final CountingInputStream epilogueStream = new CountingInputStream(parser.getInputStream());
+                epilogueStream.readAll();
+                lines += epilogueStream.getLineCount();
+                epilogueStream.close();
+
+            }
+            propertyBuilder.setTextualLineCount(lines);
+        }
+    }
+
+    private void consumeStream(BufferedOutputStream bufferedOut, BufferedInputStream tmpMsgIn) throws IOException {
+        byte[] discard = new byte[4096];
+        while (tmpMsgIn.read(discard) != -1) {
+            // consume the rest of the stream so everything get copied to
+            // the file now
+            // via the TeeInputStream
+        }
+        bufferedOut.flush();
+    }
+
+    private int getBodyStartOctet(BodyOffsetInputStream bIn) {
+        int bodyStartOctet = (int) bIn.getBodyStartOffset();
+        if (bodyStartOctet == -1) {
+            bodyStartOctet = 0;
+        }
+        return bodyStartOctet;
+    }
+
+    private ComposedMessageId createAndDispatchMessage(Date internalDate, MailboxSession mailboxSession, File file, PropertyBuilder propertyBuilder, Flags flags, int bodyStartOctet) throws IOException, MailboxException {
+        try (SharedFileInputStream contentIn = new SharedFileInputStream(file)) {
+            final int size = (int) file.length();
+
+            final List<MessageAttachment> attachments = extractAttachments(contentIn);
+            propertyBuilder.setHasAttachment(hasNonInlinedAttachment(attachments));
+
+            final MailboxMessage message = createMessage(internalDate, size, bodyStartOctet, contentIn, flags, propertyBuilder, attachments);
+
+            new QuotaChecker(quotaManager, quotaRootResolver, mailbox).tryAddition(1, size);
+
+            return locker.executeWithLock(mailboxSession, getMailboxPath(), () -> {
+                MessageMetaData data = appendMessageToStore(message, attachments, mailboxSession);
+
+                Mailbox mailbox = getMailboxEntity();
 
+                eventBus.dispatch(EventFactory.added()
+                    .randomEventId()
+                    .mailboxSession(mailboxSession)
+                    .mailbox(mailbox)
+                    .addMetaData(message.metaData())
+                    .build(),
+                    new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                    .block();
+                return new ComposedMessageId(mailbox.getMailboxId(), data.getMessageId(), data.getUid());
+            }, true);
+        }
+    }
+
+    private PropertyBuilder getPropertyBuilder(MaximalBodyDescriptor descriptor, String mediaType, String subType) {
+        final PropertyBuilder propertyBuilder = new PropertyBuilder();
+        propertyBuilder.setMediaType(mediaType);
+        propertyBuilder.setSubType(subType);
+        propertyBuilder.setContentID(descriptor.getContentId());
+        propertyBuilder.setContentDescription(descriptor.getContentDescription());
+        propertyBuilder.setContentLocation(descriptor.getContentLocation());
+        propertyBuilder.setContentMD5(descriptor.getContentMD5Raw());
+        propertyBuilder.setContentTransferEncoding(descriptor.getTransferEncoding());
+        propertyBuilder.setContentLanguage(descriptor.getContentLanguage());
+        propertyBuilder.setContentDispositionType(descriptor.getContentDispositionType());
+        propertyBuilder.setContentDispositionParameters(descriptor.getContentDispositionParameters());
+        propertyBuilder.setContentTypeParameters(descriptor.getContentTypeParameters());
+        // Add missing types
+        final String codeset = descriptor.getCharset();
+        if (codeset == null) {
+            if ("TEXT".equalsIgnoreCase(mediaType)) {
+                propertyBuilder.setCharset("us-ascii");
+            }
+        } else {
+            propertyBuilder.setCharset(codeset);
+        }
+        final String boundary = descriptor.getBoundary();
+        if (boundary != null) {
+            propertyBuilder.setBoundary(boundary);
+        }
+        return propertyBuilder;
     }
 
     private boolean hasNonInlinedAttachment(List<MessageAttachment> attachments) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/15: Allow Store.Impl to cary along a byte[] representation

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 30dc0b9f04d5bd3fc7121aedf8af8a972a356d35
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 7 15:08:00 2018 +0700

    Allow Store.Impl to cary along a byte[] representation
    
    Bytes are available but hidden behind an InputStream for implementation purposes.
    
    We propose in this commit a new level of indirection allowing an object to be saved as bytes or as an InputStream.
---
 .../main/java/org/apache/james/blob/api/Store.java | 24 ++++++++++++++++++----
 .../apache/james/blob/mail/MimeMessageStore.java   |  6 +++---
 2 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
index 6a4f859..ce640d5 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.blob.api;
 
-import java.io.InputStream;
 import java.util.Collection;
 import java.util.Objects;
 import java.util.stream.Stream;
@@ -66,8 +65,25 @@ public interface Store<T, I> {
 
     class Impl<T, I extends BlobPartsId> implements Store<T, I> {
 
+        public interface ValueToSave {
+            Mono<BlobId> saveIn(BucketName bucketName, BlobStore blobStore);
+        }
+
+        public static class BytesToSave implements ValueToSave {
+            private final byte[] bytes;
+
+            public BytesToSave(byte[] bytes) {
+                this.bytes = bytes;
+            }
+
+            @Override
+            public Mono<BlobId> saveIn(BucketName bucketName, BlobStore blobStore) {
+                return blobStore.save(bucketName, bytes);
+            }
+        }
+
         public interface Encoder<T> {
-            Stream<Pair<BlobType, InputStream>> encode(T t);
+            Stream<Pair<BlobType, ValueToSave>> encode(T t);
         }
 
         public interface Decoder<T> {
@@ -94,9 +110,9 @@ public interface Store<T, I> {
                 .map(idFactory::generate);
         }
 
-        private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) {
+        private Mono<Tuple2<BlobType, BlobId>> saveEntry(Pair<BlobType, ValueToSave> entry) {
             return Mono.just(entry.getLeft())
-                .zipWith(blobStore.save(BucketName.DEFAULT, entry.getRight()));
+                .zipWith(entry.getRight().saveIn(BucketName.DEFAULT, blobStore));
         }
 
         @Override
diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
index 12683f9..f0c2480 100644
--- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
+++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
@@ -69,15 +69,15 @@ public class MimeMessageStore {
 
     static class MimeMessageEncoder implements Store.Impl.Encoder<MimeMessage> {
         @Override
-        public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) {
+        public Stream<Pair<BlobType, Store.Impl.ValueToSave>> encode(MimeMessage message) {
             try {
                 byte[] messageAsArray = messageToArray(message);
                 int bodyStartOctet = computeBodyStartOctet(messageAsArray);
                 byte[] headerBytes = getHeaderBytes(messageAsArray, bodyStartOctet);
                 byte[] bodyBytes = getBodyBytes(messageAsArray, bodyStartOctet);
                 return Stream.of(
-                    Pair.of(HEADER_BLOB_TYPE, new ByteArrayInputStream(headerBytes)),
-                    Pair.of(BODY_BLOB_TYPE, new ByteArrayInputStream(bodyBytes)));
+                    Pair.of(HEADER_BLOB_TYPE, new Store.Impl.BytesToSave(headerBytes)),
+                    Pair.of(BODY_BLOB_TYPE, new Store.Impl.BytesToSave(bodyBytes)));
             } catch (MessagingException | IOException e) {
                 throw new RuntimeException(e);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 15/15: Merge remote-tracking branch 'mbaechler/some-small-eventBus-fix'

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2548451538098db2912872d06c0421b38be5f372
Merge: 031775a 2338e64
Author: Antoine DUPRAT <ad...@linagora.com>
AuthorDate: Thu Jul 4 11:29:38 2019 +0200

    Merge remote-tracking branch 'mbaechler/some-small-eventBus-fix'

 .../mailbox/events/ErrorHandlingContract.java      |  4 ++--
 .../james/mailbox/events/GroupRegistration.java    | 25 ++++++++++------------
 .../james/mailbox/events/WaitDelayGenerator.java   | 24 ++++++++++++---------
 .../mailbox/events/WaitDelayGeneratorTest.java     |  8 +++----
 4 files changed, 31 insertions(+), 30 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 07/15: JAMES-2816 Fix Intelij warnings in SimpleMailboxMessage

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 398594db2e6869c37d8d65357ff82b1f3c3ab60b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Feb 18 10:12:45 2019 +0700

    JAMES-2816 Fix Intelij warnings in SimpleMailboxMessage
---
 .../mailbox/store/mail/model/impl/SimpleMailboxMessage.java    | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMailboxMessage.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMailboxMessage.java
index 479053d..34b3b5c 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMailboxMessage.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/model/impl/SimpleMailboxMessage.java
@@ -138,13 +138,9 @@ public class SimpleMailboxMessage extends DelegatingMailboxMessage {
             SimpleMailboxMessage simpleMailboxMessage = new SimpleMailboxMessage(messageId, internalDate, size,
                 bodyStartOctet, content, flags, propertyBuilder, mailboxId, attachments.build());
 
-            if (uid.isPresent()) {
-                simpleMailboxMessage.setUid(uid.get());
-            }
+            uid.ifPresent(simpleMailboxMessage::setUid);
+            modseq.ifPresent(simpleMailboxMessage::setModSeq);
 
-            if (modseq.isPresent()) {
-                simpleMailboxMessage.setModSeq(modseq.get());
-            }
             return simpleMailboxMessage;
         }
     }
@@ -218,7 +214,7 @@ public class SimpleMailboxMessage extends DelegatingMailboxMessage {
                                 PropertyBuilder propertyBuilder, MailboxId mailboxId) {
         this(messageId, internalDate, size, bodyStartOctet,
                 content, flags,
-                propertyBuilder, mailboxId, ImmutableList.<MessageAttachment>of());
+                propertyBuilder, mailboxId, ImmutableList.of());
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/15: JAMES-2816 Avoid copying the whole message when we only need to copy metadata

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6c20dff5a87926ba0657fec8a185c925ce000c26
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Feb 15 15:15:34 2019 +0100

    JAMES-2816 Avoid copying the whole message when we only need to copy metadata
---
 mailbox/memory/src/test/resources/test.eml                  |  0
 .../mailbox/spamassassin/SpamAssassinListenerTest.java      |  4 ++--
 .../apache/james/mailbox/store/StoreMessageIdManager.java   |  2 +-
 .../org/apache/james/mailbox/store/StoreMessageManager.java | 13 ++-----------
 .../org/apache/james/mailbox/store/event/EventFactory.java  |  4 ----
 .../james/mailbox/store/MessageIdManagerTestSystem.java     |  2 +-
 6 files changed, 6 insertions(+), 19 deletions(-)

diff --git a/mailbox/memory/src/test/resources/test.eml b/mailbox/memory/src/test/resources/test.eml
new file mode 100644
index 0000000..e69de29
diff --git a/mailbox/plugin/spamassassin/src/test/java/org/apache/james/mailbox/spamassassin/SpamAssassinListenerTest.java b/mailbox/plugin/spamassassin/src/test/java/org/apache/james/mailbox/spamassassin/SpamAssassinListenerTest.java
index 36922be..72d8b39 100644
--- a/mailbox/plugin/spamassassin/src/test/java/org/apache/james/mailbox/spamassassin/SpamAssassinListenerTest.java
+++ b/mailbox/plugin/spamassassin/src/test/java/org/apache/james/mailbox/spamassassin/SpamAssassinListenerTest.java
@@ -241,7 +241,7 @@ public class SpamAssassinListenerTest {
             .randomEventId()
             .mailboxSession(MAILBOX_SESSION)
             .mailbox(inbox)
-            .addMessage(message)
+            .addMetaData(message.metaData())
             .build();
 
         listener.event(addedEvent);
@@ -257,7 +257,7 @@ public class SpamAssassinListenerTest {
             .randomEventId()
             .mailboxSession(MAILBOX_SESSION)
             .mailbox(mailbox1)
-            .addMessage(message)
+            .addMetaData(message.metaData())
             .build();
 
         listener.event(addedEvent);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 1b28585..20a5b0f 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -395,7 +395,7 @@ public class StoreMessageIdManager implements MessageIdManager {
                 .randomEventId()
                 .mailboxSession(mailboxSession)
                 .mailbox(mailboxMapper.findMailboxById(mailboxId))
-                .addMessage(copy)
+                .addMetaData(copy.metaData())
                 .build(),
                 new MailboxIdRegistrationKey(mailboxId))
                 .block();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 5d63d3b..8c32c55 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -451,12 +451,12 @@ public class StoreMessageManager implements MessageManager {
                         MessageMetaData data = appendMessageToStore(message, attachments, mailboxSession);
 
                         Mailbox mailbox = getMailboxEntity();
-                        MailboxMessage copy = copyMessage(message);
+
                         eventBus.dispatch(EventFactory.added()
                             .randomEventId()
                             .mailboxSession(mailboxSession)
                             .mailbox(mailbox)
-                            .addMessage(copy)
+                            .addMetaData(message.metaData())
                             .build(),
                             new MailboxIdRegistrationKey(mailbox.getMailboxId()))
                             .block();
@@ -500,15 +500,6 @@ public class StoreMessageManager implements MessageManager {
         return new SimpleMailboxMessage(messageIdFactory.generate(), internalDate, size, bodyStartOctet, content, flags, propertyBuilder, getMailboxEntity().getMailboxId(), attachments);
     }
 
-    private MailboxMessage copyMessage(MailboxMessage message) throws MailboxException {
-        return SimpleMailboxMessage
-            .from(message)
-            .mailboxId(message.getMailboxId())
-            .uid(message.getUid())
-            .modseq(message.getModSeq())
-            .build();
-    }
-
     @Override
     public boolean isWriteable(MailboxSession session) throws MailboxException {
         return storeRightManager.isReadWrite(session, mailbox, getSharedPermanentFlags(session));
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
index efd3146..28969e8 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java
@@ -111,10 +111,6 @@ public class EventFactory {
             return metaData(ImmutableSortedMap.of(metaData.getUid(), metaData));
         }
 
-        default T addMessage(MailboxMessage message) {
-            return addMetaData(message.metaData());
-        }
-
         default T addMetaData(Iterable<MessageMetaData> metaData) {
             return metaData(ImmutableList.copyOf(metaData)
                 .stream()
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
index 61d5b57..a0f56d2 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/MessageIdManagerTestSystem.java
@@ -87,7 +87,7 @@ public class MessageIdManagerTestSystem {
                 .randomEventId()
                 .mailboxSession(mailboxSession)
                 .mailbox(mailbox)
-                .addMessage(message)
+                .addMetaData(message.metaData())
                 .build(),
                 new MailboxIdRegistrationKey(mailboxId))
             .block();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 12/15: JAMES-2817 GroupRegistration deliver doesn't need to use subscribeWith

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2338e6461779ec114a9eb13f1da887a2f6030a64
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Feb 15 14:26:33 2019 +0100

    JAMES-2817 GroupRegistration deliver doesn't need to use subscribeWith
---
 .../main/java/org/apache/james/mailbox/events/GroupRegistration.java | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index c704ad3..8d477e7 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -40,7 +40,6 @@ import com.rabbitmq.client.Connection;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoProcessor;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.BindingSpecification;
@@ -143,9 +142,7 @@ class GroupRegistration implements Registration {
             .publishOn(Schedulers.elastic())
             .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))))
             .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
-            .then(Mono.fromRunnable(acknowledgableDelivery::ack))
-            .subscribeWith(MonoProcessor.create())
-            .then();
+            .then(Mono.fromRunnable(acknowledgableDelivery::ack));
     }
 
     Mono<Void> reDeliver(Event event) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 06/15: JAMES-2816 Use buffered streams to handle mails

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 55257ba20f52d0d91043e2f6733d77209520e2d7
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Feb 15 15:16:34 2019 +0100

    JAMES-2816 Use buffered streams to handle mails
    
    	x3 improvement in the micro-benchmark; should be around a 5% improvement in real life
    	appendMessage was reported to take 8.51% in a flamegraph
---
 .../java/org/apache/james/mailbox/store/StoreMessageManager.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 8c32c55..5015d67 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -22,6 +22,8 @@ package org.apache.james.mailbox.store;
 import static org.apache.james.mailbox.extension.PreDeletionHook.DeleteOperation;
 import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -102,7 +104,6 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -339,7 +340,8 @@ public class StoreMessageManager implements MessageManager {
             // source for the InputStream
             file = File.createTempFile("imap", ".msg");
             try (FileOutputStream out = new FileOutputStream(file);
-                 TeeInputStream tmpMsgIn = new TeeInputStream(msgIn, out);
+                 BufferedOutputStream bufferedOut = new BufferedOutputStream(out);
+                 BufferedInputStream tmpMsgIn = new BufferedInputStream(new TeeInputStream(msgIn, bufferedOut));
                  BodyOffsetInputStream bIn = new BodyOffsetInputStream(tmpMsgIn)) {
                 // Disable line length... This should be handled by the smtp server
                 // component and not the parser itself
@@ -433,6 +435,7 @@ public class StoreMessageManager implements MessageManager {
                     // the file now
                     // via the TeeInputStream
                 }
+                bufferedOut.close();
                 int bodyStartOctet = (int) bIn.getBodyStartOffset();
                 if (bodyStartOctet == -1) {
                     bodyStartOctet = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 10/15: JAMES-2817 Avoid C-like cast syntax in GroupRegistration

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 37b0b39571f9f50268bdfa88979ace839bd84369
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Feb 15 09:47:20 2019 +0100

    JAMES-2817 Avoid C-like cast syntax in GroupRegistration
---
 .../main/java/org/apache/james/mailbox/events/GroupRegistration.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 7730efc..c704ad3 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -164,7 +164,7 @@ class GroupRegistration implements Registration {
         return Optional.ofNullable(acknowledgableDelivery.getProperties().getHeaders())
             .flatMap(headers -> Optional.ofNullable(headers.get(RETRY_COUNT)))
             .filter(object -> object instanceof Integer)
-            .map(object -> (Integer) object)
+            .map(Integer.class::cast)
             .orElse(DEFAULT_RETRY_COUNT);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/15: Avoid blob copy when saving byte arrays

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 862dd154133317eacffc8e5434083ac0d0452b1d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 7 14:14:26 2018 +0700

    Avoid blob copy when saving byte arrays
    
    We can iterate over the in-memory byte array first to compute the blobId to use,
    saving a blob copy.
---
 .../blob/objectstorage/ObjectStorageBlobsDAO.java     | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
index 7bbd2da..fb22164 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
@@ -93,8 +93,15 @@ public class ObjectStorageBlobsDAO implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(BucketName bucketName, byte[] data) {
-        return save(bucketName, new ByteArrayInputStream(data));
+    public Mono<BlobId> save(byte[] data) {
+        BlobId blobId = blobIdFactory.forPayload(data);
+
+        Blob blob = blobStore.blobBuilder(blobId.asString())
+            .payload(payloadCodec.write(new ByteArrayInputStream(data)))
+            .build();
+
+        return save(bucketName, blob)
+            .thenApply(any -> blobId);
     }
 
     @Override
@@ -121,10 +128,15 @@ public class ObjectStorageBlobsDAO implements BlobStore {
                             .payload(payload.getPayload())
                             .build();
 
-        return Mono.fromRunnable(() -> putBlobFunction.putBlob(blob))
+        return save(blob)
             .then(Mono.fromCallable(() -> blobIdFactory.from(hashingInputStream.hash().toString())));
     }
 
+    private Mono<String> save(Blob blob) {
+        String containerName = this.containerName.value();
+        return Mono.fromCallable(() -> blobStore.putBlob(containerName, blob));
+    }
+
     @Override
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         return Mono.fromCallable(() -> IOUtils.toByteArray(read(bucketName, blobId)));
@@ -145,7 +157,6 @@ public class ObjectStorageBlobsDAO implements BlobStore {
                 "Failed to readBytes blob " + blobId.asString(),
                 cause);
         }
-
     }
 
     public void deleteContainer() {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 13/15: Merge remote-tracking branch 'btellier/improve-blob-store-efficiency-v4'

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 859161869055c9ec79d9ab851ff6a9362885da63
Merge: 78813a8 088123b
Author: Antoine DUPRAT <ad...@linagora.com>
AuthorDate: Thu Jul 4 11:28:31 2019 +0200

    Merge remote-tracking branch 'btellier/improve-blob-store-efficiency-v4'

 .../main/java/org/apache/james/blob/api/Store.java | 24 ++++++++++++++++++----
 .../james/blob/objectstorage/AESPayloadCodec.java  |  5 +++++
 .../blob/objectstorage/DefaultPayloadCodec.java    |  9 ++++++++
 .../blob/objectstorage/ObjectStorageBlobsDAO.java  | 24 ++++++++++++++++------
 .../james/blob/objectstorage/PayloadCodec.java     |  2 ++
 .../apache/james/blob/mail/MimeMessageStore.java   |  6 +++---
 6 files changed, 57 insertions(+), 13 deletions(-)

diff --cc server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
index 48840f8,c80f23e..aa209b5
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
@@@ -146,14 -158,8 +159,13 @@@ public class ObjectStorageBlobsDAO impl
                  "Failed to readBytes blob " + blobId.asString(),
                  cause);
          }
- 
      }
  
 +    @Override
 +    public Mono<Void> deleteBucket(BucketName bucketName) {
 +        throw new NotImplementedException("not implemented");
 +    }
 +
      public void deleteContainer() {
          blobStore.deleteContainer(containerName.value());
      }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/15: fixup! Avoid blob copy when saving byte arrays

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 088123b1cc97da4b48bc277a04f27604971615be
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Tue Jul 2 11:33:56 2019 +0200

    fixup! Avoid blob copy when saving byte arrays
---
 .../org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java   | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
index c38ea12..c80f23e 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
@@ -134,9 +134,8 @@ public class ObjectStorageBlobsDAO implements BlobStore {
             .then(Mono.fromCallable(() -> blobIdFactory.from(hashingInputStream.hash().toString())));
     }
 
-    private Mono<String> save(BucketName bucketName, Blob blob) {
-        String containerName = this.containerName.value();
-        return Mono.fromCallable(() -> blobStore.putBlob(containerName, blob));
+    private Mono<Void> save(BucketName bucketName, Blob blob) {
+        return Mono.fromRunnable(() -> putBlobFunction.putBlob(blob));
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 14/15: Merge remote-tracking branch 'mbaechler/optimize-append-message'

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 031775a2051f4b9f92dbb6cd2cc7377c8f049ab9
Merge: 8591618 02fb91a
Author: Antoine DUPRAT <ad...@linagora.com>
AuthorDate: Thu Jul 4 11:29:09 2019 +0200

    Merge remote-tracking branch 'mbaechler/optimize-append-message'

 mailbox/memory/src/test/resources/test.eml         |   0
 .../spamassassin/SpamAssassinListenerTest.java     |   4 +-
 .../james/mailbox/store/StoreMessageIdManager.java |   2 +-
 .../james/mailbox/store/StoreMessageManager.java   | 290 ++++++++++++---------
 .../james/mailbox/store/event/EventFactory.java    |   4 -
 .../mail/model/impl/SimpleMailboxMessage.java      |  10 +-
 .../mailbox/store/MessageIdManagerTestSystem.java  |   2 +-
 7 files changed, 171 insertions(+), 141 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 09/15: JAMES-2817 Rename subscribeWorkQueue to consumeWorkQueue and subscribe in start()

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 87136c39e011e6d0197e3a0e85a8203c1e861e35
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Feb 15 09:14:26 2019 +0100

    JAMES-2817 Rename subscribeWorkQueue to consumeWorkQueue and subscribe in start()
---
 .../apache/james/mailbox/events/GroupRegistration.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index bbdd46b..7730efc 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -37,7 +37,6 @@ import org.apache.james.util.MDCBuilder;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Connection;
-
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -105,10 +104,11 @@ class GroupRegistration implements Registration {
     }
 
     GroupRegistration start() {
-        createGroupWorkQueue()
-            .then(retryHandler.createRetryExchange(queueName))
-            .doOnSuccess(any -> this.subscribeWorkQueue())
-            .block();
+        receiverSubscriber = Optional
+            .of(createGroupWorkQueue()
+                .then(retryHandler.createRetryExchange(queueName))
+                .then(Mono.fromCallable(() -> this.consumeWorkQueue()))
+                .block());
         return this;
     }
 
@@ -126,12 +126,12 @@ class GroupRegistration implements Registration {
             .then();
     }
 
-    private void subscribeWorkQueue() {
-        receiverSubscriber = Optional.of(receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+    private Disposable consumeWorkQueue() {
+        return receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+            .publishOn(Schedulers.parallel())
             .filter(delivery -> Objects.nonNull(delivery.getBody()))
             .flatMap(this::deliver)
-            .subscribeOn(Schedulers.elastic())
-            .subscribe());
+            .subscribe();
     }
 
     private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 11/15: JAMES-2817 WaitDelayGenerator jitter could remove time from next iteration

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d3c5a61f0591a9538be670188c59894c8f5862c8
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Feb 15 12:31:45 2019 +0100

    JAMES-2817 WaitDelayGenerator jitter could remove time from next iteration
---
 .../mailbox/events/ErrorHandlingContract.java      |  4 ++--
 .../james/mailbox/events/WaitDelayGenerator.java   | 24 +++++++++++++---------
 .../mailbox/events/WaitDelayGeneratorTest.java     |  8 ++++----
 3 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index c6f4494..d0b3923 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -167,8 +167,8 @@ interface ErrorHandlingContract extends EventBusContract {
             softly.assertThat(timeElapsed).hasSize(4);
 
             long minFirstDelayAfter = 100; // first backOff
-            long minSecondDelayAfter = 100; // 200 * jitter factor (200 * 0.5)
-            long minThirdDelayAfter = 200; // 400 * jitter factor (400 * 0.5)
+            long minSecondDelayAfter = 50; // 50 * jitter factor (50 * 0.5)
+            long minThirdDelayAfter = 100; // 100 * jitter factor (100 * 0.5)
 
             softly.assertThat(timeElapsed.get(1))
                 .isAfterOrEqualTo(timeElapsed.get(0).plusMillis(minFirstDelayAfter));
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
index ff0378b..33a3586 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java
@@ -24,7 +24,7 @@ import java.time.Duration;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
+import com.google.common.primitives.Ints;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -34,12 +34,14 @@ class WaitDelayGenerator {
         return new WaitDelayGenerator(retryBackoff);
     }
 
-    private static int randomBetween(int lowest, int highest) {
-        Preconditions.checkArgument(lowest <= highest, "lowest always has to be less than or equals highest");
-        if (lowest == highest) {
-            return lowest;
+    private static Duration randomBetween(Duration base, Duration jitter) {
+        Preconditions.checkArgument(!jitter.isNegative(), "jitter value should always be positive");
+        if (jitter.isZero()) {
+            return base;
         }
-        return SECURE_RANDOM.nextInt(highest - lowest) + lowest;
+        long maxJitterAsMillis = jitter.toMillis();
+        long jitterAsMillis = SECURE_RANDOM.nextInt(Ints.checkedCast(maxJitterAsMillis * 2)) / 2;
+        return base.plusMillis(jitterAsMillis);
     }
 
     private static final SecureRandom SECURE_RANDOM = new SecureRandom();
@@ -65,11 +67,13 @@ class WaitDelayGenerator {
         if (!shouldDelay(retryCount)) {
             return Duration.ZERO;
         }
-        int exponentialFactor = Double.valueOf(Math.pow(2, retryCount - 1)).intValue();
-        int minDelay = exponentialFactor * (int) retryBackoff.getFirstBackoff().toMillis();
-        int maxDelay = Double.valueOf(minDelay + minDelay * retryBackoff.getJitterFactor()).intValue();
+        long exponentialFactor = Double.valueOf(Math.pow(2, retryCount - 1)).longValue();
+        Duration minDelay = retryBackoff.getFirstBackoff().multipliedBy(exponentialFactor);
+        Duration jitterDelay = retryBackoff.getFirstBackoff()
+            .multipliedBy(Double.valueOf(retryBackoff.getJitterFactor() * 100).intValue())
+            .dividedBy(100);
 
-        return Duration.ofMillis(randomBetween(minDelay, maxDelay));
+        return randomBetween(minDelay, jitterDelay);
     }
 
     private boolean shouldDelay(int retryCount) {
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
index 9cb2d62..e7f1fb6 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java
@@ -46,13 +46,13 @@ class WaitDelayGeneratorTest {
 
         SoftAssertions.assertSoftly(softly -> {
             softly.assertThat(generator.generateDelay(1).toMillis())
-                .isBetween(100L, 150L);
+                .isBetween(50L, 150L);
             softly.assertThat(generator.generateDelay(2).toMillis())
-                .isBetween(200L, 300L);
+                .isBetween(100L, 300L);
             softly.assertThat(generator.generateDelay(3).toMillis())
-                .isBetween(400L, 600L);
+                .isBetween(200L, 600L);
             softly.assertThat(generator.generateDelay(4).toMillis())
-                .isBetween(800L, 1200L);
+                .isBetween(300L, 1200L);
         });
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/15: Position data length when possible

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ed91ea3e94f57cf14719a199dcd3f15f7ad38f01
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 7 14:47:30 2018 +0700

    Position data length when possible
---
 .../james/blob/objectstorage/AESPayloadCodec.java      |  5 +++++
 .../james/blob/objectstorage/DefaultPayloadCodec.java  |  9 +++++++++
 .../blob/objectstorage/ObjectStorageBlobsDAO.java      | 18 ++++++++++--------
 .../apache/james/blob/objectstorage/PayloadCodec.java  |  2 ++
 4 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/AESPayloadCodec.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/AESPayloadCodec.java
index 2b0eaee..e19e51d 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/AESPayloadCodec.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/AESPayloadCodec.java
@@ -72,6 +72,11 @@ public class AESPayloadCodec implements PayloadCodec {
     }
 
     @Override
+    public Payload write(byte[] bytes) {
+        return write(new ByteArrayInputStream(bytes));
+    }
+
+    @Override
     public Payload write(InputStream inputStream) {
         try (FileBackedOutputStream outputStream = new FileBackedOutputStream(MAX_BYTES.intValue())) {
             outputStream.write(aead.encrypt(IOUtils.toByteArray(inputStream), EMPTY_ASSOCIATED_DATA));
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/DefaultPayloadCodec.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/DefaultPayloadCodec.java
index 6f56b26..9a6fcc3 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/DefaultPayloadCodec.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/DefaultPayloadCodec.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.objectstorage;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
@@ -32,6 +33,14 @@ public class DefaultPayloadCodec implements PayloadCodec {
     }
 
     @Override
+    public Payload write(byte[] bytes) {
+        if (bytes.length == 0) {
+            return write(new ByteArrayInputStream(bytes));
+        }
+        return new Payload(Payloads.newByteArrayPayload(bytes), Optional.of(new Long(bytes.length)));
+    }
+
+    @Override
     public InputStream read(Payload payload) throws IOException {
         return payload.getPayload().openStream();
     }
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
index fb22164..c38ea12 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.blob.objectstorage;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
@@ -93,15 +92,18 @@ public class ObjectStorageBlobsDAO implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(byte[] data) {
+    public Mono<BlobId> save(BucketName bucketName, byte[] data) {
+        Preconditions.checkNotNull(data);
         BlobId blobId = blobIdFactory.forPayload(data);
+        Payload payload = payloadCodec.write(data);
 
         Blob blob = blobStore.blobBuilder(blobId.asString())
-            .payload(payloadCodec.write(new ByteArrayInputStream(data)))
+            .payload(payload.getPayload())
+            .contentLength(payload.getLength().orElse(new Long(data.length)))
             .build();
 
         return save(bucketName, blob)
-            .thenApply(any -> blobId);
+            .thenReturn(blobId);
     }
 
     @Override
@@ -109,7 +111,7 @@ public class ObjectStorageBlobsDAO implements BlobStore {
         Preconditions.checkNotNull(data);
 
         BlobId tmpId = blobIdFactory.randomId();
-        return save(data, tmpId)
+        return save(bucketName, data, tmpId)
             .flatMap(id -> updateBlobId(tmpId, id));
     }
 
@@ -121,18 +123,18 @@ public class ObjectStorageBlobsDAO implements BlobStore {
             .thenReturn(to);
     }
 
-    private Mono<BlobId> save(InputStream data, BlobId id) {
+    private Mono<BlobId> save(BucketName bucketName, InputStream data, BlobId id) {
         HashingInputStream hashingInputStream = new HashingInputStream(Hashing.sha256(), data);
         Payload payload = payloadCodec.write(hashingInputStream);
         Blob blob = blobStore.blobBuilder(id.asString())
                             .payload(payload.getPayload())
                             .build();
 
-        return save(blob)
+        return save(bucketName, blob)
             .then(Mono.fromCallable(() -> blobIdFactory.from(hashingInputStream.hash().toString())));
     }
 
-    private Mono<String> save(Blob blob) {
+    private Mono<String> save(BucketName bucketName, Blob blob) {
         String containerName = this.containerName.value();
         return Mono.fromCallable(() -> blobStore.putBlob(containerName, blob));
     }
diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PayloadCodec.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PayloadCodec.java
index debb9fb..4bbbd0e 100644
--- a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PayloadCodec.java
+++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/PayloadCodec.java
@@ -25,6 +25,8 @@ import java.io.InputStream;
 public interface PayloadCodec {
     Payload write(InputStream is);
 
+    Payload write(byte[] bytes);
+
     InputStream read(Payload payload) throws IOException;
 
     PayloadCodec DEFAULT_CODEC = new DefaultPayloadCodec();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org