You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/12/06 08:05:38 UTC

[1/3] james-project git commit: JAMES-2619 Make sure to name all Executors and Threads

Repository: james-project
Updated Branches:
  refs/heads/master 16e4d9f93 -> f121dd8d4


JAMES-2619 Make sure to name all Executors and Threads


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e96b5cbb
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e96b5cbb
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e96b5cbb

Branch: refs/heads/master
Commit: e96b5cbb38ebdbe03b1733c8cb835609591cca1e
Parents: b0ebfb6
Author: Matthieu Baechler <ma...@apache.org>
Authored: Tue Dec 4 18:21:36 2018 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Dec 6 15:03:57 2018 +0700

----------------------------------------------------------------------
 .../cassandra/migration/CassandraMigrationServiceTest.java   | 4 +++-
 .../apache/james/backends/es/ElasticSearchIndexerTest.java   | 6 +++++-
 .../backend/rabbitmq/RabbitMQConnectionFactoryTest.java      | 4 +++-
 .../org/apache/james/backend/rabbitmq/RabbitMQExtension.java | 5 ++++-
 .../org/apache/james/mailbox/MailboxManagerStressTest.java   | 5 ++++-
 .../james/mailbox/cassandra/mail/CassandraACLMapperTest.java | 5 ++++-
 .../mailbox/elasticsearch/ElasticSearchIntegrationTest.java  | 6 ++++--
 .../ElasticSearchQuotaSearchTestSystemExtension.java         | 5 ++++-
 .../events/ElasticSearchQuotaMailboxListenerTest.java        | 5 ++++-
 .../james/mailbox/store/event/AsynchronousEventDelivery.java | 5 ++++-
 .../elasticsearch/host/ElasticSearchHostSystem.java          | 5 ++++-
 .../java/org/apache/james/imap/processor/IdleProcessor.java  | 5 ++++-
 .../james/imap/processor/base/SelectedMailboxImplTest.java   | 5 ++++-
 .../apache/james/protocols/netty/AbstractAsyncServer.java    | 8 ++++++--
 .../modules/mailbox/ScheduledExecutorServiceProvider.java    | 6 +++++-
 .../org/apache/james/JamesServerWithRetryConnectionTest.java | 5 ++++-
 .../james/modules/server/AsyncTasksExecutorModule.java       | 6 +++++-
 .../apache/james/util/concurrency/ConcurrentTestRunner.java  | 5 ++++-
 .../org/apache/james/util/CompletableFutureUtilTest.java     | 5 ++++-
 .../org/apache/james/util/retry/RetryExecutorUtilTest.java   | 5 ++++-
 .../org/apache/james/transport/mailets/RemoteDelivery.java   | 5 ++++-
 .../webadmin/routes/ElasticSearchQuotaSearchExtension.java   | 5 ++++-
 .../main/java/org/apache/james/queue/file/FileMailQueue.java | 3 ++-
 .../main/java/org/apache/james/queue/rabbitmq/Dequeuer.java  | 5 ++++-
 .../main/java/org/apache/james/task/MemoryTaskManager.java   | 5 ++++-
 .../main/java/org/apache/james/junit/ExecutorExtension.java  | 5 ++++-
 .../apache/james/spamassassin/mock/MockSpamdTestRule.java    | 3 ++-
 27 files changed, 107 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
index 7730e80..7631f4b 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
 import org.apache.james.task.Task;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -74,7 +75,8 @@ public class CassandraMigrationServiceTest {
             .put(LATEST_VERSION, successfulMigration)
             .build();
         testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
-        executorService = Executors.newFixedThreadPool(2);
+        executorService = Executors.newFixedThreadPool(2,
+            NamedThreadFactory.withClassName(getClass()));
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
----------------------------------------------------------------------
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
index fa51c8f..0654a78 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
@@ -24,8 +24,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.james.backends.es.utils.TestingClientProvider;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -61,7 +63,9 @@ public class ElasticSearchIndexerTest {
             .useIndex(INDEX_NAME)
             .addAlias(ALIAS_NAME)
             .createIndexAndAliases(clientProvider.get());
-        testee = new ElasticSearchIndexer(clientProvider.get(), Executors.newSingleThreadExecutor(), ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
+        testee = new ElasticSearchIndexer(clientProvider.get(),
+            Executors.newSingleThreadExecutor(NamedThreadFactory.withClassName(getClass())),
+            ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
     }
     
     @Test

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
index 489f557f..87d9904 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
@@ -24,7 +24,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.net.URI;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -36,7 +38,7 @@ class RabbitMQConnectionFactoryTest {
 
     @BeforeEach
     void setUp() throws Exception {
-        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.withClassName(getClass()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
index acf1a2c..d6d2ead 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
@@ -22,7 +22,9 @@ import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEME
 
 import java.net.URISyntaxException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -91,8 +93,9 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
             .minDelay(ONE_HUNDRED_MILLISECONDS)
             .build();
 
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
         return new RabbitMQConnectionFactory(
             rabbitMQConfiguration,
-            new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
+            new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory)));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
index 9f13dec..21dbb71 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -36,6 +37,7 @@ import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mime4j.dom.Message;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableSet;
@@ -55,9 +57,10 @@ public abstract class MailboxManagerStressTest {
 
     @Test
     public void testStressTest() throws InterruptedException, MailboxException {
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
 
         final CountDownLatch latch = new CountDownLatch(APPEND_OPERATIONS);
-        final ExecutorService pool = Executors.newFixedThreadPool(APPEND_OPERATIONS / 20);
+        final ExecutorService pool = Executors.newFixedThreadPool(APPEND_OPERATIONS / 20, threadFactory);
         final Collection<MessageUid> uList = new ConcurrentLinkedDeque<>();
         final String username = "username";
         MailboxSession session = mailboxManager.createSystemSession(username);

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
index cdea6fa..d9f7c89 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -40,6 +41,7 @@ import org.apache.james.mailbox.cassandra.modules.CassandraAclModule;
 import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxACL;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -59,7 +61,8 @@ class CassandraACLMapperTest {
     void setUp(CassandraCluster cassandra) {
         cassandraACLMapper = GuiceUtils.testInjector(cassandra)
             .getInstance(CassandraACLMapper.class);
-        executor = Executors.newFixedThreadPool(2);
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        executor = Executors.newFixedThreadPool(2, threadFactory);
     }
 
     @AfterEach

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
index 666177d..4dc236e 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.nio.charset.StandardCharsets;
 import java.time.ZoneId;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.ElasticSearchIndexer;
@@ -50,6 +51,7 @@ import org.apache.james.mailbox.tika.TikaHttpClientImpl;
 import org.apache.james.mailbox.tika.TikaTextExtractor;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.apache.james.mime4j.dom.Message;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.elasticsearch.client.Client;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -99,11 +101,11 @@ public class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest
         storeMailboxManager = new InMemoryIntegrationResources()
             .createMailboxManager(new SimpleGroupMembershipResolver());
 
-
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
         ElasticSearchListeningMessageSearchIndex elasticSearchListeningMessageSearchIndex = new ElasticSearchListeningMessageSearchIndex(
             storeMailboxManager.getMapperFactory(),
             new ElasticSearchIndexer(client,
-                Executors.newSingleThreadExecutor(),
+                Executors.newSingleThreadExecutor(threadFactory),
                 MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
                 MailboxElasticSearchConstants.MESSAGE_TYPE,
                 BATCH_SIZE),

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
index 2d511cf..872cf33 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
@@ -22,6 +22,7 @@ package org.apache.james.quota.search.elasticsearch;
 import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.ElasticSearchIndexer;
@@ -36,6 +37,7 @@ import org.apache.james.quota.search.QuotaSearchTestSystem;
 import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener;
 import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
 import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.elasticsearch.client.Client;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
@@ -69,8 +71,9 @@ public class ElasticSearchQuotaSearchTestSystemExtension implements ParameterRes
             MemoryDomainList domainList = new MemoryDomainList(dnsService);
             usersRepository.setDomainList(domainList);
 
+            ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
             ElasticSearchQuotaMailboxListener listener = new ElasticSearchQuotaMailboxListener(
-                new ElasticSearchIndexer(client, Executors.newSingleThreadExecutor(),
+                new ElasticSearchIndexer(client, Executors.newSingleThreadExecutor(threadFactory),
                     QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_WRITE_ALIAS,
                     QuotaRatioElasticSearchConstants.QUOTA_RATIO_TYPE),
                 new QuotaRatioToElasticSearchJson());

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
index 4ea8e30..dabec13 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
@@ -26,6 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.ElasticSearchIndexer;
@@ -39,6 +40,7 @@ import org.apache.james.mailbox.quota.QuotaFixture.Sizes;
 import org.apache.james.quota.search.elasticsearch.QuotaRatioElasticSearchConstants;
 import org.apache.james.quota.search.elasticsearch.QuotaSearchIndexCreationUtil;
 import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.junit.Before;
@@ -66,9 +68,10 @@ public class ElasticSearchQuotaMailboxListenerTest {
         client = QuotaSearchIndexCreationUtil.prepareDefaultClient(
             new TestingClientProvider(embeddedElasticSearch.getNode()).get(), ElasticSearchConfiguration.DEFAULT_CONFIGURATION);
 
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
         quotaMailboxListener = new ElasticSearchQuotaMailboxListener(
             new ElasticSearchIndexer(client,
-                Executors.newSingleThreadExecutor(),
+                Executors.newSingleThreadExecutor(threadFactory),
                 QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_WRITE_ALIAS,
                 QuotaRatioElasticSearchConstants.QUOTA_RATIO_TYPE,
                 BATCH_SIZE),

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java
index 9ca9824..593de54 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java
@@ -21,11 +21,13 @@ package org.apache.james.mailbox.store.event;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import javax.annotation.PreDestroy;
 
 import org.apache.james.mailbox.Event;
 import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 
 public class AsynchronousEventDelivery implements EventDelivery {
 
@@ -33,7 +35,8 @@ public class AsynchronousEventDelivery implements EventDelivery {
     private final SynchronousEventDelivery synchronousEventDelivery;
 
     public AsynchronousEventDelivery(int threadPoolSize, SynchronousEventDelivery synchronousEventDelivery) {
-        this.threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize);
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        this.threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize, threadFactory);
         this.synchronousEventDelivery = synchronousEventDelivery;
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
----------------------------------------------------------------------
diff --git a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
index 2404f25..4186ee9 100644
--- a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
+++ b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
@@ -23,6 +23,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.ZoneId;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.NotImplementedException;
@@ -61,6 +62,7 @@ import org.apache.james.metrics.logger.DefaultMetricFactory;
 import org.apache.james.mpt.api.ImapFeatures;
 import org.apache.james.mpt.api.ImapFeatures.Feature;
 import org.apache.james.mpt.host.JamesImapHostSystem;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.elasticsearch.client.Client;
 
 public class ElasticSearchHostSystem extends JamesImapHostSystem {
@@ -96,10 +98,11 @@ public class ElasticSearchHostSystem extends JamesImapHostSystem {
         InMemoryMailboxSessionMapperFactory factory = new InMemoryMailboxSessionMapperFactory();
         InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
 
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
         ElasticSearchListeningMessageSearchIndex searchIndex = new ElasticSearchListeningMessageSearchIndex(
             factory,
             new ElasticSearchIndexer(client,
-                Executors.newSingleThreadExecutor(),
+                Executors.newSingleThreadExecutor(threadFactory),
                 MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
                 MailboxElasticSearchConstants.MESSAGE_TYPE),
             new ElasticSearchSearcher(client,

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
----------------------------------------------------------------------
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
index 0018458..7700d28 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -49,6 +50,7 @@ import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.MDCBuilder;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,7 +80,8 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
         this.heartbeatIntervalUnit = imapConfiguration.getIdleTimeIntervalUnit();
         this.enableIdle = imapConfiguration.isEnableIdle();
         if (enableIdle) {
-            this.heartbeatExecutor = Executors.newScheduledThreadPool(DEFAULT_SCHEDULED_POOL_CORE_SIZE);
+            ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+            this.heartbeatExecutor = Executors.newScheduledThreadPool(DEFAULT_SCHEDULED_POOL_CORE_SIZE, threadFactory);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
----------------------------------------------------------------------
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index e923b2a..ef8d5b9 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.mail.Flags;
@@ -51,6 +52,7 @@ import org.apache.james.mailbox.store.SimpleMessageMetaData;
 import org.apache.james.mailbox.store.event.EventFactory;
 import org.apache.james.mailbox.store.mail.model.DefaultMessageId;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -79,7 +81,8 @@ public class SelectedMailboxImplTest {
 
     @Before
     public void setUp() throws Exception {
-        executorService = Executors.newFixedThreadPool(1);
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        executorService = Executors.newFixedThreadPool(1, threadFactory);
         mailboxPath = MailboxPath.forUser("tellier@linagora.com", MailboxConstants.INBOX);
         mailboxManager = mock(MailboxManager.class);
         messageManager = mock(MessageManager.class);

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
----------------------------------------------------------------------
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
index faddde6..ef8554d 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
@@ -23,8 +23,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.james.protocols.api.ProtocolServer;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -203,7 +205,8 @@ public abstract class AbstractAsyncServer implements ProtocolServer {
      * @return bossExecutor
      */
     protected Executor createBossExecutor() {
-        return Executors.newCachedThreadPool();
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        return Executors.newCachedThreadPool(threadFactory);
     }
 
     /**
@@ -212,7 +215,8 @@ public abstract class AbstractAsyncServer implements ProtocolServer {
      * @return workerExecutor
      */
     protected Executor createWorkerExecutor() {
-        return Executors.newCachedThreadPool();
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        return Executors.newCachedThreadPool(threadFactory);
     }
     
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
index 48fe3f1..56ec923 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
@@ -21,9 +21,12 @@ package org.apache.james.modules.mailbox;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
 import javax.annotation.PreDestroy;
 
+import org.apache.james.util.concurrent.NamedThreadFactory;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Provider;
 
@@ -34,7 +37,8 @@ class ScheduledExecutorServiceProvider implements Provider<ScheduledExecutorServ
 
     @VisibleForTesting
     ScheduledExecutorServiceProvider() {
-        scheduler = Executors.newSingleThreadScheduledExecutor();
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java
index 5b5b33f..69954af 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java
@@ -29,6 +29,7 @@ import java.nio.channels.SocketChannel;
 import java.nio.charset.Charset;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.backends.es.ElasticSearchConfiguration;
@@ -37,6 +38,7 @@ import org.apache.james.mailbox.store.search.PDFTextExtractor;
 import org.apache.james.modules.TestJMAPServerModule;
 import org.apache.james.modules.protocols.ImapGuiceProbe;
 import org.apache.james.util.Host;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.apache.james.util.docker.Images;
 import org.apache.james.util.docker.SwarmGenericContainer;
 import org.junit.jupiter.api.AfterEach;
@@ -104,7 +106,8 @@ class JamesServerWithRetryConnectionTest {
 
     @BeforeEach
     void setUp() throws IOException {
-        executorService = Executors.newFixedThreadPool(1);
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        executorService = Executors.newFixedThreadPool(1, threadFactory);
         socketChannel = SocketChannel.open();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java
index b8e42fe..fc30b12 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java
@@ -20,9 +20,12 @@ package org.apache.james.modules.server;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import javax.annotation.PreDestroy;
 
+import org.apache.james.util.concurrent.NamedThreadFactory;
+
 import com.google.inject.AbstractModule;
 import com.google.inject.Provider;
 import com.google.inject.name.Names;
@@ -33,9 +36,10 @@ public class AsyncTasksExecutorModule extends AbstractModule {
 
     @Override
     protected void configure() {
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
         bind(ExecutorService.class).annotatedWith(Names.named("AsyncExecutor"))
             .toProvider(new LifecycleAwareExecutorServiceProvider(
-                Executors.newFixedThreadPool(THREAD_POOL_SIZE)));
+                Executors.newFixedThreadPool(THREAD_POOL_SIZE, threadFactory)));
     }
 
     public static class LifecycleAwareExecutorServiceProvider implements Provider<ExecutorService> {

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
index 8411d40..eec20c0 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
@@ -28,8 +28,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,7 +143,8 @@ public class ConcurrentTestRunner {
         this.operationCount = operationCount;
         this.countDownLatch = new CountDownLatch(threadCount);
         this.biConsumer = biConsumer;
-        this.executorService = Executors.newFixedThreadPool(threadCount);
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        this.executorService = Executors.newFixedThreadPool(threadCount, threadFactory);
         this.futures = new ArrayList<>();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java b/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
index bf136ca..cdfce5e 100644
--- a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
@@ -27,11 +27,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,7 +46,8 @@ public class CompletableFutureUtilTest {
 
     @Before
     public void setUp() {
-        executorService = Executors.newFixedThreadPool(4);
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        executorService = Executors.newFixedThreadPool(4, threadFactory);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java b/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
index 81c2f11..28307fb 100644
--- a/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
@@ -27,7 +27,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,7 +51,8 @@ public class RetryExecutorUtilTest {
     @Before
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
-        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
----------------------------------------------------------------------
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index 48bcf65..4e5d68f 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.inject.Inject;
@@ -42,6 +43,7 @@ import org.apache.james.transport.mailets.remote.delivery.Bouncer;
 import org.apache.james.transport.mailets.remote.delivery.DeliveryRunnable;
 import org.apache.james.transport.mailets.remote.delivery.RemoteDeliveryConfiguration;
 import org.apache.james.transport.mailets.remote.delivery.RemoteDeliverySocketFactory;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
 import org.slf4j.Logger;
@@ -171,7 +173,8 @@ public class RemoteDelivery extends GenericMailet {
     }
 
     private void initDeliveryThreads() {
-        executor = Executors.newFixedThreadPool(configuration.getWorkersThreadCount());
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        executor = Executors.newFixedThreadPool(configuration.getWorkersThreadCount(), threadFactory);
         for (int a = 0; a < configuration.getWorkersThreadCount(); a++) {
             executor.execute(
                 new DeliveryRunnable(queue,

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
index 2eddc4c..2e9c409 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
@@ -22,6 +22,7 @@ package org.apache.james.webadmin.routes;
 import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.ElasticSearchIndexer;
@@ -39,6 +40,7 @@ import org.apache.james.quota.search.elasticsearch.QuotaSearchIndexCreationUtil;
 import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener;
 import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
 import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.elasticsearch.client.Client;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
@@ -72,8 +74,9 @@ public class ElasticSearchQuotaSearchExtension implements ParameterResolver, Bef
             MemoryDomainList domainList = new MemoryDomainList(dnsService);
             usersRepository.setDomainList(domainList);
 
+            ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
             ElasticSearchQuotaMailboxListener listener = new ElasticSearchQuotaMailboxListener(
-                new ElasticSearchIndexer(client, Executors.newSingleThreadExecutor(),
+                new ElasticSearchIndexer(client, Executors.newSingleThreadExecutor(threadFactory),
                     QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_WRITE_ALIAS,
                     QuotaRatioElasticSearchConstants.QUOTA_RATIO_TYPE),
                 new QuotaRatioToElasticSearchJson());

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
index cc82148..a4d851a 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
@@ -53,6 +53,7 @@ import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.server.core.MimeMessageCopyOnWriteProxy;
 import org.apache.james.server.core.MimeMessageSource;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +71,7 @@ public class FileMailQueue implements ManageableMailQueue {
 
     private final Map<String, FileItem> keyMappings = Collections.synchronizedMap(new LinkedHashMap<>());
     private final BlockingQueue<String> inmemoryQueue = new LinkedBlockingQueue<>();
-    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.withClassName(getClass()));
     private static final AtomicLong COUNTER = new AtomicLong();
     private final String queueDirName;
     private final File queueDir;

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 4354a50..6caef63 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -24,6 +24,7 @@ import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -32,6 +33,7 @@ import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.apache.mailet.Mail;
 
 import com.github.fge.lambdas.Throwing;
@@ -125,7 +127,8 @@ class Dequeuer {
     }
 
     private CompletableFuture<GetResponse> pollChannel() {
-        return new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        return new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory))
             .withFixedRate()
             .withMinDelay(TEN_MS)
             .retryOn(NoMailYetException.class)

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
----------------------------------------------------------------------
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index e0cbcc2..e26e173 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.function.Consumer;
 
 import javax.annotation.PreDestroy;
 
 import org.apache.james.util.MDCBuilder;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +51,8 @@ public class MemoryTaskManager implements TaskManager {
     public MemoryTaskManager() {
         idToExecutionDetails = new ConcurrentHashMap<>();
         idToFuture = new ConcurrentHashMap<>();
-        executor = Executors.newSingleThreadExecutor();
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        executor = Executors.newSingleThreadExecutor(threadFactory);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java
----------------------------------------------------------------------
diff --git a/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java b/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java
index c9b9545..53b87f3 100644
--- a/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java
+++ b/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java
@@ -21,7 +21,9 @@ package org.apache.james.junit;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -35,7 +37,8 @@ public class ExecutorExtension implements ParameterResolver, BeforeEachCallback,
 
     @Override
     public void beforeEach(ExtensionContext context) throws Exception {
-        executorService = Executors.newWorkStealingPool();
+        ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+        executorService = Executors.newCachedThreadPool(threadFactory);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java
----------------------------------------------------------------------
diff --git a/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java b/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java
index 9712a42..7364d86 100644
--- a/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java
+++ b/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java
@@ -22,11 +22,12 @@ package org.apache.james.spamassassin.mock;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.junit.rules.ExternalResource;
 
 public class MockSpamdTestRule extends ExternalResource {
 
-    private ExecutorService executor = Executors.newSingleThreadExecutor();
+    private ExecutorService executor = Executors.newSingleThreadExecutor(NamedThreadFactory.withClassName(getClass()));
     private MockSpamd spamd = new MockSpamd();
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[3/3] james-project git commit: JAMES-2550 Mailqueue with reactor

Posted by bt...@apache.org.
JAMES-2550 Mailqueue with reactor


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f121dd8d
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f121dd8d
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f121dd8d

Branch: refs/heads/master
Commit: f121dd8d413321db9a85a5cce42b572da392c2b5
Parents: e96b5cb
Author: Benoit Tellier <bt...@linagora.com>
Authored: Tue Dec 4 16:10:33 2018 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Dec 6 15:04:29 2018 +0700

----------------------------------------------------------------------
 pom.xml                                         |  7 +++
 server/queue/queue-rabbitmq/pom.xml             |  9 +++
 .../apache/james/queue/rabbitmq/Dequeuer.java   |  2 +-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java |  4 +-
 .../queue/rabbitmq/view/api/MailQueueView.java  |  2 +-
 .../rabbitmq/view/cassandra/BrowseStartDAO.java | 26 ++++----
 .../cassandra/CassandraMailQueueBrowser.java    | 63 +++++++++-----------
 .../cassandra/CassandraMailQueueMailDelete.java | 31 +++++-----
 .../cassandra/CassandraMailQueueMailStore.java  |  7 ++-
 .../view/cassandra/CassandraMailQueueView.java  | 42 ++++++-------
 .../view/cassandra/DeletedMailsDAO.java         | 20 +++----
 .../view/cassandra/EnqueuedMailsDAO.java        | 25 ++++----
 .../view/cassandra/model/BucketedSlices.java    | 22 +++----
 .../view/cassandra/BrowseStartDAOTest.java      | 27 +++++----
 .../CassandraMailQueueViewTestFactory.java      | 10 +++-
 .../view/cassandra/DeletedMailsDAOTest.java     | 22 +++----
 .../view/cassandra/EnqueuedMailsDaoTest.java    | 16 ++---
 .../cassandra/model/BucketedSlicesTest.java     | 12 ++--
 18 files changed, 178 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 300a18b..c4d85bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -666,6 +666,13 @@
     <dependencyManagement>
         <dependencies>
             <dependency>
+                <groupId>io.projectreactor</groupId>
+                <artifactId>reactor-bom</artifactId>
+                <version>Bismuth-RELEASE</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
                 <groupId>${james.groupId}</groupId>
                 <artifactId>apache-james-backends-cassandra</artifactId>
                 <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 14564ff..ab4ffff 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -171,6 +171,15 @@
             <version>${feign.version}</version>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 6caef63..76e9838 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -103,7 +103,7 @@ class Dequeuer {
                 if (success) {
                     dequeueMetric.increment();
                     rabbitClient.ack(deliveryTag);
-                    mailQueueView.delete(DeleteCondition.withName(mail.getName())).join();
+                    mailQueueView.delete(DeleteCondition.withName(mail.getName()));
                 } else {
                     rabbitClient.nack(deliveryTag);
                 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 0909469..1873313 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -93,12 +93,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
 
     @Override
     public long clear() {
-        return mailQueueView.delete(DeleteCondition.all()).join();
+        return mailQueueView.delete(DeleteCondition.all());
     }
 
     @Override
     public long remove(Type type, String value) {
-        return mailQueueView.delete(DeleteCondition.from(type, value)).join();
+        return mailQueueView.delete(DeleteCondition.from(type, value));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index a291d61..12ca723 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -36,7 +36,7 @@ public interface MailQueueView {
 
     CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem);
 
-    CompletableFuture<Long> delete(DeleteCondition deleteCondition);
+    long delete(DeleteCondition deleteCondition);
 
     CompletableFuture<Boolean> isPresent(Mail mail);
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
index 4425c46..9552f5c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
@@ -31,8 +31,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV
 
 import java.time.Instant;
 import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
@@ -43,6 +41,7 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
 
 public class BrowseStartDAO {
 
@@ -79,28 +78,29 @@ public class BrowseStartDAO {
             .value(QUEUE_NAME, bindMarker(QUEUE_NAME)));
     }
 
-    CompletableFuture<Optional<Instant>> findBrowseStart(MailQueueName queueName) {
+    Mono<Instant> findBrowseStart(MailQueueName queueName) {
         return selectOne(queueName)
-            .thenApply(optional -> optional.map(this::getBrowseStart));
+            .map(this::getBrowseStart);
     }
 
-    CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
-        return executor.executeVoid(updateOne.bind()
+    Mono<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
+        return Mono.fromCompletionStage(executor.executeVoid(updateOne.bind()
             .setTimestamp(BROWSE_START, Date.from(sliceStart))
-            .setString(QUEUE_NAME, mailQueueName.asString()));
+            .setString(QUEUE_NAME, mailQueueName.asString())));
     }
 
-    CompletableFuture<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
-        return executor.executeVoid(insertOne.bind()
+    Mono<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
+        return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind()
             .setTimestamp(BROWSE_START, Date.from(sliceStart))
-            .setString(QUEUE_NAME, mailQueueName.asString()));
+            .setString(QUEUE_NAME, mailQueueName.asString())));
     }
 
     @VisibleForTesting
-    CompletableFuture<Optional<Row>> selectOne(MailQueueName queueName) {
-        return executor.executeSingleRow(
+    Mono<Row> selectOne(MailQueueName queueName) {
+        return Mono.fromCompletionStage(executor.executeSingleRow(
             selectOne.bind()
-                .setString(QUEUE_NAME, queueName.asString()));
+                .setString(QUEUE_NAME, queueName.asString())))
+            .flatMap(Mono::justOrEmpty);
     }
 
     private Instant getBrowseStart(Row row) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index a5a9bb9..4279c15 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -21,16 +21,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
 import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
-import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice.allSlicesTill;
 
 import java.time.Clock;
 import java.time.Instant;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
+import java.util.List;
 
 import javax.inject.Inject;
 import javax.mail.MessagingException;
@@ -44,12 +40,14 @@ import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
-import org.apache.james.util.FluentFutureStream;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class CassandraMailQueueBrowser {
 
@@ -101,23 +99,24 @@ public class CassandraMailQueueBrowser {
         this.clock = clock;
     }
 
-    CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> browse(MailQueueName queueName) {
+    Flux<ManageableMailQueue.MailQueueItemView> browse(MailQueueName queueName) {
         return browseReferences(queueName)
-            .map(this::toMailFuture, FluentFutureStream::unboxFuture)
-            .map(ManageableMailQueue.MailQueueItemView::new)
-            .completableFuture();
+            .flatMapSequential(this::toMailFuture)
+            .map(ManageableMailQueue.MailQueueItemView::new);
     }
 
-    FluentFutureStream<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
-        return FluentFutureStream.of(browseStartDao.findBrowseStart(queueName)
-            .thenApply(this::allSlicesStartingAt))
-            .map(slice -> browseSlice(queueName, slice), FluentFutureStream::unboxFluentFuture);
+    Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
+        return browseStartDao.findBrowseStart(queueName)
+            .flatMapMany(this::allSlicesStartingAt)
+            .flatMapSequential(slice -> browseSlice(queueName, slice))
+            .flatMapSequential(Flux::fromIterable)
+            .subscribeOn(Schedulers.parallel());
     }
 
-    private CompletableFuture<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
+    private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
         EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
-        return mimeMessageStore.read(enqueuedItem.getPartsId())
-            .thenApply(mimeMessage -> toMail(enqueuedItem, mimeMessage));
+        return Mono.fromCompletionStage(mimeMessageStore.read(enqueuedItem.getPartsId()))
+            .map(mimeMessage -> toMail(enqueuedItem, mimeMessage));
     }
 
     private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
@@ -132,31 +131,25 @@ public class CassandraMailQueueBrowser {
         return mail;
     }
 
-    private FluentFutureStream<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
-        return FluentFutureStream.of(
+    private Mono<List<EnqueuedItemWithSlicingContext>> browseSlice(MailQueueName queueName, Slice slice) {
+        return
             allBucketIds()
-                .map(bucketId ->
-                    browseBucket(queueName, slice, bucketId).completableFuture()),
-            FluentFutureStream::unboxStream)
-            .sorted(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+                .flatMap(bucketId -> browseBucket(queueName, slice, bucketId))
+                .collectSortedList(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
     }
 
-    private FluentFutureStream<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
-        return FluentFutureStream.of(
-            enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId))
-                .thenFilter(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
+    private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
+        return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
+            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
     }
 
-    private Stream<Slice> allSlicesStartingAt(Optional<Instant> maybeBrowseStart) {
-        return maybeBrowseStart
-            .map(Slice::of)
-            .map(startSlice -> allSlicesTill(startSlice, clock.instant(), configuration.getSliceWindow()))
-            .orElse(Stream.empty());
+    private Flux<Slice> allSlicesStartingAt(Instant browseStart) {
+        return Flux.fromStream(Slice.of(browseStart).allSlicesTill(clock.instant(), configuration.getSliceWindow()));
     }
 
-    private Stream<BucketId> allBucketIds() {
-        return IntStream
+    private Flux<BucketId> allBucketIds() {
+        return Flux
             .range(0, configuration.getBucketCount())
-            .mapToObj(BucketId::of);
+            .map(BucketId::of);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index b94b1ed..57732cb 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -20,10 +20,7 @@
 package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import java.time.Instant;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -32,6 +29,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
 import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
 import org.apache.mailet.Mail;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraMailQueueMailDelete {
 
     private final DeletedMailsDAO deletedMailsDao;
@@ -53,42 +52,40 @@ public class CassandraMailQueueMailDelete {
         this.random = random;
     }
 
-    CompletableFuture<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
+    Mono<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
         return considerDeleted(MailKey.fromMail(mail), mailQueueName);
     }
 
-    CompletableFuture<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
+    Mono<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
         return deletedMailsDao
             .markAsDeleted(mailQueueName, mailKey)
-            .thenRunAsync(() -> maybeUpdateBrowseStart(mailQueueName));
+            .doOnTerminate(() -> maybeUpdateBrowseStart(mailQueueName));
     }
 
-    CompletableFuture<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
+    Mono<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
         return deletedMailsDao.isDeleted(mailQueueName, MailKey.fromMail(mail));
     }
 
-    CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName) {
-        return findNewBrowseStart(mailQueueName)
-            .thenCompose(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart));
+    void updateBrowseStart(MailQueueName mailQueueName) {
+        Mono<Instant> newBrowseStart = findNewBrowseStart(mailQueueName);
+        updateNewBrowseStart(mailQueueName, newBrowseStart);
     }
 
     private void maybeUpdateBrowseStart(MailQueueName mailQueueName) {
         if (shouldUpdateBrowseStart()) {
-            updateBrowseStart(mailQueueName).join();
+            updateBrowseStart(mailQueueName);
         }
     }
 
-    private CompletableFuture<Optional<Instant>> findNewBrowseStart(MailQueueName mailQueueName) {
+    private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
         return cassandraMailQueueBrowser.browseReferences(mailQueueName)
             .map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart())
-            .completableFuture()
-            .thenApply(Stream::findFirst);
+            .next();
     }
 
-    private CompletableFuture<Void> updateNewBrowseStart(MailQueueName mailQueueName, Optional<Instant> maybeNewBrowseStart) {
+    private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Mono<Instant> maybeNewBrowseStart) {
         return maybeNewBrowseStart
-            .map(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant))
-            .orElse(CompletableFuture.completedFuture(null));
+            .flatMap(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant));
     }
 
     private boolean shouldUpdateBrowseStart() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
index 41c28a9..1dd07f5 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
@@ -21,7 +21,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import java.time.Clock;
 import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
@@ -32,6 +31,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Bucke
 import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.mailet.Mail;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraMailQueueMailStore {
 
     private final EnqueuedMailsDAO enqueuedMailsDao;
@@ -50,13 +51,13 @@ public class CassandraMailQueueMailStore {
         this.clock = clock;
     }
 
-    CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
+    Mono<Void> storeMail(EnqueuedItem enqueuedItem) {
         EnqueuedItemWithSlicingContext enqueuedItemAndSlicing = addSliceContext(enqueuedItem);
 
         return enqueuedMailsDao.insert(enqueuedItemAndSlicing);
     }
 
-    CompletableFuture<Void> initializeBrowseStart(MailQueueName mailQueueName) {
+    Mono<Void> initializeBrowseStart(MailQueueName mailQueueName) {
         return browseStartDao
             .insertInitialBrowseStart(mailQueueName, currentSliceStartInstant());
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index 4980f46..a0ca3cd 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -20,7 +20,6 @@
 package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -33,9 +32,10 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
-import org.apache.james.util.FluentFutureStream;
 import org.apache.mailet.Mail;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraMailQueueView implements MailQueueView {
 
     public static class Factory implements MailQueueView.Factory {
@@ -80,62 +80,54 @@ public class CassandraMailQueueView implements MailQueueView {
 
     @Override
     public void initialize(MailQueueName mailQueueName) {
-        storeHelper.initializeBrowseStart(mailQueueName)
-            .join();
+        storeHelper.initializeBrowseStart(mailQueueName).block();
     }
 
     @Override
     public CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
-        return storeHelper.storeMail(enqueuedItem);
+        return storeHelper.storeMail(enqueuedItem).toFuture();
     }
 
     @Override
     public ManageableMailQueue.MailQueueIterator browse() {
         return new CassandraMailQueueBrowser.CassandraMailQueueIterator(
             cassandraMailQueueBrowser.browse(mailQueueName)
-                .join()
+                .toIterable()
                 .iterator());
     }
 
     @Override
     public long getSize() {
-        return cassandraMailQueueBrowser.browseReferences(mailQueueName)
-                .join()
-                .count();
+        return cassandraMailQueueBrowser.browseReferences(mailQueueName).count().block();
     }
 
     @Override
-    public CompletableFuture<Long> delete(DeleteCondition deleteCondition) {
+    public long delete(DeleteCondition deleteCondition) {
         if (deleteCondition instanceof DeleteCondition.WithName) {
             DeleteCondition.WithName nameDeleteCondition = (DeleteCondition.WithName) deleteCondition;
-
-            return delete(MailKey.of(nameDeleteCondition.getName())).thenApply(any -> 1L);
+            return delete(MailKey.of(nameDeleteCondition.getName())).map(any -> 1L).block();
         }
-
         return browseThenDelete(deleteCondition);
     }
 
-    private CompletableFuture<Long> browseThenDelete(DeleteCondition deleteCondition) {
-        CompletableFuture<Long> result = cassandraMailQueueBrowser.browseReferences(mailQueueName)
+    private long browseThenDelete(DeleteCondition deleteCondition) {
+        return cassandraMailQueueBrowser.browseReferences(mailQueueName)
             .map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
             .filter(mailReference -> deleteCondition.shouldBeDeleted(mailReference.getMail()))
-            .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName),
-                FluentFutureStream::unboxFuture)
-            .completableFuture()
-            .thenApply(Stream::count);
-
-        result.thenRunAsync(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName));
-
-        return result;
+            .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName))
+            .count()
+            .doOnTerminate(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
+            .block();
     }
 
-    private CompletableFuture<Void> delete(MailKey mailKey) {
+    private Mono<Void> delete(MailKey mailKey) {
         return cassandraMailQueueMailDelete.considerDeleted(mailKey, mailQueueName);
     }
 
     @Override
     public CompletableFuture<Boolean> isPresent(Mail mail) {
         return cassandraMailQueueMailDelete.isDeleted(mail, mailQueueName)
-                .thenApply(bool -> !bool);
+                .map(bool -> !bool)
+                .toFuture();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
index c5feefc..a1986e1 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
@@ -27,8 +27,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.QUEUE_NAME;
 import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.TABLE_NAME;
 
-import java.util.concurrent.CompletableFuture;
-
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -37,6 +35,7 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
 
 public class DeletedMailsDAO {
 
@@ -64,20 +63,21 @@ public class DeletedMailsDAO {
             .and(eq(MAIL_KEY, bindMarker(MAIL_KEY))));
     }
 
-    CompletableFuture<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) {
-        return executor.executeVoid(insertOne.bind()
+    Mono<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) {
+        return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind()
             .setString(QUEUE_NAME, mailQueueName.asString())
-            .setString(MAIL_KEY, mailKey.getMailKey()));
+            .setString(MAIL_KEY, mailKey.getMailKey())));
     }
 
-    CompletableFuture<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) {
-        return executor.executeReturnExists(
+    Mono<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) {
+        return Mono.fromCompletionStage(executor.executeReturnExists(
             selectOne.bind()
                 .setString(QUEUE_NAME, mailQueueName.asString())
-                .setString(MAIL_KEY, mailKey.getMailKey()));
+                .setString(MAIL_KEY, mailKey.getMailKey())));
     }
 
-    CompletableFuture<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) {
-        return isDeleted(mailQueueName, mailKey).thenApply(b -> !b);
+    Mono<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) {
+        return isDeleted(mailQueueName, mailKey)
+            .map(b -> !b);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
index fef9ba1..eb1e9cf 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
@@ -47,8 +47,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlice
 import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
 
 import java.util.Date;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -64,6 +62,10 @@ import org.apache.mailet.Mail;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 public class EnqueuedMailsDAO {
 
@@ -73,10 +75,12 @@ public class EnqueuedMailsDAO {
     private final CassandraUtils cassandraUtils;
     private final CassandraTypesProvider cassandraTypesProvider;
     private final BlobId.Factory blobFactory;
+    private Scheduler scheduler;
 
     @Inject
     EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, CassandraTypesProvider cassandraTypesProvider,
                      BlobId.Factory blobIdFactory) {
+        this.scheduler = Schedulers.parallel();
         this.executor = new CassandraAsyncExecutor(session);
         this.cassandraUtils = cassandraUtils;
         this.cassandraTypesProvider = cassandraTypesProvider;
@@ -114,13 +118,13 @@ public class EnqueuedMailsDAO {
             .value(PER_RECIPIENT_SPECIFIC_HEADERS, bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS)));
     }
 
-    CompletableFuture<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) {
+    Mono<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) {
         EnqueuedItem enqueuedItem = enqueuedItemWithSlicing.getEnqueuedItem();
         EnqueuedItemWithSlicingContext.SlicingContext slicingContext = enqueuedItemWithSlicing.getSlicingContext();
         Mail mail = enqueuedItem.getMail();
         MimeMessagePartsId mimeMessagePartsId = enqueuedItem.getPartsId();
 
-        return executor.executeVoid(insert.bind()
+        return Mono.fromCompletionStage(executor.executeVoid(insert.bind()
             .setString(QUEUE_NAME, enqueuedItem.getMailQueueName().asString())
             .setTimestamp(TIME_RANGE_START, Date.from(slicingContext.getTimeRangeStart()))
             .setInt(BUCKET_ID, slicingContext.getBucketId().getValue())
@@ -136,19 +140,20 @@ public class EnqueuedMailsDAO {
             .setString(REMOTE_HOST, mail.getRemoteHost())
             .setTimestamp(LAST_UPDATED, mail.getLastUpdated())
             .setMap(ATTRIBUTES, toRawAttributeMap(mail))
-            .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders())));
+            .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders()))));
     }
 
-    CompletableFuture<Stream<EnqueuedItemWithSlicingContext>> selectEnqueuedMails(
+    Flux<EnqueuedItemWithSlicingContext> selectEnqueuedMails(
         MailQueueName queueName, Slice slice, BucketId bucketId) {
 
-        return executor.execute(
+        return Mono.fromCompletionStage(executor.execute(
             selectFrom.bind()
                 .setString(QUEUE_NAME, queueName.asString())
                 .setTimestamp(TIME_RANGE_START, Date.from(slice.getStartSliceInstant()))
-                .setInt(BUCKET_ID, bucketId.getValue()))
-            .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
-                .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory)));
+                .setInt(BUCKET_ID, bucketId.getValue())))
+            .map(cassandraUtils::convertToStream)
+            .flatMapMany(Flux::fromStream)
+            .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
index d35b6d7..2f60736 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
@@ -69,17 +69,6 @@ public class BucketedSlices {
             return new Slice(sliceStartInstant);
         }
 
-        public static Stream<Slice> allSlicesTill(Slice firstSlice, Instant endAt, Duration windowSize) {
-            long sliceCount = calculateSliceCount(firstSlice, endAt, windowSize);
-            long startAtSeconds =  firstSlice.getStartSliceInstant().getEpochSecond();
-            long sliceWindowSizeInSecond = windowSize.getSeconds();
-
-            return LongStream.range(0, sliceCount)
-                .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition)
-                .mapToObj(Instant::ofEpochSecond)
-                .map(Slice::of);
-        }
-
         private static long calculateSliceCount(Slice firstSlice, Instant endAt, Duration windowSize) {
             long startAtSeconds =  firstSlice.getStartSliceInstant().getEpochSecond();
             long endAtSeconds = endAt.getEpochSecond();
@@ -104,6 +93,17 @@ public class BucketedSlices {
             return startSliceInstant;
         }
 
+        public Stream<Slice> allSlicesTill(Instant endAt, Duration windowSize) {
+            long sliceCount = calculateSliceCount(this, endAt, windowSize);
+            long startAtSeconds = this.getStartSliceInstant().getEpochSecond();
+            long sliceWindowSizeInSecond = windowSize.getSeconds();
+
+            return LongStream.range(0, sliceCount)
+                .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition)
+                .mapToObj(Instant::ofEpochSecond)
+                .map(Slice::of);
+        }
+
 
         @Override
         public final boolean equals(Object o) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
index ef844c6..c427d44 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.time.Instant;
-import java.util.Optional;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -31,6 +30,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Mono;
+
 class BrowseStartDAOTest {
 
     private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1");
@@ -50,37 +51,37 @@ class BrowseStartDAOTest {
 
     @Test
     void findBrowseStartShouldReturnEmptyWhenTableDoesntContainQueueName() {
-        testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+        testee.updateBrowseStart(OUT_GOING_1, NOW).block();
 
-        Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
-        assertThat(firstEnqueuedItemFromQueue2)
+        Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2);
+        assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block())
             .isEmpty();
     }
 
     @Test
     void findBrowseStartShouldReturnInstantWhenTableContainsQueueName() {
-        testee.updateBrowseStart(OUT_GOING_1, NOW).join();
-        testee.updateBrowseStart(OUT_GOING_2, NOW).join();
+        testee.updateBrowseStart(OUT_GOING_1, NOW).block();
+        testee.updateBrowseStart(OUT_GOING_2, NOW).block();
 
-        Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
-        assertThat(firstEnqueuedItemFromQueue2)
+        Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2);
+        assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block())
             .isNotEmpty();
     }
 
     @Test
     void updateFirstEnqueuedTimeShouldWork() {
-        testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+        testee.updateBrowseStart(OUT_GOING_1, NOW).block();
 
-        assertThat(testee.selectOne(OUT_GOING_1).join())
+        assertThat(testee.selectOne(OUT_GOING_1).flux().collectList().block())
             .isNotEmpty();
     }
 
     @Test
     void insertInitialBrowseStartShouldInsertFirstInstant() {
-        testee.insertInitialBrowseStart(OUT_GOING_1, NOW).join();
-        testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).join();
+        testee.insertInitialBrowseStart(OUT_GOING_1, NOW).block();
+        testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).block();
 
-        assertThat(testee.findBrowseStart(OUT_GOING_1).join())
+        assertThat(testee.findBrowseStart(OUT_GOING_1).flux().collectList().block())
             .contains(NOW);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
index 7eadf9c..d9a451a 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -25,11 +25,11 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
-import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
@@ -38,6 +38,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.Eventsourcin
 import com.datastax.driver.core.Session;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraMailQueueViewTestFactory {
 
     public static CassandraMailQueueView.Factory factory(Clock clock, ThreadLocalRandom random, Session session,
@@ -69,7 +71,9 @@ public class CassandraMailQueueViewTestFactory {
     public static boolean isInitialized(Session session, MailQueueName mailQueueName) {
         BrowseStartDAO browseStartDao = new BrowseStartDAO(session);
         return browseStartDao.findBrowseStart(mailQueueName)
-            .thenApply(Optional::isPresent)
-            .join();
+            .map(Optional::ofNullable)
+            .switchIfEmpty(Mono.just(Optional.empty()))
+            .block()
+            .isPresent();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
index d9dc69a..e21804f 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
@@ -50,58 +50,58 @@ class DeletedMailsDAOTest {
     void markAsDeletedShouldWork() {
         Boolean isDeletedBeforeMark = testee
                 .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-                .join();
+                .block();
         assertThat(isDeletedBeforeMark).isFalse();
 
-        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block();
 
         Boolean isDeletedAfterMark = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeletedAfterMark).isTrue();
     }
 
     @Test
     void checkDeletedShouldReturnFalseWhenTableDoesntContainBothMailQueueAndMailKey() {
-        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).join();
+        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).block();
 
         Boolean isDeleted = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeleted).isFalse();
     }
 
     @Test
     void checkDeletedShouldReturnFalseWhenTableContainsMailQueueButNotMailKey() {
-        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).join();
+        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).block();
 
         Boolean isDeleted = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeleted).isFalse();
     }
 
     @Test
     void checkDeletedShouldReturnFalseWhenTableContainsMailKeyButNotMailQueue() {
-        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).join();
+        testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).block();
 
         Boolean isDeleted = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeleted).isFalse();
     }
 
     @Test
     void checkDeletedShouldReturnTrueWhenTableContainsMailItem() {
-        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+        testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block();
 
         Boolean isDeleted = testee
             .isDeleted(OUT_GOING_1, MAIL_KEY_1)
-            .join();
+            .block();
 
         assertThat(isDeleted).isTrue();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
index 3d44db1..13b9e00 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
@@ -25,7 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.SoftAssertions.assertSoftly;
 
 import java.time.Instant;
-import java.util.stream.Stream;
+import java.util.List;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -86,11 +86,11 @@ class EnqueuedMailsDaoTest {
                     .build())
                 .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
                 .build())
-            .join();
+            .block();
 
-        Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee
+        List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee
             .selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
-            .join();
+            .collectList().block();
 
         assertThat(selectedEnqueuedMails).hasSize(1);
     }
@@ -108,7 +108,7 @@ class EnqueuedMailsDaoTest {
                     .build())
                 .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
                 .build())
-            .join();
+            .block();
 
         testee.insert(EnqueuedItemWithSlicingContext.builder()
                 .enqueuedItem(EnqueuedItem.builder()
@@ -121,10 +121,10 @@ class EnqueuedMailsDaoTest {
                     .build())
                 .slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE + 1), NOW))
                 .build())
-            .join();
+            .block();
 
-        Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
-            .join();
+        List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
+            .collectList().block();
 
         assertThat(selectedEnqueuedMails)
             .hasSize(1)

http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
index 842216c..1fb6916 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
@@ -64,13 +64,13 @@ class BucketedSlicesTest {
 
     @Test
     void allSlicesTillShouldReturnOnlyFirstSliceWhenEndAtInTheSameInterval() {
-        assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW))
+        assertThat(FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW))
             .containsOnly(FIRST_SLICE);
     }
 
     @Test
     void allSlicesTillShouldReturnAllSlicesBetweenStartAndEndAt() {
-        Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlices = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
 
         assertThat(allSlices)
             .containsExactly(
@@ -81,9 +81,9 @@ class BucketedSlicesTest {
 
     @Test
     void allSlicesTillShouldReturnSameSlicesWhenEndAtsAreInTheSameInterval() {
-        Stream<Slice> allSlicesEndAtTheStartOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW);
-        Stream<Slice> allSlicesEndAtTheMiddleOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW);
-        Stream<Slice> allSlicesEndAtTheEndWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlicesEndAtTheStartOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlicesEndAtTheMiddleOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlicesEndAtTheEndWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
 
         Slice [] allSlicesInThreeHours = {
             FIRST_SLICE,
@@ -102,7 +102,7 @@ class BucketedSlicesTest {
 
     @Test
     void allSlicesTillShouldReturnEmptyIfEndAtBeforeStartSlice() {
-        Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE_NEXT_TWO_HOUR, FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW);
+        Stream<Slice> allSlices = FIRST_SLICE_NEXT_TWO_HOUR.allSlicesTill(FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW);
 
         assertThat(allSlices).isEmpty();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[2/3] james-project git commit: JAMES-2619 remove code in NamedThreadFactory to rely on Guava

Posted by bt...@apache.org.
JAMES-2619 remove code in NamedThreadFactory to rely on Guava


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b0ebfb66
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b0ebfb66
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b0ebfb66

Branch: refs/heads/master
Commit: b0ebfb66209524cc2406ea508443a1a720da5a4a
Parents: 16e4d9f
Author: Matthieu Baechler <ma...@apache.org>
Authored: Wed Dec 5 10:49:34 2018 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Dec 6 15:03:57 2018 +0700

----------------------------------------------------------------------
 .../JMXEnabledScheduledThreadPoolExecutor.java  |  2 +-
 .../JMXEnabledThreadPoolExecutor.java           |  4 +--
 .../util/concurrent/NamedThreadFactory.java     | 33 +++++++++-----------
 ...ledOrderedMemoryAwareThreadPoolExecutor.java |  4 +--
 4 files changed, 19 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b0ebfb66/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java b/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
index 9c3f499..a51e152 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
@@ -50,7 +50,7 @@ public class JMXEnabledScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
     }
 
     public JMXEnabledScheduledThreadPoolExecutor(int corePoolSize, String jmxPath, String name) {
-        super(corePoolSize, new NamedThreadFactory(name));
+        super(corePoolSize, NamedThreadFactory.withName(name));
 
         this.jmxPath = jmxPath;
         registerMBean();

http://git-wip-us.apache.org/repos/asf/james-project/blob/b0ebfb66/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java b/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
index 2a42464..c80112a 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
@@ -153,7 +153,7 @@ public class JMXEnabledThreadPoolExecutor extends ThreadPoolExecutor implements
      * 
      */
     public static JMXEnabledThreadPoolExecutor newCachedThreadPool(String jmxPath, String name) {
-        return new JMXEnabledThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory(name), jmxPath);
+        return new JMXEnabledThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), NamedThreadFactory.withName(name), jmxPath);
 
     }
 
@@ -174,6 +174,6 @@ public class JMXEnabledThreadPoolExecutor extends ThreadPoolExecutor implements
     }
     
     public static JMXEnabledThreadPoolExecutor newFixedThreadPool(String jmxPath, String name, int nThreads) {
-        return newFixedThreadPool(jmxPath, nThreads, new NamedThreadFactory(name));
+        return newFixedThreadPool(jmxPath, nThreads, NamedThreadFactory.withName(name));
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/b0ebfb66/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java b/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java
index 8aeaaf5..f982349 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java
@@ -19,35 +19,30 @@
 package org.apache.james.util.concurrent;
 
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
 
-/**
- * ThreadPool which use name and a counter for thread names
- */
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 public class NamedThreadFactory implements ThreadFactory {
 
     public final String name;
-    private final AtomicLong count = new AtomicLong();
-    private final int priority;
+    private final ThreadFactory threadFactory;
 
-    public NamedThreadFactory(String name, int priority) {
-        if (priority > Thread.MAX_PRIORITY || priority < Thread.MIN_PRIORITY) {
-            throw new IllegalArgumentException("Priority must be <= " + Thread.MAX_PRIORITY + " and >=" + Thread.MIN_PRIORITY);
-        }
-        this.name = name;
-        this.priority = priority;
+    public static NamedThreadFactory withClassName(Class<?> clazz) {
+        return new NamedThreadFactory(clazz.getName());
     }
 
-    public NamedThreadFactory(String name) {
-        this(name, Thread.NORM_PRIORITY);
+    public static NamedThreadFactory withName(String name) {
+        return new NamedThreadFactory(name);
+    }
+
+    private NamedThreadFactory(String name) {
+        this.name = name;
+        this.threadFactory = new ThreadFactoryBuilder().setNameFormat(name + "-%d").build();
     }
 
     @Override
     public Thread newThread(Runnable r) {
-        Thread t = new Thread(r);
-        t.setName(name + "-" + count.incrementAndGet());
-        t.setPriority(priority);
-        return t;
+        return threadFactory.newThread(r);
     }
 
     public String getName() {
@@ -56,7 +51,7 @@ public class NamedThreadFactory implements ThreadFactory {
 
     @Override
     public String toString() {
-        return "NamedTreadFactory: " + getName();
+        return "NamedThreadFactory: " + getName();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/b0ebfb66/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java
index e59a8fb..f0d64df 100644
--- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java
+++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java
@@ -44,13 +44,13 @@ public class JMXEnabledOrderedMemoryAwareThreadPoolExecutor extends OrderedMemor
     private String mbeanName;
     
     public JMXEnabledOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, String jmxPath, String name) {
-        super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS, new NamedThreadFactory(name));
+        super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS, NamedThreadFactory.withName(name));
         this.jmxPath = jmxPath;
         registerMBean();
     }
     
     public JMXEnabledOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, String jmxPath, String name) {
-        super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, new NamedThreadFactory(name));
+        super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, NamedThreadFactory.withName(name));
         this.jmxPath = jmxPath;
         registerMBean();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org