You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/06/09 07:26:09 UTC

[james-project] branch master updated (533097b -> 9097b9a)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 533097b  JAMES-3593 Recommend upgrade of RabbitMQ to 3.8.16
     new 1308e1e  [PERFORMANCE] Migrate where possible from Mono.fatMap to flatMapIterable
     new 814951c  [PERFORMANCE] CassandraDeletedMessageDAO should use CassandraAsyncExecutor::executeRows
     new 0c80864  [PERFORMANCE] CassandraMailboxDAO should use CassandraAsyncExecutor::executeRows
     new 5a19a86  [PERFORMANCE] CassandraMessageDAO should use CassandraAsyncExecutor::executeSingleRow
     new e972901  [PERFORMANCE] CassandraMessageDAOV3 should use CassandraAsyncExecutor::executeSingleRow
     new 496864e  [PERFORMANCE] CassandraRecipientRewriteTableDAO should use CassandraAsyncExecutor::executeRows
     new 9097b9a  [PERFORMANCE] CassandraMailRepositoryKeysDAO should use CassandraAsyncExecutor::executeRows

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/utils/CassandraAsyncExecutor.java    |  3 +-
 .../backends/cassandra/utils/CassandraUtils.java   |  6 ----
 .../cassandra/mail/CassandraDeletedMessageDAO.java | 40 +++++++---------------
 .../cassandra/mail/CassandraMailboxDAO.java        | 17 ++-------
 .../cassandra/mail/CassandraMailboxPathV3DAO.java  | 13 ++-----
 .../cassandra/mail/CassandraMailboxRecentsDAO.java | 15 ++------
 .../cassandra/mail/CassandraMessageDAO.java        | 13 ++-----
 .../cassandra/mail/CassandraMessageDAOV3.java      | 15 +++-----
 .../cassandra/mail/CassandraMessageMapper.java     |  2 +-
 .../mail/CassandraUserMailboxRightsDAO.java        |  8 ++---
 .../cassandra/CassandraMailboxManagerTest.java     |  5 ++-
 .../cassandra/mail/CassandraACLMapperV1Test.java   |  3 +-
 .../cassandra/mail/CassandraACLMapperV2Test.java   |  3 +-
 .../cassandra/mail/CassandraMailboxMapperTest.java |  5 ++-
 .../mail/CassandraMailboxPathV3DAOTest.java        |  2 --
 .../mail/CassandraUserMailboxRightsDAOTest.java    |  3 +-
 .../mail/migration/AclV2MigrationTest.java         |  3 +-
 .../SolveMailboxInconsistenciesServiceTest.java    |  2 --
 .../v7/ElasticSearchQuotaSearcher.java             |  3 +-
 .../CassandraRecipientRewriteTableDAO.java         | 12 +++----
 .../CassandraRecipientRewriteTableDAOTest.java     |  3 +-
 .../CassandraRecipientRewriteTableTest.java        |  3 +-
 .../james/rrt/cassandra/CassandraStepdefs.java     |  3 +-
 .../migration/MappingsSourcesMigrationTest.java    |  3 +-
 .../memory/change/MemoryEmailChangeRepository.java |  6 ++--
 .../cassandra/CassandraMailRepositoryKeysDAO.java  |  8 ++---
 .../CassandraMailRepositoryKeysDAOTest.java        |  3 +-
 .../cassandra/CassandraMailRepositoryTest.java     |  5 ++-
 ...aMailRepositoryWithFakeImplementationsTest.java |  3 +-
 .../org/apache/james/jmap/http/JMAPApiRoutes.java  |  3 +-
 .../routes/CassandraMappingsRoutesTest.java        |  3 +-
 31 files changed, 62 insertions(+), 154 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 07/07: [PERFORMANCE] CassandraMailRepositoryKeysDAO should use CassandraAsyncExecutor::executeRows

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9097b9abb1c0711738a987208d0b8c6bba43c63f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 4 14:22:44 2021 +0700

    [PERFORMANCE] CassandraMailRepositoryKeysDAO should use CassandraAsyncExecutor::executeRows
---
 .../mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java     | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
index 897e2a5..7bbc3c9 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
@@ -28,8 +28,6 @@ import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.KEYS
 import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.MAIL_KEY;
 import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.REPOSITORY_NAME;
 
-import java.util.function.Function;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -86,9 +84,8 @@ public class CassandraMailRepositoryKeysDAO {
     }
 
     public Flux<MailKey> list(MailRepositoryUrl url) {
-        return executor.execute(listKeys.bind()
+        return executor.executeRows(listKeys.bind()
             .setString(REPOSITORY_NAME, url.asString()))
-            .flatMapIterable(Function.identity())
             .map(row -> new MailKey(row.getString(MAIL_KEY)));
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/07: [PERFORMANCE] CassandraMailboxDAO should use CassandraAsyncExecutor::executeRows

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0c8086451000e6b5be7e741e0cfd3cb37f362ae0
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 4 13:53:31 2021 +0700

    [PERFORMANCE] CassandraMailboxDAO should use CassandraAsyncExecutor::executeRows
---
 .../org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java     | 1 -
 1 file changed, 1 deletion(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index f128d67..f799f26 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -56,7 +56,6 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraMailboxDAO {
-
     private final CassandraAsyncExecutor executor;
     private final MailboxBaseTupleUtil mailboxBaseTupleUtil;
     private final PreparedStatement readStatement;

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 05/07: [PERFORMANCE] CassandraMessageDAOV3 should use CassandraAsyncExecutor::executeSingleRow

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e9729018fb6c22e52c04ddbc1e9ed81ff20daa34
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 4 13:57:45 2021 +0700

    [PERFORMANCE] CassandraMessageDAOV3 should use CassandraAsyncExecutor::executeSingleRow
    
    This prevents a single row ResultSet post-traitements to be scheduled
    on the elasticScheduler
---
 .../mailbox/cassandra/mail/CassandraMessageDAOV3.java     | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index b044020..d4e6c40 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -80,7 +80,6 @@ import com.datastax.driver.core.CodecRegistry;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TypeCodec;
@@ -293,23 +292,17 @@ public class CassandraMessageDAOV3 {
 
     public Mono<MessageRepresentation> retrieveMessage(CassandraMessageId cassandraMessageId, FetchType fetchType) {
         return retrieveRow(cassandraMessageId)
-                .flatMap(resultSet -> message(resultSet, cassandraMessageId, fetchType));
+                .flatMap(row -> message(row, cassandraMessageId, fetchType));
     }
 
-    private Mono<ResultSet> retrieveRow(CassandraMessageId messageId) {
-        return cassandraAsyncExecutor.execute(select
+    private Mono<Row> retrieveRow(CassandraMessageId messageId) {
+        return cassandraAsyncExecutor.executeSingleRow(select
             .bind()
             .setUUID(MESSAGE_ID, messageId.get())
             .setConsistencyLevel(consistencyLevel));
     }
 
-    private Mono<MessageRepresentation>
-    message(ResultSet rows, CassandraMessageId cassandraMessageId, FetchType fetchType) {
-        if (rows.isExhausted()) {
-            return Mono.empty();
-        }
-
-        Row row = rows.one();
+    private Mono<MessageRepresentation> message(Row row, CassandraMessageId cassandraMessageId, FetchType fetchType) {
         BlobId headerId = retrieveBlobId(HEADER_CONTENT, row);
         BlobId bodyId = retrieveBlobId(BODY_CONTENT, row);
         int bodyStartOctet = row.getInt(BODY_START_OCTET);

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/07: [PERFORMANCE] CassandraMessageDAO should use CassandraAsyncExecutor::executeSingleRow

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5a19a860cc91591d3596790eddb8d66a2f3341f7
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 4 13:56:48 2021 +0700

    [PERFORMANCE] CassandraMessageDAO should use CassandraAsyncExecutor::executeSingleRow
    
    This prevents a single row ResultSet post-traitements to be scheduled
    on the elasticScheduler
---
 .../james/mailbox/cassandra/mail/CassandraMessageDAO.java   | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index ed3f8c8..1600985 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -70,7 +70,6 @@ import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.UDTValue;
@@ -245,20 +244,14 @@ public class CassandraMessageDAO {
                 .flatMap(resultSet -> message(resultSet, cassandraMessageId, fetchType));
     }
 
-    private Mono<ResultSet> retrieveRow(CassandraMessageId messageId) {
-        return cassandraAsyncExecutor.execute(select
+    private Mono<Row> retrieveRow(CassandraMessageId messageId) {
+        return cassandraAsyncExecutor.executeSingleRow(select
             .bind()
             .setUUID(MESSAGE_ID, messageId.get())
             .setConsistencyLevel(consistencyLevel));
     }
 
-    private Mono<MessageRepresentation>
-    message(ResultSet rows, CassandraMessageId cassandraMessageId, FetchType fetchType) {
-        if (rows.isExhausted()) {
-            return Mono.empty();
-        }
-
-        Row row = rows.one();
+    private Mono<MessageRepresentation> message(Row row, CassandraMessageId cassandraMessageId, FetchType fetchType) {
         BlobId headerId = retrieveBlobId(HEADER_CONTENT, row);
         BlobId bodyId = retrieveBlobId(BODY_CONTENT, row);
         int bodyStartOctet = row.getInt(BODY_START_OCTET);

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/07: [PERFORMANCE] CassandraDeletedMessageDAO should use CassandraAsyncExecutor::executeRows

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 814951c5fee19bf94b09f942482494fad3eda95b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 4 13:52:26 2021 +0700

    [PERFORMANCE] CassandraDeletedMessageDAO should use CassandraAsyncExecutor::executeRows
---
 .../cassandra/mail/CassandraDeletedMessageDAO.java | 23 ++++++++++------------
 1 file changed, 10 insertions(+), 13 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
index 3e6d9db..69e301b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
@@ -30,8 +30,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTa
 import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.UID;
 
-import java.util.function.Function;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -40,7 +38,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.model.MessageRange;
 
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 
 import reactor.core.publisher.Flux;
@@ -138,11 +136,10 @@ public class CassandraDeletedMessageDAO {
 
     public Flux<MessageUid> retrieveDeletedMessage(CassandraId cassandraId, MessageRange range) {
         return retrieveResultSetOfDeletedMessage(cassandraId, range)
-            .flatMapIterable(Function.identity())
             .map(row -> MessageUid.of(row.getLong(UID)));
     }
 
-    private Mono<ResultSet> retrieveResultSetOfDeletedMessage(CassandraId cassandraId, MessageRange range) {
+    private Flux<Row> retrieveResultSetOfDeletedMessage(CassandraId cassandraId, MessageRange range) {
         switch (range.getType()) {
             case ALL:
                 return retrieveAllDeleted(cassandraId);
@@ -157,29 +154,29 @@ public class CassandraDeletedMessageDAO {
         throw new UnsupportedOperationException();
     }
 
-    private Mono<ResultSet> retrieveAllDeleted(CassandraId cassandraId) {
-        return cassandraAsyncExecutor.execute(
+    private Flux<Row> retrieveAllDeleted(CassandraId cassandraId) {
+        return cassandraAsyncExecutor.executeRows(
             selectAllUidStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid()));
     }
 
-    private Mono<ResultSet> retrieveOneDeleted(CassandraId cassandraId, MessageUid uid) {
-        return cassandraAsyncExecutor.execute(
+    private Flux<Row> retrieveOneDeleted(CassandraId cassandraId, MessageUid uid) {
+        return cassandraAsyncExecutor.executeRows(
             selectOneUidStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid())
                 .setLong(UID, uid.asLong()));
     }
 
-    private Mono<ResultSet> retrieveDeletedBetween(CassandraId cassandraId, MessageUid from, MessageUid to) {
-        return cassandraAsyncExecutor.execute(
+    private Flux<Row> retrieveDeletedBetween(CassandraId cassandraId, MessageUid from, MessageUid to) {
+        return cassandraAsyncExecutor.executeRows(
             selectBetweenUidStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid())
                 .setLong(UID_FROM, from.asLong())
                 .setLong(UID_TO, to.asLong()));
     }
 
-    private Mono<ResultSet> retrieveDeletedAfter(CassandraId cassandraId, MessageUid from) {
-        return cassandraAsyncExecutor.execute(
+    private Flux<Row> retrieveDeletedAfter(CassandraId cassandraId, MessageUid from) {
+        return cassandraAsyncExecutor.executeRows(
             selectFromUidStatement.bind()
                 .setUUID(MAILBOX_ID, cassandraId.asUuid())
                 .setLong(UID_FROM, from.asLong()));

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 06/07: [PERFORMANCE] CassandraRecipientRewriteTableDAO should use CassandraAsyncExecutor::executeRows

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 496864e5eaba399b94b45555db2183462144e5ed
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 4 14:22:19 2021 +0700

    [PERFORMANCE] CassandraRecipientRewriteTableDAO should use CassandraAsyncExecutor::executeRows
---
 .../rrt/cassandra/CassandraRecipientRewriteTableDAO.java     | 12 ++++--------
 .../rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java |  3 +--
 .../rrt/cassandra/CassandraRecipientRewriteTableTest.java    |  3 +--
 .../org/apache/james/rrt/cassandra/CassandraStepdefs.java    |  3 +--
 .../cassandra/migration/MappingsSourcesMigrationTest.java    |  3 +--
 .../james/webadmin/routes/CassandraMappingsRoutesTest.java   |  3 +--
 6 files changed, 9 insertions(+), 18 deletions(-)

diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
index db31072..e535ae5 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java
@@ -35,7 +35,6 @@ import javax.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.rrt.lib.Mapping;
 import org.apache.james.rrt.lib.MappingSource;
 import org.apache.james.rrt.lib.MappingsImpl;
@@ -49,16 +48,14 @@ import reactor.core.publisher.Mono;
 
 public class CassandraRecipientRewriteTableDAO {
     private final CassandraAsyncExecutor executor;
-    private final CassandraUtils cassandraUtils;
     private final PreparedStatement insertStatement;
     private final PreparedStatement deleteStatement;
     private final PreparedStatement retrieveMappingStatement;
     private final PreparedStatement retrieveAllMappingsStatement;
 
     @Inject
-    public CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) {
+    public CassandraRecipientRewriteTableDAO(Session session) {
         this.executor = new CassandraAsyncExecutor(session);
-        this.cassandraUtils = cassandraUtils;
         this.insertStatement = prepareInsertStatement(session);
         this.deleteStatement = prepareDelete(session);
         this.retrieveMappingStatement = prepareRetrieveMappingStatement(session);
@@ -107,12 +104,11 @@ public class CassandraRecipientRewriteTableDAO {
     }
 
     Mono<MappingsImpl> retrieveMappings(MappingSource source) {
-        return executor.execute(retrieveMappingStatement.bind()
+        return executor.executeRows(retrieveMappingStatement.bind()
             .setString(USER, source.getFixedUser())
             .setString(DOMAIN, source.getFixedDomain()))
-            .map(resultSet -> cassandraUtils.convertToStream(resultSet)
-                .map(row -> row.getString(MAPPING))
-                .collect(Guavate.toImmutableList()))
+            .map(row -> row.getString(MAPPING))
+            .collect(Guavate.toImmutableList())
             .map(MappingsImpl::fromCollection)
             .filter(Predicate.not(MappingsImpl::isEmpty));
     }
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
index 1654166..9660964 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java
@@ -24,7 +24,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Domain;
 import org.apache.james.rrt.lib.Mapping;
 import org.apache.james.rrt.lib.MappingSource;
@@ -48,7 +47,7 @@ class CassandraRecipientRewriteTableDAOTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        dao = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        dao = new CassandraRecipientRewriteTableDAO(cassandra.getConf());
     }
 
     @Test
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
index 5e0a19f..29d4696 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableTest.java
@@ -22,7 +22,6 @@ package org.apache.james.rrt.cassandra;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.rrt.lib.AbstractRecipientRewriteTable;
@@ -47,7 +46,7 @@ class CassandraRecipientRewriteTableTest implements RecipientRewriteTableContrac
     @BeforeEach
     void setup(CassandraCluster cassandra) throws Exception {
         cassandraSchemaVersionDAO = new CassandraSchemaVersionDAO(cassandra.getConf());
-        recipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        recipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf());
         mappingsSourcesDAO = new CassandraMappingsSourcesDAO(cassandra.getConf());
 
         setUp();
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
index 2461dd7..6298ee1 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraStepdefs.java
@@ -21,7 +21,6 @@ package org.apache.james.rrt.cassandra;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.DockerCassandraRule;
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.rrt.lib.AbstractRecipientRewriteTable;
 import org.apache.james.rrt.lib.RecipientRewriteTableFixture;
@@ -61,7 +60,7 @@ public class CassandraStepdefs {
 
     private AbstractRecipientRewriteTable getRecipientRewriteTable() throws Exception {
         CassandraRecipientRewriteTable rrt = new CassandraRecipientRewriteTable(
-            new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION),
+            new CassandraRecipientRewriteTableDAO(cassandra.getConf()),
             new CassandraMappingsSourcesDAO(cassandra.getConf()));
         rrt.setDomainList(RecipientRewriteTableFixture.domainListForCucumberTests());
         return rrt;
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
index c66ce36..beb5d19 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java
@@ -31,7 +31,6 @@ import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Domain;
 import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
 import org.apache.james.rrt.cassandra.CassandraRRTModule;
@@ -67,7 +66,7 @@ class MappingsSourcesMigrationTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf());
         cassandraMappingsSourcesDAO = new CassandraMappingsSourcesDAO(cassandra.getConf());
 
         migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
diff --git a/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java
index 1473173..02a3e83 100644
--- a/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java
@@ -28,7 +28,6 @@ import static org.hamcrest.Matchers.notNullValue;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Domain;
 import org.apache.james.json.DTOConverter;
 import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO;
@@ -72,7 +71,7 @@ class CassandraMappingsRoutesTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf());
         cassandraMappingsSourcesDAO = new CassandraMappingsSourcesDAO(cassandra.getConf());
         mappingsSourcesMigration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/07: [PERFORMANCE] Migrate where possible from Mono.fatMap to flatMapIterable

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1308e1e5d1a74203193ba34971e1493efbfdd693
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 1 07:47:56 2021 +0700

    [PERFORMANCE] Migrate where possible from Mono.fatMap to flatMapIterable
---
 .../cassandra/utils/CassandraAsyncExecutor.java    |  3 ++-
 .../backends/cassandra/utils/CassandraUtils.java   |  6 ------
 .../cassandra/mail/CassandraDeletedMessageDAO.java | 23 +++++-----------------
 .../cassandra/mail/CassandraMailboxDAO.java        | 16 ++-------------
 .../cassandra/mail/CassandraMailboxPathV3DAO.java  | 13 +++---------
 .../cassandra/mail/CassandraMailboxRecentsDAO.java | 15 ++------------
 .../cassandra/mail/CassandraMessageMapper.java     |  2 +-
 .../mail/CassandraUserMailboxRightsDAO.java        |  8 ++------
 .../cassandra/CassandraMailboxManagerTest.java     |  5 ++---
 .../cassandra/mail/CassandraACLMapperV1Test.java   |  3 +--
 .../cassandra/mail/CassandraACLMapperV2Test.java   |  3 +--
 .../cassandra/mail/CassandraMailboxMapperTest.java |  5 ++---
 .../mail/CassandraMailboxPathV3DAOTest.java        |  2 --
 .../mail/CassandraUserMailboxRightsDAOTest.java    |  3 +--
 .../mail/migration/AclV2MigrationTest.java         |  3 +--
 .../SolveMailboxInconsistenciesServiceTest.java    |  2 --
 .../v7/ElasticSearchQuotaSearcher.java             |  3 ++-
 .../memory/change/MemoryEmailChangeRepository.java |  6 ++++--
 .../cassandra/CassandraMailRepositoryKeysDAO.java  |  9 ++++-----
 .../CassandraMailRepositoryKeysDAOTest.java        |  3 +--
 .../cassandra/CassandraMailRepositoryTest.java     |  5 ++---
 ...aMailRepositoryWithFakeImplementationsTest.java |  3 +--
 .../org/apache/james/jmap/http/JMAPApiRoutes.java  |  3 ++-
 23 files changed, 41 insertions(+), 103 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index aa0c6cf..c869c30 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -22,6 +22,7 @@ package org.apache.james.backends.cassandra.utils;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.util.Optional;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
@@ -67,7 +68,7 @@ public class CassandraAsyncExecutor {
 
     public Flux<Row> executeRows(Statement statement) {
         return execute(statement)
-            .flatMapMany(Flux::fromIterable);
+            .flatMapIterable(Function.identity());
     }
 
     public Mono<Optional<Row>> executeSingleRowOptional(Statement statement) {
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
index e6ec3a9..fa749e6 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraUtils.java
@@ -29,8 +29,6 @@ import org.apache.james.backends.cassandra.init.configuration.CassandraConfigura
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 
-import reactor.core.publisher.Flux;
-
 public class CassandraUtils {
 
     public static final CassandraUtils WITH_DEFAULT_CONFIGURATION = new CassandraUtils(CassandraConfiguration.DEFAULT_CONFIGURATION);
@@ -42,10 +40,6 @@ public class CassandraUtils {
         this.cassandraConfiguration = cassandraConfiguration;
     }
 
-    public Flux<Row> convertToFlux(ResultSet resultSet) {
-        return Flux.fromIterable(resultSet);
-    }
-
     public Stream<Row> convertToStream(ResultSet resultSet) {
         return StreamSupport.stream(resultSet.spliterator(), true)
             .peek(row -> ensureFetchedNextPage(resultSet));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
index 8e45984..3e6d9db 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
@@ -30,10 +30,11 @@ import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTa
 import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.UID;
 
+import java.util.function.Function;
+
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.model.MessageRange;
@@ -41,7 +42,6 @@ import org.apache.james.mailbox.model.MessageRange;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
-import com.google.common.annotations.VisibleForTesting;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -54,15 +54,13 @@ public class CassandraDeletedMessageDAO {
     private final PreparedStatement addStatement;
     private final PreparedStatement deleteStatement;
     private final PreparedStatement deleteAllStatement;
-
     private final PreparedStatement selectAllUidStatement;
     private final PreparedStatement selectOneUidStatement;
     private final PreparedStatement selectBetweenUidStatement;
     private final PreparedStatement selectFromUidStatement;
-    private final CassandraUtils cassandraUtils;
 
     @Inject
-    public CassandraDeletedMessageDAO(Session session, CassandraUtils cassandraUtils) {
+    public CassandraDeletedMessageDAO(Session session) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.addStatement = prepareAddStatement(session);
         this.deleteStatement = prepareDeleteStatement(session);
@@ -71,12 +69,6 @@ public class CassandraDeletedMessageDAO {
         this.selectOneUidStatement = prepareOneUidStatement(session);
         this.selectBetweenUidStatement = prepareBetweenUidStatement(session);
         this.selectFromUidStatement = prepareFromUidStatement(session);
-        this.cassandraUtils = cassandraUtils;
-    }
-
-    @VisibleForTesting
-    public CassandraDeletedMessageDAO(Session session) {
-        this(session, CassandraUtils.WITH_DEFAULT_CONFIGURATION);
     }
 
     private PreparedStatement prepareAllUidStatement(Session session) {
@@ -146,7 +138,8 @@ public class CassandraDeletedMessageDAO {
 
     public Flux<MessageUid> retrieveDeletedMessage(CassandraId cassandraId, MessageRange range) {
         return retrieveResultSetOfDeletedMessage(cassandraId, range)
-            .flatMapMany(this::resultSetToFlux);
+            .flatMapIterable(Function.identity())
+            .map(row -> MessageUid.of(row.getLong(UID)));
     }
 
     private Mono<ResultSet> retrieveResultSetOfDeletedMessage(CassandraId cassandraId, MessageRange range) {
@@ -164,12 +157,6 @@ public class CassandraDeletedMessageDAO {
         throw new UnsupportedOperationException();
     }
 
-    private Flux<MessageUid> resultSetToFlux(ResultSet resultSet) {
-        return cassandraUtils.convertToFlux(resultSet)
-            .map(row ->
-                MessageUid.of(row.getLong(UID)));
-    }
-
     private Mono<ResultSet> retrieveAllDeleted(CassandraId cassandraId) {
         return cassandraAsyncExecutor.execute(
             selectAllUidStatement.bind()
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index 21ccd66..f128d67 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -38,7 +38,6 @@ import javax.inject.Inject;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.mail.utils.MailboxBaseTupleUtil;
@@ -52,7 +51,6 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.google.common.annotations.VisibleForTesting;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -61,7 +59,6 @@ public class CassandraMailboxDAO {
 
     private final CassandraAsyncExecutor executor;
     private final MailboxBaseTupleUtil mailboxBaseTupleUtil;
-    private final CassandraUtils cassandraUtils;
     private final PreparedStatement readStatement;
     private final PreparedStatement listStatement;
     private final PreparedStatement deleteStatement;
@@ -72,8 +69,7 @@ public class CassandraMailboxDAO {
 
     @Inject
     public CassandraMailboxDAO(Session session, CassandraTypesProvider typesProvider,
-                               CassandraConsistenciesConfiguration consistenciesConfiguration,
-                               CassandraUtils cassandraUtils) {
+                               CassandraConsistenciesConfiguration consistenciesConfiguration) {
         this.executor = new CassandraAsyncExecutor(session);
         this.consistencyLevel = consistenciesConfiguration.getRegular();
         this.mailboxBaseTupleUtil = new MailboxBaseTupleUtil(typesProvider);
@@ -83,13 +79,6 @@ public class CassandraMailboxDAO {
         this.deleteStatement = prepareDelete(session);
         this.listStatement = prepareList(session);
         this.readStatement = prepareRead(session);
-        this.cassandraUtils = cassandraUtils;
-    }
-
-    @VisibleForTesting
-    public CassandraMailboxDAO(Session session, CassandraTypesProvider typesProvider,
-                               CassandraConsistenciesConfiguration consistenciesConfiguration) {
-        this(session, typesProvider, consistenciesConfiguration, CassandraUtils.WITH_DEFAULT_CONFIGURATION);
     }
 
     private PreparedStatement prepareInsert(Session session) {
@@ -188,8 +177,7 @@ public class CassandraMailboxDAO {
     }
 
     public Flux<Mailbox> retrieveAllMailboxes() {
-        return executor.execute(listStatement.bind())
-            .flatMapMany(cassandraUtils::convertToFlux)
+        return executor.executeRows(listStatement.bind())
             .flatMap(this::toMailboxWithId, DEFAULT_CONCURRENCY);
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
index 3de61be..d8435d6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
@@ -37,7 +37,6 @@ import javax.inject.Inject;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration.ConsistencyChoice;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.GhostMailbox;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -58,7 +57,6 @@ import reactor.core.publisher.Mono;
 
 public class CassandraMailboxPathV3DAO {
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
-    private final CassandraUtils cassandraUtils;
     private final PreparedStatement delete;
     private final PreparedStatement insert;
     private final PreparedStatement select;
@@ -67,11 +65,9 @@ public class CassandraMailboxPathV3DAO {
     private final CassandraConsistenciesConfiguration consistenciesConfiguration;
 
     @Inject
-    public CassandraMailboxPathV3DAO(Session session, CassandraUtils cassandraUtils,
-                                     CassandraConsistenciesConfiguration consistenciesConfiguration) {
+    public CassandraMailboxPathV3DAO(Session session, CassandraConsistenciesConfiguration consistenciesConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.consistenciesConfiguration = consistenciesConfiguration;
-        this.cassandraUtils = cassandraUtils;
         this.insert = prepareInsert(session);
         this.delete = prepareDelete(session);
         this.select = prepareSelect(session);
@@ -139,20 +135,17 @@ public class CassandraMailboxPathV3DAO {
     }
 
     public Flux<Mailbox> listUserMailboxes(String namespace, Username user, ConsistencyChoice consistencyChoice) {
-        return cassandraAsyncExecutor.execute(
+        return cassandraAsyncExecutor.executeRows(
             selectUser.bind()
                 .setString(NAMESPACE, namespace)
                 .setString(USER, sanitizeUser(user))
                 .setConsistencyLevel(consistencyChoice.choose(consistenciesConfiguration)))
-            .flatMapMany(cassandraUtils::convertToFlux)
             .map(this::fromRowToCassandraIdAndPath)
             .map(FunctionalUtils.toFunction(this::logReadSuccess));
     }
 
     public Flux<Mailbox> listAll() {
-        return cassandraAsyncExecutor.execute(
-            selectAll.bind())
-            .flatMapMany(cassandraUtils::convertToFlux)
+        return cassandraAsyncExecutor.executeRows(selectAll.bind())
             .map(this::fromRowToCassandraIdAndPath)
             .map(FunctionalUtils.toFunction(this::logReadSuccess));
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
index ed96abd..d7d59bf 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
@@ -27,7 +27,6 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxRecentsTable;
@@ -36,33 +35,24 @@ import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.google.common.annotations.VisibleForTesting;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraMailboxRecentsDAO {
-
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement readStatement;
     private final PreparedStatement deleteStatement;
     private final PreparedStatement deleteAllStatement;
     private final PreparedStatement addStatement;
-    private CassandraUtils cassandraUtils;
 
     @Inject
-    public CassandraMailboxRecentsDAO(Session session, CassandraUtils cassandraUtils) {
+    public CassandraMailboxRecentsDAO(Session session) {
         cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         readStatement = createReadStatement(session);
         deleteStatement = createDeleteStatement(session);
         deleteAllStatement = createDeleteAllStatement(session);
         addStatement = createAddStatement(session);
-        this.cassandraUtils = cassandraUtils;
-    }
-
-    @VisibleForTesting
-    public CassandraMailboxRecentsDAO(Session session) {
-        this(session, CassandraUtils.WITH_DEFAULT_CONFIGURATION);
     }
 
     private PreparedStatement createReadStatement(Session session) {
@@ -95,8 +85,7 @@ public class CassandraMailboxRecentsDAO {
     }
 
     public Flux<MessageUid> getRecentMessageUidsInMailbox(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.execute(bindWithMailbox(mailboxId, readStatement))
-            .flatMapMany(cassandraUtils::convertToFlux)
+        return cassandraAsyncExecutor.executeRows(bindWithMailbox(mailboxId, readStatement))
             .map(row -> row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID))
             .map(MessageUid::of);
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 2a1f903..7bf43d1 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -394,7 +394,7 @@ public class CassandraMessageMapper implements MessageMapper {
 
         Flux<ComposedMessageIdWithMetaData> toBeUpdated = mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId)
             .collectList()
-            .flatMapMany(uids -> Flux.fromIterable(MessageRange.toRanges(uids)))
+            .flatMapIterable(MessageRange::toRanges)
             .concatMap(range -> messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited()))
             .filter(message -> message.getFlags().contains(Flag.RECENT));
         FlagsUpdateCalculator calculator = new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE);
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
index 33d6ec1..1c3028d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
@@ -35,7 +35,6 @@ import javax.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.acl.ACLDiff;
 import org.apache.james.mailbox.acl.PositiveUserACLDiff;
@@ -56,16 +55,14 @@ import reactor.core.publisher.Mono;
 public class CassandraUserMailboxRightsDAO {
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
-    private final CassandraUtils cassandraUtils;
     private final PreparedStatement delete;
     private final PreparedStatement insert;
     private final PreparedStatement select;
     private final PreparedStatement selectUser;
 
     @Inject
-    public CassandraUserMailboxRightsDAO(Session session, CassandraUtils cassandraUtils) {
+    public CassandraUserMailboxRightsDAO(Session session) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
-        this.cassandraUtils = cassandraUtils;
         this.delete = prepareDelete(session);
         this.insert = prepareInsert(session);
         this.select = prepareSelect(session);
@@ -137,10 +134,9 @@ public class CassandraUserMailboxRightsDAO {
     }
 
     public Flux<Pair<CassandraId, Rfc4314Rights>> listRightsForUser(Username userName) {
-        return cassandraAsyncExecutor.execute(
+        return cassandraAsyncExecutor.executeRows(
             selectUser.bind()
                 .setString(USER_NAME, userName.asString()))
-            .flatMapMany(cassandraUtils::convertToFlux)
             .map(Throwing.function(this::toPair));
     }
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
index 0acbb0c..41f29b9 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
@@ -33,7 +33,6 @@ import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.blob.api.BlobStore;
@@ -805,7 +804,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai
             JsonEventSerializer jsonEventSerializer = JsonEventSerializer
                 .forModules(ACLModule.ACL_UPDATE)
                 .withoutNestedType();
-            CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandraCluster.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+            CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandraCluster.getConf());
             CassandraEventStore eventStore = new CassandraEventStore(new EventStoreDao(cassandraCluster.getConf(), jsonEventSerializer, CassandraConsistenciesConfiguration.DEFAULT));
             return new CassandraACLMapper(
                 new CassandraACLMapper.StoreV1(usersRightDAO, aclDAOV1),
@@ -814,7 +813,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai
         }
 
         private CassandraUserMailboxRightsDAO rightsDAO(CassandraCluster cassandraCluster) {
-            return new CassandraUserMailboxRightsDAO(cassandraCluster.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+            return new CassandraUserMailboxRightsDAO(cassandraCluster.getConf());
         }
 
         private CassandraAttachmentMessageIdDAO attachmentMessageIdDAO(CassandraCluster cassandraCluster) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperV1Test.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperV1Test.java
index e34e0b0..75f54ce 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperV1Test.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperV1Test.java
@@ -34,7 +34,6 @@ import org.apache.james.backends.cassandra.Scenario.Barrier;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
@@ -69,7 +68,7 @@ class CassandraACLMapperV1Test extends CassandraACLMapperContract {
         JsonEventSerializer jsonEventSerializer = JsonEventSerializer
             .forModules(ACLModule.ACL_UPDATE)
             .withoutNestedType();
-        CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandra.getConf());
         CassandraEventStore eventStore = new CassandraEventStore(new EventStoreDao(cassandra.getConf(), jsonEventSerializer, CassandraConsistenciesConfiguration.DEFAULT));
         cassandraACLMapper = new CassandraACLMapper(
             new CassandraACLMapper.StoreV1(usersRightDAO, aclDAOV1),
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperV2Test.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperV2Test.java
index c91319a..5beaede 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperV2Test.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperV2Test.java
@@ -33,7 +33,6 @@ import org.apache.james.backends.cassandra.Scenario.Barrier;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
@@ -67,7 +66,7 @@ class CassandraACLMapperV2Test extends CassandraACLMapperContract {
         JsonEventSerializer jsonEventSerializer = JsonEventSerializer
             .forModules(ACLModule.ACL_UPDATE)
             .withoutNestedType();
-        CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandra.getConf());
         CassandraEventStore eventStore = new CassandraEventStore(new EventStoreDao(cassandra.getConf(), jsonEventSerializer, CassandraConsistenciesConfiguration.DEFAULT));
         cassandraACLMapper = new CassandraACLMapper(
             new CassandraACLMapper.StoreV1(usersRightDAO, aclDAOV1),
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index 409132a..d4f377c 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -33,7 +33,6 @@ import org.apache.james.backends.cassandra.Scenario;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
@@ -103,7 +102,7 @@ class CassandraMailboxMapperTest {
     void setUp() {
         CassandraCluster cassandra = cassandraCluster.getCassandraCluster();
         mailboxDAO = new CassandraMailboxDAO(cassandra.getConf(), cassandra.getTypesProvider(), cassandraCluster.getCassandraConsistenciesConfiguration());
-        mailboxPathV3DAO = new CassandraMailboxPathV3DAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION, cassandraCluster.getCassandraConsistenciesConfiguration());
+        mailboxPathV3DAO = new CassandraMailboxPathV3DAO(cassandra.getConf(), cassandraCluster.getCassandraConsistenciesConfiguration());
 
         versionDAO = new CassandraSchemaVersionDAO(cassandra.getConf());
         versionDAO.truncateVersion()
@@ -121,7 +120,7 @@ class CassandraMailboxMapperTest {
         JsonEventSerializer jsonEventSerializer = JsonEventSerializer
                 .forModules(ACLModule.ACL_UPDATE)
                 .withoutNestedType();
-        CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandra.getConf());
         CassandraEventStore eventStore = new CassandraEventStore(new EventStoreDao(cassandra.getConf(), jsonEventSerializer, CassandraConsistenciesConfiguration.DEFAULT));
         CassandraACLMapper aclMapper = new CassandraACLMapper(
                 new CassandraACLMapper.StoreV1(usersRightDAO, aclDAOV1),
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java
index a59c759..2085743 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java
@@ -31,7 +31,6 @@ import java.util.List;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.mailbox.cassandra.modules.CassandraMailboxModule;
 import org.apache.james.mailbox.model.Mailbox;
@@ -52,7 +51,6 @@ class CassandraMailboxPathV3DAOTest {
     void setUp(CassandraCluster cassandra) {
         testee = new CassandraMailboxPathV3DAO(
             cassandra.getConf(),
-            CassandraUtils.WITH_DEFAULT_CONFIGURATION,
             cassandraCluster.getCassandraConsistenciesConfiguration());
     }
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
index 5e9c8b0..aa85c2c 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
@@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.acl.ACLDiff;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -50,7 +49,7 @@ class CassandraUserMailboxRightsDAOTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        testee = new CassandraUserMailboxRightsDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        testee = new CassandraUserMailboxRightsDAO(cassandra.getConf());
     }
 
     @Test
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AclV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AclV2MigrationTest.java
index 1a22a87..896f5ee 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AclV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AclV2MigrationTest.java
@@ -26,7 +26,6 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.core.Username;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
@@ -82,7 +81,7 @@ class AclV2MigrationTest {
             .forModules(ACLModule.ACL_UPDATE)
             .withoutNestedType();
         CassandraEventStore eventStore = new CassandraEventStore(new EventStoreDao(cassandra.getConf(), jsonEventSerializer, CassandraConsistenciesConfiguration.DEFAULT));
-        CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        CassandraUserMailboxRightsDAO usersRightDAO = new CassandraUserMailboxRightsDAO(cassandra.getConf());
         migration = new AclV2Migration(mailboxDAO,
             new CassandraACLMapper.StoreV1(usersRightDAO, daoV1),
             new CassandraACLMapper.StoreV2(usersRightDAO, daoV2, eventStore));
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java
index 4fc7d76..91d8312 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java
@@ -26,7 +26,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
@@ -80,7 +79,6 @@ class SolveMailboxInconsistenciesServiceTest {
             cassandraCluster.getCassandraConsistenciesConfiguration());
         mailboxPathV3DAO = new CassandraMailboxPathV3DAO(
             cassandra.getConf(),
-            CassandraUtils.WITH_DEFAULT_CONFIGURATION,
             cassandraCluster.getCassandraConsistenciesConfiguration());
         versionDAO = new CassandraSchemaVersionDAO(cassandra.getConf());
         testee = new SolveMailboxInconsistenciesService(mailboxDAO, mailboxPathV3DAO, new CassandraSchemaVersionManager(versionDAO));
diff --git a/mailbox/plugin/quota-search-elasticsearch-v7/src/main/java/org/apache/james/quota/search/elasticsearch/v7/ElasticSearchQuotaSearcher.java b/mailbox/plugin/quota-search-elasticsearch-v7/src/main/java/org/apache/james/quota/search/elasticsearch/v7/ElasticSearchQuotaSearcher.java
index 6b54d02..113881f 100644
--- a/mailbox/plugin/quota-search-elasticsearch-v7/src/main/java/org/apache/james/quota/search/elasticsearch/v7/ElasticSearchQuotaSearcher.java
+++ b/mailbox/plugin/quota-search-elasticsearch-v7/src/main/java/org/apache/james/quota/search/elasticsearch/v7/ElasticSearchQuotaSearcher.java
@@ -39,6 +39,7 @@ import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
 
 import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 
@@ -86,7 +87,7 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
             .source(searchSourceBuilder);
 
         return client.search(searchRequest, RequestOptions.DEFAULT)
-            .flatMapMany(searchResponse -> Flux.fromArray(searchResponse.getHits().getHits()));
+            .flatMapIterable(searchResponse -> ImmutableList.copyOf(searchResponse.getHits().getHits()));
     }
 
     private Flux<SearchHit> executeScrolledSearch(QuotaQuery query) {
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/change/MemoryEmailChangeRepository.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/change/MemoryEmailChangeRepository.java
index 9636693..5296780 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/change/MemoryEmailChangeRepository.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/change/MemoryEmailChangeRepository.java
@@ -34,6 +34,7 @@ import org.apache.james.jmap.api.change.State;
 import org.apache.james.jmap.api.exception.ChangeNotFoundException;
 import org.apache.james.jmap.api.model.AccountId;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
@@ -107,9 +108,10 @@ public class MemoryEmailChangeRepository implements EmailChangeRepository {
 
     private Flux<EmailChange> allChangesSince(AccountId accountId, State state) {
         return findByState(accountId, state)
-            .flatMapMany(currentState -> Flux.fromIterable(emailChangeMap.get(accountId))
+            .flatMapIterable(currentState -> emailChangeMap.get(accountId).stream()
                 .filter(change -> change.getDate().isAfter(currentState.getDate()))
-                .sort(Comparator.comparing(EmailChange::getDate)));
+                .sorted(Comparator.comparing(EmailChange::getDate))
+                .collect(Guavate.toImmutableList()));
     }
 
     private Flux<EmailChange> allChanges(AccountId accountId) {
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
index 307c8e9..897e2a5 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
@@ -28,10 +28,11 @@ import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.KEYS
 import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.MAIL_KEY;
 import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.REPOSITORY_NAME;
 
+import java.util.function.Function;
+
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
 
@@ -44,15 +45,13 @@ import reactor.core.publisher.Mono;
 public class CassandraMailRepositoryKeysDAO {
 
     private final CassandraAsyncExecutor executor;
-    private final CassandraUtils cassandraUtils;
     private final PreparedStatement insertKey;
     private final PreparedStatement deleteKey;
     private final PreparedStatement listKeys;
 
     @Inject
-    public CassandraMailRepositoryKeysDAO(Session session, CassandraUtils cassandraUtils) {
+    public CassandraMailRepositoryKeysDAO(Session session) {
         this.executor = new CassandraAsyncExecutor(session);
-        this.cassandraUtils = cassandraUtils;
 
         this.insertKey = prepareInsert(session);
         this.deleteKey = prepareDelete(session);
@@ -89,7 +88,7 @@ public class CassandraMailRepositoryKeysDAO {
     public Flux<MailKey> list(MailRepositoryUrl url) {
         return executor.execute(listKeys.bind()
             .setString(REPOSITORY_NAME, url.asString()))
-            .flatMapMany(cassandraUtils::convertToFlux)
+            .flatMapIterable(Function.identity())
             .map(row -> new MailKey(row.getString(MAIL_KEY)));
     }
 
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
index db7f98a..0a2d568 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
@@ -24,7 +24,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
@@ -48,7 +47,7 @@ class CassandraMailRepositoryKeysDAOTest {
 
     @BeforeEach
     void setUp(CassandraCluster cassandra) {
-        testee = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        testee = new CassandraMailRepositoryKeysDAO(cassandra.getConf());
     }
 
     @Test
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
index 3f1d9b0..b0642e8 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
@@ -26,7 +26,6 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.HashBlobId;
@@ -65,7 +64,7 @@ class CassandraMailRepositoryTest {
             CassandraMailRepositoryMailDAO v1 = new CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, cassandra.getTypesProvider());
             CassandraMailRepositoryMailDaoV2 v2 = new CassandraMailRepositoryMailDaoV2(cassandra.getConf(), BLOB_ID_FACTORY);
             CassandraMailRepositoryMailDaoAPI mailDAO = new MergingCassandraMailRepositoryMailDao(v1, v2);
-            CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+            CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf());
             CassandraMailRepositoryCountDAO countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
             BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
                 .passthrough();
@@ -105,7 +104,7 @@ class CassandraMailRepositoryTest {
             CassandraMailRepositoryMailDAO v1 = new CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, cassandra.getTypesProvider());
             CassandraMailRepositoryMailDaoV2 v2 = new CassandraMailRepositoryMailDaoV2(cassandra.getConf(), BLOB_ID_FACTORY);
             CassandraMailRepositoryMailDaoAPI mailDAO = new MergingCassandraMailRepositoryMailDao(v1, v2);
-            CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+            CassandraMailRepositoryKeysDAO keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf());
             CassandraMailRepositoryCountDAO countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
             BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
                 .deduplication();
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
index 73b24e0..01b81da 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
@@ -27,7 +27,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.HashBlobId;
@@ -65,7 +64,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
     @BeforeEach
     void setup(CassandraCluster cassandra) {
         CassandraMailRepositoryMailDaoAPI mailDAO = new CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, cassandra.getTypesProvider());
-        keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION);
+        keysDAO = new CassandraMailRepositoryKeysDAO(cassandra.getConf());
         countDAO = new CassandraMailRepositoryCountDAO(cassandra.getConf());
         BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
             .passthrough();
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/JMAPApiRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/JMAPApiRoutes.java
index 4f2f110..1882121 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/JMAPApiRoutes.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/JMAPApiRoutes.java
@@ -51,6 +51,7 @@ import com.fasterxml.jackson.core.JsonParser.Feature;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 
 import io.netty.handler.codec.http.HttpMethod;
 import reactor.core.publisher.Flux;
@@ -141,6 +142,6 @@ public class JMAPApiRoutes implements JMAPRoutes {
                     throw new BadRequestException("Error deserializing JSON", e);
                 }
             })
-            .flatMapMany(Flux::fromArray);
+            .flatMapIterable(ImmutableList::copyOf);
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org