You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/08/23 04:21:44 UTC

[james-project] branch master updated (0ff5716 -> 4cb2104)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 0ff5716  JAMES-2826 Since java 9 there's a release parameter that do exactly what animal-sniffer does
     new 372a311  Revert "JAMES-2855 Upgrade junit 5 dependencies"
     new 7d5db9a  JAMES-2855 correct junit maven properties
     new e7a3d29  JAMES-2855 Upgrade parent POM version to latest
     new 9a3eb55  Revert "JAMES-2855 Fix issue with attributes field comparison on Mail with the upgrade of assertj lib"
     new f55d913  JAMES-2855 Using recursive comparaison on Mail with assertj upgrade
     new 18c13ab  JAMES-2869 Update website's Java version
     new b07e80d  JAMES-2853 Correctly limit concurrency with Reactor
     new 4ccb9b1  JAMES-2851 Add ReactorUtils.toInputStream to convert Flux<byte[]> to lazy InputStream
     new a683036  JAMES-2851 Replace CassandraBlobStore's PipedStreamSubscriber by ReactorUtils.toInputStream
     new 4cb2104  JAMES-2851 CassandraBlobStore reads ByteBuffer instead of byte[]

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/mail/CassandraMessageDAO.java        |  3 +-
 .../cassandra/mail/CassandraMessageIdMapper.java   |  6 +-
 .../cassandra/mail/CassandraMessageMapper.java     |  4 +-
 .../mail/CassandraAttachmentOwnerDAOTest.java      |  4 +-
 .../james/mailbox/store/PreDeletionHooks.java      |  4 +-
 .../api/src/main/java/org/apache/mailet/Mail.java  |  7 --
 .../transport/mailets/StripAttachmentTest.java     |  5 +-
 .../java/org/apache/mailet/base/test/FakeMail.java |  5 --
 pom.xml                                            |  8 +-
 .../james/blob/cassandra/CassandraBlobStore.java   | 28 ++++---
 .../james/blob/cassandra/CassandraBucketDAO.java   |  9 +--
 .../blob/cassandra/CassandraDefaultBucketDAO.java  |  9 +--
 .../cassandra/utils/PipedStreamSubscriber.java     | 89 ----------------------
 .../blob/cassandra/CassandraBucketDAOTest.java     | 20 ++---
 .../cassandra/CassandraDefaultBucketDAOTest.java   | 18 ++---
 .../org/apache/james/server/core/MailImpl.java     |  5 --
 .../org/apache/james/server/core/MailImplTest.java | 25 ++++--
 .../java/org/apache/james/util/ReactorUtils.java   | 71 +++++++++++++++++
 .../org/apache/james/util/ReactorUtilsTest.java    | 87 ++++++++++++++++++++-
 src/homepage/index.html                            |  2 +-
 20 files changed, 236 insertions(+), 173 deletions(-)
 delete mode 100644 server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 07/10: JAMES-2853 Correctly limit concurrency with Reactor

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b07e80dc31170c53bb403e36b770269b37f1a074
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Tue Aug 20 15:06:45 2019 +0200

    JAMES-2853 Correctly limit concurrency with Reactor
---
 .../apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java    | 3 +--
 .../james/mailbox/cassandra/mail/CassandraMessageIdMapper.java      | 6 ++----
 .../apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java | 4 +---
 .../mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java     | 4 ++--
 .../main/java/org/apache/james/mailbox/store/PreDeletionHooks.java  | 4 ++--
 5 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index de53a32..f2e5319 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -234,9 +234,8 @@ public class CassandraMessageDAO {
     public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
         return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
             .publishOn(Schedulers.elastic())
-            .limitRate(configuration.getMessageReadChunkSize())
             .flatMap(id -> retrieveRow(id, fetchType)
-                .flatMap(resultSet -> message(resultSet, id, fetchType)));
+                .flatMap(resultSet -> message(resultSet, id, fetchType)), configuration.getMessageReadChunkSize());
     }
 
     private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 830ae98..33db97c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -92,8 +92,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) {
         return Flux.fromStream(messageIds.stream())
             .publishOn(Schedulers.elastic())
-            .limitRate(cassandraConfiguration.getMessageReadChunkSize())
-            .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()))
+            .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
             .collectList()
             .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
             .filter(CassandraMessageDAO.MessageResult::isFound)
@@ -178,9 +177,8 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public void delete(Multimap<MessageId, MailboxId> ids) {
         Flux.fromIterable(ids.asMap()
             .entrySet())
-            .limitRate(cassandraConfiguration.getExpungeChunkSize())
             .publishOn(Schedulers.elastic())
-            .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()))
+            .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize())
             .then()
             .block();
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index bdbf91b..18ac059 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -203,7 +203,6 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)
-            .limitRate(cassandraConfiguration.getExpungeChunkSize())
             .collect(Guavate.toImmutableList())
             .block();
     }
@@ -213,8 +212,7 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         return Flux.fromStream(uids.stream())
-            .limitRate(cassandraConfiguration.getExpungeChunkSize())
-            .flatMap(messageUid -> expungeOne(mailboxId, messageUid))
+            .flatMap(messageUid -> expungeOne(mailboxId, messageUid), cassandraConfiguration.getExpungeChunkSize())
             .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
             .block();
     }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
index 1140f4f..1ae2c67 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
@@ -76,11 +76,11 @@ class CassandraAttachmentOwnerDAOTest {
 
     @Test
     void retrieveOwnersShouldNotThrowWhenMoreReferencesThanPaging() {
+        int concurrency = 128;
         int referenceCountExceedingPaging = 5050;
 
         Flux.range(0, referenceCountExceedingPaging)
-            .limitRate(128)
-            .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i)))
+            .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i)), concurrency)
             .blockLast();
 
         assertThat(testee.retrieveOwners(ATTACHMENT_ID).toIterable())
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
index aac7b5c..b868d62 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
@@ -34,6 +34,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class PreDeletionHooks {
+    private static final int CONCURRENCY = 1;
     public static final PreDeletionHooks NO_PRE_DELETION_HOOK = new PreDeletionHooks(ImmutableSet.of(), new NoopMetricFactory());
 
     static final String PRE_DELETION_HOOK_METRIC_NAME = "preDeletionHook";
@@ -50,9 +51,8 @@ public class PreDeletionHooks {
     public Mono<Void> runHooks(PreDeletionHook.DeleteOperation deleteOperation) {
         return Flux.fromIterable(hooks)
             .publishOn(Schedulers.elastic())
-            .limitRate(1)
             .flatMap(hook -> metricFactory.runPublishingTimerMetric(PRE_DELETION_HOOK_METRIC_NAME,
-                Mono.from(hook.notifyDelete(deleteOperation))))
+                Mono.from(hook.notifyDelete(deleteOperation))), CONCURRENCY)
             .then();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/10: JAMES-2855 Upgrade parent POM version to latest

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e7a3d29bbe2facb0a1c2c0a74434aec39ced6a32
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Aug 22 13:45:31 2019 +0700

    JAMES-2855 Upgrade parent POM version to latest
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 3f94fa2..b327e9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache</groupId>
         <artifactId>apache</artifactId>
-        <version>18</version>
+        <version>21</version>
         <relativePath />
     </parent>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/10: Revert "JAMES-2855 Fix issue with attributes field comparison on Mail with the upgrade of assertj lib"

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9a3eb558475552cbdf78a1507f2f66ade168dc0e
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed Aug 21 14:22:56 2019 +0700

    Revert "JAMES-2855 Fix issue with attributes field comparison on Mail with the upgrade of assertj lib"
    
    This reverts commit 010347983b7747ceff504b4ef8fe41221c1f20eb.
---
 mailet/api/src/main/java/org/apache/mailet/Mail.java               | 7 -------
 .../test/src/main/java/org/apache/mailet/base/test/FakeMail.java   | 5 -----
 .../core/src/main/java/org/apache/james/server/core/MailImpl.java  | 5 -----
 3 files changed, 17 deletions(-)

diff --git a/mailet/api/src/main/java/org/apache/mailet/Mail.java b/mailet/api/src/main/java/org/apache/mailet/Mail.java
index 6d3ed68..637edea 100644
--- a/mailet/api/src/main/java/org/apache/mailet/Mail.java
+++ b/mailet/api/src/main/java/org/apache/mailet/Mail.java
@@ -230,13 +230,6 @@ public interface Mail extends Serializable, Cloneable {
     void setState(String state);
 
     /**
-     * Returns the attributes of this message.
-     *
-     * @return the attributes of this message
-     */
-    Map<AttributeName, Attribute> getAttributes();
-
-    /**
      * Get the stream of all attributes
      */
     Stream<Attribute> attributes();
diff --git a/mailet/test/src/main/java/org/apache/mailet/base/test/FakeMail.java b/mailet/test/src/main/java/org/apache/mailet/base/test/FakeMail.java
index 15e16a5..4ac9fe2 100644
--- a/mailet/test/src/main/java/org/apache/mailet/base/test/FakeMail.java
+++ b/mailet/test/src/main/java/org/apache/mailet/base/test/FakeMail.java
@@ -430,11 +430,6 @@ public class FakeMail implements Mail {
     }
 
     @Override
-    public Map<AttributeName, Attribute> getAttributes() {
-        return attributes;
-    }
-
-    @Override
     public Stream<Attribute> attributes() {
         return attributes.values().stream();
     }
diff --git a/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java b/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
index 7f9b325..eef5335 100644
--- a/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
+++ b/server/container/core/src/main/java/org/apache/james/server/core/MailImpl.java
@@ -664,11 +664,6 @@ public class MailImpl implements Disposable, Mail {
         message = null;
     }
 
-    @Override
-    public Map<AttributeName, Attribute> getAttributes() {
-        return attributes;
-    }
-
     /**
      * <p>
      * This method is necessary, when Mail repositories needs to deal explicitly


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/10: JAMES-2855 correct junit maven properties

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7d5db9a7d49f1ce62c3aa7a22c546560ecb76ab7
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Thu Aug 22 18:51:36 2019 +0700

    JAMES-2855 correct junit maven properties
---
 pom.xml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index 26b485b..3f94fa2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -605,7 +605,7 @@
         <dnsjava.version>2.1.9</dnsjava.version>
         <junit.version>4.12</junit.version>
         <junit.jupiter.version>5.2.0</junit.jupiter.version>
-        <junit.plateform.version>1.4.1</junit.plateform.version>
+        <junit.platform.version>1.4.1</junit.platform.version>
         <junit.vintage.version>5.2.0</junit.vintage.version>
         <concurrent.version>1.3.4</concurrent.version>
         <xbean-spring.version>4.9</xbean-spring.version>
@@ -2472,12 +2472,12 @@
             <dependency>
                 <groupId>org.junit.platform</groupId>
                 <artifactId>junit-platform-engine</artifactId>
-                <version>${junit.plateform.version}</version>
+                <version>${junit.platform.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.junit.platform</groupId>
                 <artifactId>junit-platform-launcher</artifactId>
-                <version>${junit.plateform.version}</version>
+                <version>${junit.platform.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.junit.vintage</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/10: JAMES-2855 Using recursive comparaison on Mail with assertj upgrade

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f55d913a07d76fc918a92541665e6b98482afcab
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed Aug 21 15:02:03 2019 +0700

    JAMES-2855 Using recursive comparaison on Mail with assertj upgrade
---
 .../transport/mailets/StripAttachmentTest.java     |  5 ++++-
 .../org/apache/james/server/core/MailImplTest.java | 25 ++++++++++++++++------
 2 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/mailet/standard/src/test/java/org/apache/james/transport/mailets/StripAttachmentTest.java b/mailet/standard/src/test/java/org/apache/james/transport/mailets/StripAttachmentTest.java
index 4d244c5..4eb7979 100644
--- a/mailet/standard/src/test/java/org/apache/james/transport/mailets/StripAttachmentTest.java
+++ b/mailet/standard/src/test/java/org/apache/james/transport/mailets/StripAttachmentTest.java
@@ -108,7 +108,10 @@ class StripAttachmentTest {
 
         mailet.service(mail);
 
-        assertThat(mail).isEqualToIgnoringGivenFields(expectedMail, "msg");
+        assertThat(mail)
+            .usingRecursiveComparison()
+            .ignoringFields("msg")
+            .isEqualTo(expectedMail);
         assertThat(mail.getMessage().getContent()).isEqualTo("simple text");
     }
     
diff --git a/server/container/core/src/test/java/org/apache/james/server/core/MailImplTest.java b/server/container/core/src/test/java/org/apache/james/server/core/MailImplTest.java
index 3e0811e..33a51d4 100644
--- a/server/container/core/src/test/java/org/apache/james/server/core/MailImplTest.java
+++ b/server/container/core/src/test/java/org/apache/james/server/core/MailImplTest.java
@@ -95,7 +95,10 @@ public class MailImplTest extends ContractMailTest {
             .build();
 
         MailImpl expected = newMail();
-        assertThat(mail).isEqualToIgnoringGivenFields(expected, "sender", "name", "recipients", "lastUpdated");
+        assertThat(mail)
+            .usingRecursiveComparison()
+            .ignoringFields("sender", "name", "recipients", "lastUpdated")
+            .isEqualTo(expected);
         assertThat(mail.getLastUpdated()).isCloseTo(new Date(), TimeUnit.SECONDS.toMillis(1));
     }
 
@@ -129,7 +132,10 @@ public class MailImplTest extends ContractMailTest {
             .mimeMessage(emptyMessage)
             .build();
 
-        assertThat(mail).isEqualToIgnoringGivenFields(expected, "message", "lastUpdated");
+        assertThat(mail)
+            .usingRecursiveComparison()
+            .ignoringFields("message", "lastUpdated")
+            .isEqualTo(expected);
         assertThat(mail.getLastUpdated()).isCloseTo(new Date(), TimeUnit.SECONDS.toMillis(1));
     }
 
@@ -155,7 +161,11 @@ public class MailImplTest extends ContractMailTest {
 
         MailImpl duplicate = MailImpl.duplicate(mail);
 
-        assertThat(duplicate).isNotSameAs(mail).isEqualToIgnoringGivenFields(mail, "message", "name");
+        assertThat(duplicate)
+            .isNotSameAs(mail)
+            .usingRecursiveComparison()
+            .ignoringFields("message", "name")
+            .isEqualTo(mail);
         assertThat(duplicate.getName()).isNotEqualTo(name);
         assertThat(duplicate.getMessage().getInputStream()).hasSameContentAs(mail.getMessage().getInputStream());
     }
@@ -318,7 +328,8 @@ public class MailImplTest extends ContractMailTest {
 
         assertThat(unserialized)
             .isInstanceOf(MailImpl.class)
-            .isEqualToComparingFieldByField(mail);
+            .usingRecursiveComparison()
+            .isEqualTo(mail);
     }
 
     @Test
@@ -339,7 +350,8 @@ public class MailImplTest extends ContractMailTest {
 
         assertThat(unserialized)
             .isInstanceOf(MailImpl.class)
-            .isEqualToComparingFieldByField(mail);
+            .usingRecursiveComparison()
+            .isEqualTo(mail);
     }
 
     @Test
@@ -360,6 +372,7 @@ public class MailImplTest extends ContractMailTest {
 
         assertThat(unserialized)
             .isInstanceOf(MailImpl.class)
-            .isEqualToComparingFieldByField(mail);
+            .usingRecursiveComparison()
+            .isEqualTo(mail);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 08/10: JAMES-2851 Add ReactorUtils.toInputStream to convert Flux to lazy InputStream

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4ccb9b1636e5d6043ab37b31acca1c1bd16ce977
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Wed Jul 31 13:15:19 2019 +0200

    JAMES-2851 Add ReactorUtils.toInputStream to convert Flux<byte[]> to lazy InputStream
---
 .../java/org/apache/james/util/ReactorUtils.java   | 70 ++++++++++++++++++
 .../org/apache/james/util/ReactorUtilsTest.java    | 82 +++++++++++++++++++++-
 2 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 42e6d8e..1ed7963 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -18,10 +18,80 @@
  ****************************************************************/
 package org.apache.james.util;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Stream;
+
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class ReactorUtils {
     public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
         return Mono.fromRunnable(runnable).then(Mono.empty());
     }
+
+    public static InputStream toInputStream(Flux<byte[]> byteArrays) {
+        return new StreamInputStream(byteArrays.toStream(1));
+    }
+
+    private static  class StreamInputStream extends InputStream {
+        private static final int NO_MORE_DATA = -1;
+
+        private final Stream<byte[]> source;
+        private final Spliterator<byte[]> spliterator;
+        private Optional<ByteArrayInputStream> currentItemByteStream;
+
+        StreamInputStream(Stream<byte[]> source) {
+            this.source = source;
+            this.spliterator = source.spliterator();
+            this.currentItemByteStream = Optional.empty();
+        }
+
+        @Override
+        public int read() {
+            try {
+                if (!dataAvailableToRead()) {
+                    switchToNextChunk();
+                }
+
+                if (!dataAvailableToRead()) {
+                    source.close();
+                    return NO_MORE_DATA;
+                }
+
+                return currentItemByteStream.map(ByteArrayInputStream::read)
+                    .filter(readResult -> readResult != NO_MORE_DATA)
+                    .orElseGet(this::readNextChunk);
+            } catch (Throwable t) {
+                source.close();
+                throw t;
+            }
+        }
+
+        private boolean dataAvailableToRead() {
+            return currentItemByteStream.isPresent();
+        }
+
+        private void switchToNextChunk() {
+            spliterator.tryAdvance(bytes ->
+                currentItemByteStream = Optional.of(new ByteArrayInputStream(bytes)));
+        }
+
+        private Integer readNextChunk() {
+            currentItemByteStream = Optional.empty();
+            return read();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                source.close();
+            } finally {
+                super.close();
+            }
+        }
+    }
 }
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 3c991e0..98bb9e1 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -20,10 +20,18 @@ package org.apache.james.util;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 class ReactorUtilsTest {
 
@@ -69,4 +77,76 @@ class ReactorUtilsTest {
             }
         }
     }
-}
\ No newline at end of file
+
+    @Nested
+    class ToInputStream {
+        @Test
+        void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.range(0, 10)
+                .subscribeOn(Schedulers.elastic())
+                .limitRate(2)
+                .doOnRequest(request -> generateElements.getAndAdd((int) request))
+                .map(index -> new byte[] {(byte) (int) index});
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(6);
+        }
+
+        @Test
+        void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
+                    .subscribeOn(Schedulers.elastic())
+                    .limitRate(2)
+                    .doOnRequest(request -> generateElements.getAndAdd((int) request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(3);
+        }
+
+        @Test
+        void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
+                    .subscribeOn(Schedulers.elastic())
+                    .limitRate(2)
+                    .doOnRequest(request -> generateElements.getAndAdd((int) request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(4);
+        }
+
+        @Test
+        void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException {
+            AtomicInteger generateElements = new AtomicInteger(0);
+            Flux<byte[]> source = Flux.<byte[]>empty()
+                    .subscribeOn(Schedulers.elastic())
+                    .limitRate(2)
+                    .doOnRequest(request -> generateElements.getAndAdd((int) request));
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+            byte[] readBytes = new byte[5];
+            inputStream.read(readBytes, 0, readBytes.length);
+
+            assertThat(readBytes).contains(0, 0, 0, 0, 0);
+            Thread.sleep(200);
+            assertThat(generateElements.get()).isEqualTo(1);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 06/10: JAMES-2869 Update website's Java version

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 18c13abd7c1d3a71fcf7667c20902df2b44cf4db
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Wed Aug 21 09:03:12 2019 +0200

    JAMES-2869 Update website's Java version
---
 src/homepage/index.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/homepage/index.html b/src/homepage/index.html
index c34dd9e..153fdd3 100644
--- a/src/homepage/index.html
+++ b/src/homepage/index.html
@@ -73,7 +73,7 @@ layout: default
               <div class="about-table">
                 <h3 class="tb-h3">James from a technical point of view</h3>
                 <b>Complete portability</b> (100% pure Java)<br/>
-                <b>Running on Java 6 onwards</b> and on the <b>JVM</b>. Some optional components might require Java 8.<br/><br/>
+                <b>Built with Java 8</b> and running on the <b>JRE 1.8</b>.<br/><br/>
                 <b>James Components:</b><br/>
                 - <b>Emailing protocols:</b> SMTP, LMTP, POP3, IMAP, ManageSieve, JMAP<br/>
                 - <b>Mailet container:</b> independent, extensible and pluggable email processing agents<br/>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 10/10: JAMES-2851 CassandraBlobStore reads ByteBuffer instead of byte[]

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4cb2104ea88b1c5468d673be360be333f38e1a49
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Tue Aug 20 14:20:18 2019 +0200

    JAMES-2851 CassandraBlobStore reads ByteBuffer instead of byte[]
---
 .../james/blob/cassandra/CassandraBlobStore.java     | 20 ++++++++++++++++----
 .../james/blob/cassandra/CassandraBucketDAO.java     |  9 +++------
 .../blob/cassandra/CassandraDefaultBucketDAO.java    |  9 +++------
 .../james/blob/cassandra/CassandraBucketDAOTest.java | 20 ++++++++++----------
 .../cassandra/CassandraDefaultBucketDAOTest.java     | 18 +++++++++---------
 .../java/org/apache/james/util/ReactorUtils.java     | 19 ++++++++++---------
 .../java/org/apache/james/util/ReactorUtilsTest.java | 15 ++++++++++-----
 7 files changed, 61 insertions(+), 49 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 171f215..7f7efe3 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.cassandra;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -42,7 +43,6 @@ import org.apache.james.util.ReactorUtils;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -114,7 +114,7 @@ public class CassandraBlobStore implements BlobStore {
     public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
         return readBlobParts(bucketName, blobId)
             .collectList()
-            .map(parts -> Bytes.concat(parts.toArray(new byte[0][])));
+            .map(this::byteBuffersToBytesArray);
     }
 
     @Override
@@ -127,7 +127,7 @@ public class CassandraBlobStore implements BlobStore {
         return BucketName.DEFAULT;
     }
 
-    private Flux<byte[]> readBlobParts(BucketName bucketName, BlobId blobId) {
+    private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) {
         Integer rowCount = selectRowCount(bucketName, blobId)
             .publishOn(Schedulers.elastic())
             .single()
@@ -173,7 +173,7 @@ public class CassandraBlobStore implements BlobStore {
         }
     }
 
-    private Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
+    private Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, Integer partIndex) {
         if (isDefaultBucket(bucketName)) {
             return defaultBucketDAO.readPart(blobId, partIndex);
         } else {
@@ -208,4 +208,16 @@ public class CassandraBlobStore implements BlobStore {
     private boolean isDefaultBucket(BucketName bucketName) {
         return bucketName.equals(getDefaultBucketName());
     }
+
+    private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) {
+        int targetSize = byteBuffers
+            .stream()
+            .mapToInt(ByteBuffer::remaining)
+            .sum();
+
+        return byteBuffers
+            .stream()
+            .reduce(ByteBuffer.allocate(targetSize), (accumulator, element) -> accumulator.put(element))
+            .array();
+    }
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index 8d355bf..f6d124f 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -145,7 +145,7 @@ class CassandraBucketDAO {
             .map(row -> row.getInt(NUMBER_OF_CHUNK));
     }
 
-    Mono<byte[]> readPart(BucketName bucketName, BlobId blobId, int position) {
+    Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeSingleRow(
             selectPart.bind()
                 .setString(BucketBlobParts.BUCKET, bucketName.asString())
@@ -173,10 +173,7 @@ class CassandraBucketDAO {
             .map(row -> Pair.of(BucketName.of(row.getString(BUCKET)), blobIdFactory.from(row.getString(ID))));
     }
 
-    private byte[] rowToData(Row row) {
-        ByteBuffer byteBuffer = row.getBytes(BucketBlobParts.DATA);
-        byte[] data = new byte[byteBuffer.remaining()];
-        byteBuffer.get(data);
-        return data;
+    private ByteBuffer rowToData(Row row) {
+        return row.getBytes(BucketBlobParts.DATA);
     }
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
index 3bcc99e..d564066 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
@@ -123,7 +123,7 @@ public class CassandraDefaultBucketDAO {
             .map(row -> row.getInt(NUMBER_OF_CHUNK));
     }
 
-    Mono<byte[]> readPart(BlobId blobId, int position) {
+    Mono<ByteBuffer> readPart(BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeSingleRow(
             selectPart.bind()
                 .setString(DefaultBucketBlobParts.ID, blobId.asString())
@@ -143,10 +143,7 @@ public class CassandraDefaultBucketDAO {
                 .setString(DefaultBucketBlobParts.ID, blobId.asString()));
     }
 
-    private byte[] rowToData(Row row) {
-        ByteBuffer byteBuffer = row.getBytes(DefaultBucketBlobParts.DATA);
-        byte[] data = new byte[byteBuffer.remaining()];
-        byteBuffer.get(data);
-        return data;
+    private ByteBuffer rowToData(Row row) {
+        return row.getBytes(DefaultBucketBlobParts.DATA);
     }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
index 66a7abc..2142200 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBucketDAOTest.java
@@ -55,7 +55,7 @@ class CassandraBucketDAOTest {
 
     @Test
     void readPartShouldReturnEmptyWhenNone() {
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -88,7 +88,7 @@ class CassandraBucketDAOTest {
 
         testee.deleteParts(BUCKET_NAME, BLOB_ID).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
         assertThat(maybeBytes).isEmpty();
     }
 
@@ -99,8 +99,8 @@ class CassandraBucketDAOTest {
 
         testee.deleteParts(BUCKET_NAME, BLOB_ID).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
-        Optional<byte[]> maybeBytes2 = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes2 = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
         assertThat(maybeBytes).isEmpty();
         assertThat(maybeBytes2).isEmpty();
     }
@@ -109,16 +109,16 @@ class CassandraBucketDAOTest {
     void readPartShouldReturnPreviouslySavedData() {
         testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
 
-        assertThat(maybeBytes).contains(DATA);
+        assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA));
     }
 
     @Test
     void readPartShouldNotReturnContentOfOtherParts() {
         testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION_2).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -127,7 +127,7 @@ class CassandraBucketDAOTest {
     void readPartShouldNotReturnContentOfOtherBuckets() {
         testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME_2, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME_2, BLOB_ID, POSITION).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -137,9 +137,9 @@ class CassandraBucketDAOTest {
         testee.writePart(ByteBuffer.wrap(DATA), BUCKET_NAME, BLOB_ID, POSITION).block();
         testee.writePart(ByteBuffer.wrap(DATA_2), BUCKET_NAME, BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BUCKET_NAME, BLOB_ID, POSITION).blockOptional();
 
-        assertThat(maybeBytes).contains(DATA_2);
+        assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA_2));
     }
 
     @Test
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java
index 01ffaab..78d9359 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAOTest.java
@@ -52,7 +52,7 @@ class CassandraDefaultBucketDAOTest {
 
     @Test
     void readPartShouldReturnEmptyWhenNone() {
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -61,16 +61,16 @@ class CassandraDefaultBucketDAOTest {
     void readPartShouldReturnPreviouslySavedData() {
         testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
 
-        assertThat(maybeBytes).contains(DATA);
+        assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA));
     }
 
     @Test
     void readPartShouldNotReturnContentOfOtherParts() {
         testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
 
         assertThat(maybeBytes).isEmpty();
     }
@@ -80,9 +80,9 @@ class CassandraDefaultBucketDAOTest {
         testee.writePart(ByteBuffer.wrap(DATA), BLOB_ID, POSITION).block();
         testee.writePart(ByteBuffer.wrap(DATA_2), BLOB_ID, POSITION).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
 
-        assertThat(maybeBytes).contains(DATA_2);
+        assertThat(maybeBytes).contains(ByteBuffer.wrap(DATA_2));
     }
 
     @Test
@@ -138,7 +138,7 @@ class CassandraDefaultBucketDAOTest {
 
         testee.deleteParts(BLOB_ID).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
         assertThat(maybeBytes).isEmpty();
     }
 
@@ -149,8 +149,8 @@ class CassandraDefaultBucketDAOTest {
 
         testee.deleteParts(BLOB_ID).block();
 
-        Optional<byte[]> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
-        Optional<byte[]> maybeBytes2 = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
+        Optional<ByteBuffer> maybeBytes = testee.readPart(BLOB_ID, POSITION).blockOptional();
+        Optional<ByteBuffer> maybeBytes2 = testee.readPart(BLOB_ID, POSITION_2).blockOptional();
         assertThat(maybeBytes).isEmpty();
         assertThat(maybeBytes2).isEmpty();
     }
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 1ed7963..df51e07 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -18,9 +18,9 @@
  ****************************************************************/
 package org.apache.james.util;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.Spliterator;
 import java.util.stream.Stream;
@@ -33,18 +33,18 @@ public class ReactorUtils {
         return Mono.fromRunnable(runnable).then(Mono.empty());
     }
 
-    public static InputStream toInputStream(Flux<byte[]> byteArrays) {
+    public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
         return new StreamInputStream(byteArrays.toStream(1));
     }
 
     private static  class StreamInputStream extends InputStream {
         private static final int NO_MORE_DATA = -1;
 
-        private final Stream<byte[]> source;
-        private final Spliterator<byte[]> spliterator;
-        private Optional<ByteArrayInputStream> currentItemByteStream;
+        private final Stream<ByteBuffer> source;
+        private final Spliterator<ByteBuffer> spliterator;
+        private Optional<ByteBuffer> currentItemByteStream;
 
-        StreamInputStream(Stream<byte[]> source) {
+        StreamInputStream(Stream<ByteBuffer> source) {
             this.source = source;
             this.spliterator = source.spliterator();
             this.currentItemByteStream = Optional.empty();
@@ -62,8 +62,9 @@ public class ReactorUtils {
                     return NO_MORE_DATA;
                 }
 
-                return currentItemByteStream.map(ByteArrayInputStream::read)
-                    .filter(readResult -> readResult != NO_MORE_DATA)
+                return currentItemByteStream
+                    .filter(ByteBuffer::hasRemaining)
+                    .map(buffer -> buffer.get() & 0xFF)
                     .orElseGet(this::readNextChunk);
             } catch (Throwable t) {
                 source.close();
@@ -77,7 +78,7 @@ public class ReactorUtils {
 
         private void switchToNextChunk() {
             spliterator.tryAdvance(bytes ->
-                currentItemByteStream = Optional.of(new ByteArrayInputStream(bytes)));
+                currentItemByteStream = Optional.of(bytes));
         }
 
         private Integer readNextChunk() {
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 98bb9e1..7bdc678 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -83,11 +84,12 @@ class ReactorUtilsTest {
         @Test
         void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<byte[]> source = Flux.range(0, 10)
+            Flux<ByteBuffer> source = Flux.range(0, 10)
                 .subscribeOn(Schedulers.elastic())
                 .limitRate(2)
                 .doOnRequest(request -> generateElements.getAndAdd((int) request))
-                .map(index -> new byte[] {(byte) (int) index});
+                .map(index -> new byte[] {(byte) (int) index})
+                .map(ByteBuffer::wrap);
 
             InputStream inputStream = ReactorUtils.toInputStream(source);
             byte[] readBytes = new byte[5];
@@ -101,8 +103,9 @@ class ReactorUtilsTest {
         @Test
         void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
+            Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
                     .subscribeOn(Schedulers.elastic())
+                    .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
 
@@ -118,8 +121,9 @@ class ReactorUtilsTest {
         @Test
         void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<byte[]> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
+            Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
                     .subscribeOn(Schedulers.elastic())
+                    .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
 
@@ -135,8 +139,9 @@ class ReactorUtilsTest {
         @Test
         void givenAnEmptyFluxShouldConsumeOnlyThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<byte[]> source = Flux.<byte[]>empty()
+            Flux<ByteBuffer> source = Flux.<byte[]>empty()
                     .subscribeOn(Schedulers.elastic())
+                    .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/10: Revert "JAMES-2855 Upgrade junit 5 dependencies"

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 372a311dc301c37dbeed759ef578e3d18375fda2
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Wed Aug 21 15:37:44 2019 +0700

    Revert "JAMES-2855 Upgrade junit 5 dependencies"
    
    This reverts commit ec8e49a8.
    `mvn test` keep failing with new version, for example the memory jmap
    integration test.
---
 pom.xml | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index f491cad..26b485b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -604,9 +604,9 @@
 
         <dnsjava.version>2.1.9</dnsjava.version>
         <junit.version>4.12</junit.version>
-        <junit.jupiter.version>5.5.1</junit.jupiter.version>
-        <junit.platform.version>1.5.1</junit.platform.version>
-        <junit.vintage.version>5.5.1</junit.vintage.version>
+        <junit.jupiter.version>5.2.0</junit.jupiter.version>
+        <junit.plateform.version>1.4.1</junit.plateform.version>
+        <junit.vintage.version>5.2.0</junit.vintage.version>
         <concurrent.version>1.3.4</concurrent.version>
         <xbean-spring.version>4.9</xbean-spring.version>
         <netty.version>3.10.6.Final</netty.version>
@@ -2472,12 +2472,12 @@
             <dependency>
                 <groupId>org.junit.platform</groupId>
                 <artifactId>junit-platform-engine</artifactId>
-                <version>${junit.platform.version}</version>
+                <version>${junit.plateform.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.junit.platform</groupId>
                 <artifactId>junit-platform-launcher</artifactId>
-                <version>${junit.platform.version}</version>
+                <version>${junit.plateform.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.junit.vintage</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 09/10: JAMES-2851 Replace CassandraBlobStore's PipedStreamSubscriber by ReactorUtils.toInputStream

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a683036ae8c0706916326024b6bd18cbc0eef931
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Aug 2 10:59:12 2019 +0200

    JAMES-2851 Replace CassandraBlobStore's PipedStreamSubscriber by ReactorUtils.toInputStream
---
 .../james/blob/cassandra/CassandraBlobStore.java   |  8 +-
 .../cassandra/utils/PipedStreamSubscriber.java     | 89 ----------------------
 2 files changed, 2 insertions(+), 95 deletions(-)

diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
index 9cdb7c7..171f215 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStore.java
@@ -37,8 +37,7 @@ import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.cassandra.utils.DataChunker;
-import org.apache.james.blob.cassandra.utils.PipedInputStreamHandlingError;
-import org.apache.james.blob.cassandra.utils.PipedStreamSubscriber;
+import org.apache.james.util.ReactorUtils;
 
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
@@ -120,10 +119,7 @@ public class CassandraBlobStore implements BlobStore {
 
     @Override
     public InputStream read(BucketName bucketName, BlobId blobId) {
-        PipedInputStreamHandlingError pipedInputStream = new PipedInputStreamHandlingError();
-        readBlobParts(bucketName, blobId)
-            .subscribe(new PipedStreamSubscriber(pipedInputStream));
-        return pipedInputStream;
+        return ReactorUtils.toInputStream(readBlobParts(bucketName, blobId));
     }
 
     @Override
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java
deleted file mode 100644
index f9fcade..0000000
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.blob.cassandra.utils;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-
-import org.reactivestreams.Subscription;
-
-import com.google.common.base.Preconditions;
-
-import reactor.core.publisher.BaseSubscriber;
-
-public class PipedStreamSubscriber extends BaseSubscriber<byte[]> {
-    private final PipedInputStreamHandlingError in;
-    private PipedOutputStreamHandlingError out;
-
-    public PipedStreamSubscriber(PipedInputStreamHandlingError in) {
-        Preconditions.checkNotNull(in, "The input stream must not be null");
-        this.in = in;
-    }
-
-    @Override
-    protected void hookOnSubscribe(Subscription subscription) {
-        super.hookOnSubscribe(subscription);
-        try {
-            this.out = new PipedOutputStreamHandlingError(in);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    @Override
-    protected void hookOnNext(byte[] payload) {
-        try {
-            out.write(payload);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    @Override
-    protected void hookOnComplete() {
-        close();
-    }
-
-    @Override
-    protected void hookOnError(Throwable error) {
-        if (error instanceof RuntimeException) {
-            out.propagateError((RuntimeException) error);
-        } else {
-            out.propagateError(new RuntimeException(error));
-        }
-
-        close();
-    }
-
-    @Override
-    protected void hookOnCancel() {
-        close();
-    }
-
-    private void close() {
-        try {
-            if (out != null) {
-                out.close();
-            }
-        } catch (IOException ignored) {
-            //ignored
-        }
-    }
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org