You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/12/06 08:05:38 UTC
[1/3] james-project git commit: JAMES-2619 Make sure to name all Executors and Threads
Repository: james-project
Updated Branches:
refs/heads/master 16e4d9f93 -> f121dd8d4
JAMES-2619 Make sure to name all Executors and Threads
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e96b5cbb
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e96b5cbb
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e96b5cbb
Branch: refs/heads/master
Commit: e96b5cbb38ebdbe03b1733c8cb835609591cca1e
Parents: b0ebfb6
Author: Matthieu Baechler <ma...@apache.org>
Authored: Tue Dec 4 18:21:36 2018 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Dec 6 15:03:57 2018 +0700
----------------------------------------------------------------------
.../cassandra/migration/CassandraMigrationServiceTest.java | 4 +++-
.../apache/james/backends/es/ElasticSearchIndexerTest.java | 6 +++++-
.../backend/rabbitmq/RabbitMQConnectionFactoryTest.java | 4 +++-
.../org/apache/james/backend/rabbitmq/RabbitMQExtension.java | 5 ++++-
.../org/apache/james/mailbox/MailboxManagerStressTest.java | 5 ++++-
.../james/mailbox/cassandra/mail/CassandraACLMapperTest.java | 5 ++++-
.../mailbox/elasticsearch/ElasticSearchIntegrationTest.java | 6 ++++--
.../ElasticSearchQuotaSearchTestSystemExtension.java | 5 ++++-
.../events/ElasticSearchQuotaMailboxListenerTest.java | 5 ++++-
.../james/mailbox/store/event/AsynchronousEventDelivery.java | 5 ++++-
.../elasticsearch/host/ElasticSearchHostSystem.java | 5 ++++-
.../java/org/apache/james/imap/processor/IdleProcessor.java | 5 ++++-
.../james/imap/processor/base/SelectedMailboxImplTest.java | 5 ++++-
.../apache/james/protocols/netty/AbstractAsyncServer.java | 8 ++++++--
.../modules/mailbox/ScheduledExecutorServiceProvider.java | 6 +++++-
.../org/apache/james/JamesServerWithRetryConnectionTest.java | 5 ++++-
.../james/modules/server/AsyncTasksExecutorModule.java | 6 +++++-
.../apache/james/util/concurrency/ConcurrentTestRunner.java | 5 ++++-
.../org/apache/james/util/CompletableFutureUtilTest.java | 5 ++++-
.../org/apache/james/util/retry/RetryExecutorUtilTest.java | 5 ++++-
.../org/apache/james/transport/mailets/RemoteDelivery.java | 5 ++++-
.../webadmin/routes/ElasticSearchQuotaSearchExtension.java | 5 ++++-
.../main/java/org/apache/james/queue/file/FileMailQueue.java | 3 ++-
.../main/java/org/apache/james/queue/rabbitmq/Dequeuer.java | 5 ++++-
.../main/java/org/apache/james/task/MemoryTaskManager.java | 5 ++++-
.../main/java/org/apache/james/junit/ExecutorExtension.java | 5 ++++-
.../apache/james/spamassassin/mock/MockSpamdTestRule.java | 3 ++-
27 files changed, 107 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
index 7730e80..7631f4b 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/migration/CassandraMigrationServiceTest.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
import org.apache.james.backends.cassandra.versions.SchemaVersion;
import org.apache.james.task.Task;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -74,7 +75,8 @@ public class CassandraMigrationServiceTest {
.put(LATEST_VERSION, successfulMigration)
.build();
testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
- executorService = Executors.newFixedThreadPool(2);
+ executorService = Executors.newFixedThreadPool(2,
+ NamedThreadFactory.withClassName(getClass()));
}
@After
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
----------------------------------------------------------------------
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
index fa51c8f..0654a78 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
@@ -24,8 +24,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.apache.james.backends.es.utils.TestingClientProvider;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
@@ -61,7 +63,9 @@ public class ElasticSearchIndexerTest {
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
.createIndexAndAliases(clientProvider.get());
- testee = new ElasticSearchIndexer(clientProvider.get(), Executors.newSingleThreadExecutor(), ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
+ testee = new ElasticSearchIndexer(clientProvider.get(),
+ Executors.newSingleThreadExecutor(NamedThreadFactory.withClassName(getClass())),
+ ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
}
@Test
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
index 489f557f..87d9904 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQConnectionFactoryTest.java
@@ -24,7 +24,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -36,7 +38,7 @@ class RabbitMQConnectionFactoryTest {
@BeforeEach
void setUp() throws Exception {
- scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.withClassName(getClass()));
}
@Test
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
index acf1a2c..d6d2ead 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
@@ -22,7 +22,9 @@ import static org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEME
import java.net.URISyntaxException;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -91,8 +93,9 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback,
.minDelay(ONE_HUNDRED_MILLISECONDS)
.build();
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
return new RabbitMQConnectionFactory(
rabbitMQConfiguration,
- new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
+ new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory)));
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
index 9f13dec..21dbb71 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,6 +37,7 @@ import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mime4j.dom.Message;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
@@ -55,9 +57,10 @@ public abstract class MailboxManagerStressTest {
@Test
public void testStressTest() throws InterruptedException, MailboxException {
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
final CountDownLatch latch = new CountDownLatch(APPEND_OPERATIONS);
- final ExecutorService pool = Executors.newFixedThreadPool(APPEND_OPERATIONS / 20);
+ final ExecutorService pool = Executors.newFixedThreadPool(APPEND_OPERATIONS / 20, threadFactory);
final Collection<MessageUid> uList = new ConcurrentLinkedDeque<>();
final String username = "username";
MailboxSession session = mailboxManager.createSystemSession(username);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
index cdea6fa..d9f7c89 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -40,6 +41,7 @@ import org.apache.james.mailbox.cassandra.modules.CassandraAclModule;
import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxACL;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -59,7 +61,8 @@ class CassandraACLMapperTest {
void setUp(CassandraCluster cassandra) {
cassandraACLMapper = GuiceUtils.testInjector(cassandra)
.getInstance(CassandraACLMapper.class);
- executor = Executors.newFixedThreadPool(2);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ executor = Executors.newFixedThreadPool(2, threadFactory);
}
@AfterEach
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
index 666177d..4dc236e 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.ElasticSearchIndexer;
@@ -50,6 +51,7 @@ import org.apache.james.mailbox.tika.TikaHttpClientImpl;
import org.apache.james.mailbox.tika.TikaTextExtractor;
import org.apache.james.metrics.api.NoopMetricFactory;
import org.apache.james.mime4j.dom.Message;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.elasticsearch.client.Client;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -99,11 +101,11 @@ public class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest
storeMailboxManager = new InMemoryIntegrationResources()
.createMailboxManager(new SimpleGroupMembershipResolver());
-
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
ElasticSearchListeningMessageSearchIndex elasticSearchListeningMessageSearchIndex = new ElasticSearchListeningMessageSearchIndex(
storeMailboxManager.getMapperFactory(),
new ElasticSearchIndexer(client,
- Executors.newSingleThreadExecutor(),
+ Executors.newSingleThreadExecutor(threadFactory),
MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
MailboxElasticSearchConstants.MESSAGE_TYPE,
BATCH_SIZE),
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
index 2d511cf..872cf33 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
@@ -22,6 +22,7 @@ package org.apache.james.quota.search.elasticsearch;
import static org.mockito.Mockito.mock;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.ElasticSearchIndexer;
@@ -36,6 +37,7 @@ import org.apache.james.quota.search.QuotaSearchTestSystem;
import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener;
import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.elasticsearch.client.Client;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
@@ -69,8 +71,9 @@ public class ElasticSearchQuotaSearchTestSystemExtension implements ParameterRes
MemoryDomainList domainList = new MemoryDomainList(dnsService);
usersRepository.setDomainList(domainList);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
ElasticSearchQuotaMailboxListener listener = new ElasticSearchQuotaMailboxListener(
- new ElasticSearchIndexer(client, Executors.newSingleThreadExecutor(),
+ new ElasticSearchIndexer(client, Executors.newSingleThreadExecutor(threadFactory),
QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_WRITE_ALIAS,
QuotaRatioElasticSearchConstants.QUOTA_RATIO_TYPE),
new QuotaRatioToElasticSearchJson());
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
index 4ea8e30..dabec13 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
@@ -26,6 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.ElasticSearchIndexer;
@@ -39,6 +40,7 @@ import org.apache.james.mailbox.quota.QuotaFixture.Sizes;
import org.apache.james.quota.search.elasticsearch.QuotaRatioElasticSearchConstants;
import org.apache.james.quota.search.elasticsearch.QuotaSearchIndexCreationUtil;
import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.junit.Before;
@@ -66,9 +68,10 @@ public class ElasticSearchQuotaMailboxListenerTest {
client = QuotaSearchIndexCreationUtil.prepareDefaultClient(
new TestingClientProvider(embeddedElasticSearch.getNode()).get(), ElasticSearchConfiguration.DEFAULT_CONFIGURATION);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
quotaMailboxListener = new ElasticSearchQuotaMailboxListener(
new ElasticSearchIndexer(client,
- Executors.newSingleThreadExecutor(),
+ Executors.newSingleThreadExecutor(threadFactory),
QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_WRITE_ALIAS,
QuotaRatioElasticSearchConstants.QUOTA_RATIO_TYPE,
BATCH_SIZE),
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java
index 9ca9824..593de54 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/AsynchronousEventDelivery.java
@@ -21,11 +21,13 @@ package org.apache.james.mailbox.store.event;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import javax.annotation.PreDestroy;
import org.apache.james.mailbox.Event;
import org.apache.james.mailbox.MailboxListener;
+import org.apache.james.util.concurrent.NamedThreadFactory;
public class AsynchronousEventDelivery implements EventDelivery {
@@ -33,7 +35,8 @@ public class AsynchronousEventDelivery implements EventDelivery {
private final SynchronousEventDelivery synchronousEventDelivery;
public AsynchronousEventDelivery(int threadPoolSize, SynchronousEventDelivery synchronousEventDelivery) {
- this.threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ this.threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize, threadFactory);
this.synchronousEventDelivery = synchronousEventDelivery;
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
----------------------------------------------------------------------
diff --git a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
index 2404f25..4186ee9 100644
--- a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
+++ b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
@@ -23,6 +23,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZoneId;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.NotImplementedException;
@@ -61,6 +62,7 @@ import org.apache.james.metrics.logger.DefaultMetricFactory;
import org.apache.james.mpt.api.ImapFeatures;
import org.apache.james.mpt.api.ImapFeatures.Feature;
import org.apache.james.mpt.host.JamesImapHostSystem;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.elasticsearch.client.Client;
public class ElasticSearchHostSystem extends JamesImapHostSystem {
@@ -96,10 +98,11 @@ public class ElasticSearchHostSystem extends JamesImapHostSystem {
InMemoryMailboxSessionMapperFactory factory = new InMemoryMailboxSessionMapperFactory();
InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
ElasticSearchListeningMessageSearchIndex searchIndex = new ElasticSearchListeningMessageSearchIndex(
factory,
new ElasticSearchIndexer(client,
- Executors.newSingleThreadExecutor(),
+ Executors.newSingleThreadExecutor(threadFactory),
MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
MailboxElasticSearchConstants.MESSAGE_TYPE),
new ElasticSearchSearcher(client,
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
----------------------------------------------------------------------
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
index 0018458..7700d28 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,6 +50,7 @@ import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.util.MDCBuilder;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,7 +80,8 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
this.heartbeatIntervalUnit = imapConfiguration.getIdleTimeIntervalUnit();
this.enableIdle = imapConfiguration.isEnableIdle();
if (enableIdle) {
- this.heartbeatExecutor = Executors.newScheduledThreadPool(DEFAULT_SCHEDULED_POOL_CORE_SIZE);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ this.heartbeatExecutor = Executors.newScheduledThreadPool(DEFAULT_SCHEDULED_POOL_CORE_SIZE, threadFactory);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
----------------------------------------------------------------------
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index e923b2a..ef8d5b9 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.mail.Flags;
@@ -51,6 +52,7 @@ import org.apache.james.mailbox.store.SimpleMessageMetaData;
import org.apache.james.mailbox.store.event.EventFactory;
import org.apache.james.mailbox.store.mail.model.DefaultMessageId;
import org.apache.james.mailbox.store.mail.model.Mailbox;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -79,7 +81,8 @@ public class SelectedMailboxImplTest {
@Before
public void setUp() throws Exception {
- executorService = Executors.newFixedThreadPool(1);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ executorService = Executors.newFixedThreadPool(1, threadFactory);
mailboxPath = MailboxPath.forUser("tellier@linagora.com", MailboxConstants.INBOX);
mailboxManager = mock(MailboxManager.class);
messageManager = mock(MessageManager.class);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
----------------------------------------------------------------------
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
index faddde6..ef8554d 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
@@ -23,8 +23,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.apache.james.protocols.api.ProtocolServer;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -203,7 +205,8 @@ public abstract class AbstractAsyncServer implements ProtocolServer {
* @return bossExecutor
*/
protected Executor createBossExecutor() {
- return Executors.newCachedThreadPool();
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ return Executors.newCachedThreadPool(threadFactory);
}
/**
@@ -212,7 +215,8 @@ public abstract class AbstractAsyncServer implements ProtocolServer {
* @return workerExecutor
*/
protected Executor createWorkerExecutor() {
- return Executors.newCachedThreadPool();
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ return Executors.newCachedThreadPool(threadFactory);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
index 48fe3f1..56ec923 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ScheduledExecutorServiceProvider.java
@@ -21,9 +21,12 @@ package org.apache.james.modules.mailbox;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import javax.annotation.PreDestroy;
+import org.apache.james.util.concurrent.NamedThreadFactory;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Provider;
@@ -34,7 +37,8 @@ class ScheduledExecutorServiceProvider implements Provider<ScheduledExecutorServ
@VisibleForTesting
ScheduledExecutorServiceProvider() {
- scheduler = Executors.newSingleThreadScheduledExecutor();
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java
index 5b5b33f..69954af 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesServerWithRetryConnectionTest.java
@@ -29,6 +29,7 @@ import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.james.backends.es.ElasticSearchConfiguration;
@@ -37,6 +38,7 @@ import org.apache.james.mailbox.store.search.PDFTextExtractor;
import org.apache.james.modules.TestJMAPServerModule;
import org.apache.james.modules.protocols.ImapGuiceProbe;
import org.apache.james.util.Host;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.james.util.docker.Images;
import org.apache.james.util.docker.SwarmGenericContainer;
import org.junit.jupiter.api.AfterEach;
@@ -104,7 +106,8 @@ class JamesServerWithRetryConnectionTest {
@BeforeEach
void setUp() throws IOException {
- executorService = Executors.newFixedThreadPool(1);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ executorService = Executors.newFixedThreadPool(1, threadFactory);
socketChannel = SocketChannel.open();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java
index b8e42fe..fc30b12 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java
@@ -20,9 +20,12 @@ package org.apache.james.modules.server;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import javax.annotation.PreDestroy;
+import org.apache.james.util.concurrent.NamedThreadFactory;
+
import com.google.inject.AbstractModule;
import com.google.inject.Provider;
import com.google.inject.name.Names;
@@ -33,9 +36,10 @@ public class AsyncTasksExecutorModule extends AbstractModule {
@Override
protected void configure() {
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
bind(ExecutorService.class).annotatedWith(Names.named("AsyncExecutor"))
.toProvider(new LifecycleAwareExecutorServiceProvider(
- Executors.newFixedThreadPool(THREAD_POOL_SIZE)));
+ Executors.newFixedThreadPool(THREAD_POOL_SIZE, threadFactory)));
}
public static class LifecycleAwareExecutorServiceProvider implements Provider<ExecutorService> {
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
index 8411d40..eec20c0 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
@@ -28,8 +28,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,7 +143,8 @@ public class ConcurrentTestRunner {
this.operationCount = operationCount;
this.countDownLatch = new CountDownLatch(threadCount);
this.biConsumer = biConsumer;
- this.executorService = Executors.newFixedThreadPool(threadCount);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ this.executorService = Executors.newFixedThreadPool(threadCount, threadFactory);
this.futures = new ArrayList<>();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java b/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
index bf136ca..cdfce5e 100644
--- a/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/CompletableFutureUtilTest.java
@@ -27,11 +27,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -44,7 +46,8 @@ public class CompletableFutureUtilTest {
@Before
public void setUp() {
- executorService = Executors.newFixedThreadPool(4);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ executorService = Executors.newFixedThreadPool(4, threadFactory);
}
@After
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java b/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
index 81c2f11..28307fb 100644
--- a/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/retry/RetryExecutorUtilTest.java
@@ -27,7 +27,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -49,7 +51,8 @@ public class RetryExecutorUtilTest {
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
}
@After
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
----------------------------------------------------------------------
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
index 48bcf65..4e5d68f 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
@@ -42,6 +43,7 @@ import org.apache.james.transport.mailets.remote.delivery.Bouncer;
import org.apache.james.transport.mailets.remote.delivery.DeliveryRunnable;
import org.apache.james.transport.mailets.remote.delivery.RemoteDeliveryConfiguration;
import org.apache.james.transport.mailets.remote.delivery.RemoteDeliverySocketFactory;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
@@ -171,7 +173,8 @@ public class RemoteDelivery extends GenericMailet {
}
private void initDeliveryThreads() {
- executor = Executors.newFixedThreadPool(configuration.getWorkersThreadCount());
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ executor = Executors.newFixedThreadPool(configuration.getWorkersThreadCount(), threadFactory);
for (int a = 0; a < configuration.getWorkersThreadCount(); a++) {
executor.execute(
new DeliveryRunnable(queue,
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
----------------------------------------------------------------------
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
index 2eddc4c..2e9c409 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
@@ -22,6 +22,7 @@ package org.apache.james.webadmin.routes;
import static org.mockito.Mockito.mock;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.ElasticSearchIndexer;
@@ -39,6 +40,7 @@ import org.apache.james.quota.search.elasticsearch.QuotaSearchIndexCreationUtil;
import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener;
import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.elasticsearch.client.Client;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
@@ -72,8 +74,9 @@ public class ElasticSearchQuotaSearchExtension implements ParameterResolver, Bef
MemoryDomainList domainList = new MemoryDomainList(dnsService);
usersRepository.setDomainList(domainList);
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
ElasticSearchQuotaMailboxListener listener = new ElasticSearchQuotaMailboxListener(
- new ElasticSearchIndexer(client, Executors.newSingleThreadExecutor(),
+ new ElasticSearchIndexer(client, Executors.newSingleThreadExecutor(threadFactory),
QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_WRITE_ALIAS,
QuotaRatioElasticSearchConstants.QUOTA_RATIO_TYPE),
new QuotaRatioToElasticSearchJson());
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
index cc82148..a4d851a 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
@@ -53,6 +53,7 @@ import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.server.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.server.core.MimeMessageSource;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +71,7 @@ public class FileMailQueue implements ManageableMailQueue {
private final Map<String, FileItem> keyMappings = Collections.synchronizedMap(new LinkedHashMap<>());
private final BlockingQueue<String> inmemoryQueue = new LinkedBlockingQueue<>();
- private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.withClassName(getClass()));
private static final AtomicLong COUNTER = new AtomicLong();
private final String queueDirName;
private final File queueDir;
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 4354a50..6caef63 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -24,6 +24,7 @@ import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -32,6 +33,7 @@ import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.mailet.Mail;
import com.github.fge.lambdas.Throwing;
@@ -125,7 +127,8 @@ class Dequeuer {
}
private CompletableFuture<GetResponse> pollChannel() {
- return new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ return new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory))
.withFixedRate()
.withMinDelay(TEN_MS)
.retryOn(NoMailYetException.class)
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
----------------------------------------------------------------------
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index e0cbcc2..e26e173 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
import org.apache.james.util.MDCBuilder;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +51,8 @@ public class MemoryTaskManager implements TaskManager {
public MemoryTaskManager() {
idToExecutionDetails = new ConcurrentHashMap<>();
idToFuture = new ConcurrentHashMap<>();
- executor = Executors.newSingleThreadExecutor();
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ executor = Executors.newSingleThreadExecutor(threadFactory);
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java
----------------------------------------------------------------------
diff --git a/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java b/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java
index c9b9545..53b87f3 100644
--- a/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java
+++ b/server/testing/src/main/java/org/apache/james/junit/ExecutorExtension.java
@@ -21,7 +21,9 @@ package org.apache.james.junit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -35,7 +37,8 @@ public class ExecutorExtension implements ParameterResolver, BeforeEachCallback,
@Override
public void beforeEach(ExtensionContext context) throws Exception {
- executorService = Executors.newWorkStealingPool();
+ ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
+ executorService = Executors.newCachedThreadPool(threadFactory);
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/e96b5cbb/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java
----------------------------------------------------------------------
diff --git a/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java b/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java
index 9712a42..7364d86 100644
--- a/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java
+++ b/third-party/spamassassin/src/test/java/org/apache/james/spamassassin/mock/MockSpamdTestRule.java
@@ -22,11 +22,12 @@ package org.apache.james.spamassassin.mock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.junit.rules.ExternalResource;
public class MockSpamdTestRule extends ExternalResource {
- private ExecutorService executor = Executors.newSingleThreadExecutor();
+ private ExecutorService executor = Executors.newSingleThreadExecutor(NamedThreadFactory.withClassName(getClass()));
private MockSpamd spamd = new MockSpamd();
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[3/3] james-project git commit: JAMES-2550 Mailqueue with reactor
Posted by bt...@apache.org.
JAMES-2550 Mailqueue with reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f121dd8d
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f121dd8d
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f121dd8d
Branch: refs/heads/master
Commit: f121dd8d413321db9a85a5cce42b572da392c2b5
Parents: e96b5cb
Author: Benoit Tellier <bt...@linagora.com>
Authored: Tue Dec 4 16:10:33 2018 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Dec 6 15:04:29 2018 +0700
----------------------------------------------------------------------
pom.xml | 7 +++
server/queue/queue-rabbitmq/pom.xml | 9 +++
.../apache/james/queue/rabbitmq/Dequeuer.java | 2 +-
.../james/queue/rabbitmq/RabbitMQMailQueue.java | 4 +-
.../queue/rabbitmq/view/api/MailQueueView.java | 2 +-
.../rabbitmq/view/cassandra/BrowseStartDAO.java | 26 ++++----
.../cassandra/CassandraMailQueueBrowser.java | 63 +++++++++-----------
.../cassandra/CassandraMailQueueMailDelete.java | 31 +++++-----
.../cassandra/CassandraMailQueueMailStore.java | 7 ++-
.../view/cassandra/CassandraMailQueueView.java | 42 ++++++-------
.../view/cassandra/DeletedMailsDAO.java | 20 +++----
.../view/cassandra/EnqueuedMailsDAO.java | 25 ++++----
.../view/cassandra/model/BucketedSlices.java | 22 +++----
.../view/cassandra/BrowseStartDAOTest.java | 27 +++++----
.../CassandraMailQueueViewTestFactory.java | 10 +++-
.../view/cassandra/DeletedMailsDAOTest.java | 22 +++----
.../view/cassandra/EnqueuedMailsDaoTest.java | 16 ++---
.../cassandra/model/BucketedSlicesTest.java | 12 ++--
18 files changed, 178 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 300a18b..c4d85bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -666,6 +666,13 @@
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-bom</artifactId>
+ <version>Bismuth-RELEASE</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ <dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-cassandra</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 14564ff..ab4ffff 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -171,6 +171,15 @@
<version>${feign.version}</version>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 6caef63..76e9838 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -103,7 +103,7 @@ class Dequeuer {
if (success) {
dequeueMetric.increment();
rabbitClient.ack(deliveryTag);
- mailQueueView.delete(DeleteCondition.withName(mail.getName())).join();
+ mailQueueView.delete(DeleteCondition.withName(mail.getName()));
} else {
rabbitClient.nack(deliveryTag);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 0909469..1873313 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -93,12 +93,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
@Override
public long clear() {
- return mailQueueView.delete(DeleteCondition.all()).join();
+ return mailQueueView.delete(DeleteCondition.all());
}
@Override
public long remove(Type type, String value) {
- return mailQueueView.delete(DeleteCondition.from(type, value)).join();
+ return mailQueueView.delete(DeleteCondition.from(type, value));
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index a291d61..12ca723 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -36,7 +36,7 @@ public interface MailQueueView {
CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem);
- CompletableFuture<Long> delete(DeleteCondition deleteCondition);
+ long delete(DeleteCondition deleteCondition);
CompletableFuture<Boolean> isPresent(Mail mail);
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
index 4425c46..9552f5c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
@@ -31,8 +31,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV
import java.time.Instant;
import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
@@ -43,6 +41,7 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
public class BrowseStartDAO {
@@ -79,28 +78,29 @@ public class BrowseStartDAO {
.value(QUEUE_NAME, bindMarker(QUEUE_NAME)));
}
- CompletableFuture<Optional<Instant>> findBrowseStart(MailQueueName queueName) {
+ Mono<Instant> findBrowseStart(MailQueueName queueName) {
return selectOne(queueName)
- .thenApply(optional -> optional.map(this::getBrowseStart));
+ .map(this::getBrowseStart);
}
- CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
- return executor.executeVoid(updateOne.bind()
+ Mono<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
+ return Mono.fromCompletionStage(executor.executeVoid(updateOne.bind()
.setTimestamp(BROWSE_START, Date.from(sliceStart))
- .setString(QUEUE_NAME, mailQueueName.asString()));
+ .setString(QUEUE_NAME, mailQueueName.asString())));
}
- CompletableFuture<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
- return executor.executeVoid(insertOne.bind()
+ Mono<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
+ return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind()
.setTimestamp(BROWSE_START, Date.from(sliceStart))
- .setString(QUEUE_NAME, mailQueueName.asString()));
+ .setString(QUEUE_NAME, mailQueueName.asString())));
}
@VisibleForTesting
- CompletableFuture<Optional<Row>> selectOne(MailQueueName queueName) {
- return executor.executeSingleRow(
+ Mono<Row> selectOne(MailQueueName queueName) {
+ return Mono.fromCompletionStage(executor.executeSingleRow(
selectOne.bind()
- .setString(QUEUE_NAME, queueName.asString()));
+ .setString(QUEUE_NAME, queueName.asString())))
+ .flatMap(Mono::justOrEmpty);
}
private Instant getBrowseStart(Row row) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index a5a9bb9..4279c15 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -21,16 +21,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
-import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice.allSlicesTill;
import java.time.Clock;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
+import java.util.List;
import javax.inject.Inject;
import javax.mail.MessagingException;
@@ -44,12 +40,14 @@ import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
-import org.apache.james.util.FluentFutureStream;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class CassandraMailQueueBrowser {
@@ -101,23 +99,24 @@ public class CassandraMailQueueBrowser {
this.clock = clock;
}
- CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> browse(MailQueueName queueName) {
+ Flux<ManageableMailQueue.MailQueueItemView> browse(MailQueueName queueName) {
return browseReferences(queueName)
- .map(this::toMailFuture, FluentFutureStream::unboxFuture)
- .map(ManageableMailQueue.MailQueueItemView::new)
- .completableFuture();
+ .flatMapSequential(this::toMailFuture)
+ .map(ManageableMailQueue.MailQueueItemView::new);
}
- FluentFutureStream<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
- return FluentFutureStream.of(browseStartDao.findBrowseStart(queueName)
- .thenApply(this::allSlicesStartingAt))
- .map(slice -> browseSlice(queueName, slice), FluentFutureStream::unboxFluentFuture);
+ Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
+ return browseStartDao.findBrowseStart(queueName)
+ .flatMapMany(this::allSlicesStartingAt)
+ .flatMapSequential(slice -> browseSlice(queueName, slice))
+ .flatMapSequential(Flux::fromIterable)
+ .subscribeOn(Schedulers.parallel());
}
- private CompletableFuture<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
+ private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
- return mimeMessageStore.read(enqueuedItem.getPartsId())
- .thenApply(mimeMessage -> toMail(enqueuedItem, mimeMessage));
+ return Mono.fromCompletionStage(mimeMessageStore.read(enqueuedItem.getPartsId()))
+ .map(mimeMessage -> toMail(enqueuedItem, mimeMessage));
}
private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
@@ -132,31 +131,25 @@ public class CassandraMailQueueBrowser {
return mail;
}
- private FluentFutureStream<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
- return FluentFutureStream.of(
+ private Mono<List<EnqueuedItemWithSlicingContext>> browseSlice(MailQueueName queueName, Slice slice) {
+ return
allBucketIds()
- .map(bucketId ->
- browseBucket(queueName, slice, bucketId).completableFuture()),
- FluentFutureStream::unboxStream)
- .sorted(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+ .flatMap(bucketId -> browseBucket(queueName, slice, bucketId))
+ .collectSortedList(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
}
- private FluentFutureStream<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
- return FluentFutureStream.of(
- enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId))
- .thenFilter(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
+ private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
+ return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
+ .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
}
- private Stream<Slice> allSlicesStartingAt(Optional<Instant> maybeBrowseStart) {
- return maybeBrowseStart
- .map(Slice::of)
- .map(startSlice -> allSlicesTill(startSlice, clock.instant(), configuration.getSliceWindow()))
- .orElse(Stream.empty());
+ private Flux<Slice> allSlicesStartingAt(Instant browseStart) {
+ return Flux.fromStream(Slice.of(browseStart).allSlicesTill(clock.instant(), configuration.getSliceWindow()));
}
- private Stream<BucketId> allBucketIds() {
- return IntStream
+ private Flux<BucketId> allBucketIds() {
+ return Flux
.range(0, configuration.getBucketCount())
- .mapToObj(BucketId::of);
+ .map(BucketId::of);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index b94b1ed..57732cb 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -20,10 +20,7 @@
package org.apache.james.queue.rabbitmq.view.cassandra;
import java.time.Instant;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Stream;
import javax.inject.Inject;
@@ -32,6 +29,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
import org.apache.mailet.Mail;
+import reactor.core.publisher.Mono;
+
public class CassandraMailQueueMailDelete {
private final DeletedMailsDAO deletedMailsDao;
@@ -53,42 +52,40 @@ public class CassandraMailQueueMailDelete {
this.random = random;
}
- CompletableFuture<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
+ Mono<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
return considerDeleted(MailKey.fromMail(mail), mailQueueName);
}
- CompletableFuture<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
+ Mono<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
return deletedMailsDao
.markAsDeleted(mailQueueName, mailKey)
- .thenRunAsync(() -> maybeUpdateBrowseStart(mailQueueName));
+ .doOnTerminate(() -> maybeUpdateBrowseStart(mailQueueName));
}
- CompletableFuture<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
+ Mono<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
return deletedMailsDao.isDeleted(mailQueueName, MailKey.fromMail(mail));
}
- CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName) {
- return findNewBrowseStart(mailQueueName)
- .thenCompose(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart));
+ void updateBrowseStart(MailQueueName mailQueueName) {
+ Mono<Instant> newBrowseStart = findNewBrowseStart(mailQueueName);
+ updateNewBrowseStart(mailQueueName, newBrowseStart).subscribe(); // a Mono is lazy: without subscribe() the new browse start is never written
}
private void maybeUpdateBrowseStart(MailQueueName mailQueueName) {
if (shouldUpdateBrowseStart()) {
- updateBrowseStart(mailQueueName).join();
+ updateBrowseStart(mailQueueName);
}
}
- private CompletableFuture<Optional<Instant>> findNewBrowseStart(MailQueueName mailQueueName) {
+ private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
return cassandraMailQueueBrowser.browseReferences(mailQueueName)
.map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart())
- .completableFuture()
- .thenApply(Stream::findFirst);
+ .next();
}
- private CompletableFuture<Void> updateNewBrowseStart(MailQueueName mailQueueName, Optional<Instant> maybeNewBrowseStart) {
+ private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Mono<Instant> maybeNewBrowseStart) {
return maybeNewBrowseStart
- .map(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant))
- .orElse(CompletableFuture.completedFuture(null));
+ .flatMap(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant));
}
private boolean shouldUpdateBrowseStart() {
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
index 41c28a9..1dd07f5 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
@@ -21,7 +21,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
import java.time.Clock;
import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
@@ -32,6 +31,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Bucke
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.mailet.Mail;
+import reactor.core.publisher.Mono;
+
public class CassandraMailQueueMailStore {
private final EnqueuedMailsDAO enqueuedMailsDao;
@@ -50,13 +51,13 @@ public class CassandraMailQueueMailStore {
this.clock = clock;
}
- CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
+ Mono<Void> storeMail(EnqueuedItem enqueuedItem) {
EnqueuedItemWithSlicingContext enqueuedItemAndSlicing = addSliceContext(enqueuedItem);
return enqueuedMailsDao.insert(enqueuedItemAndSlicing);
}
- CompletableFuture<Void> initializeBrowseStart(MailQueueName mailQueueName) {
+ Mono<Void> initializeBrowseStart(MailQueueName mailQueueName) {
return browseStartDao
.insertInitialBrowseStart(mailQueueName, currentSliceStartInstant());
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index 4980f46..a0ca3cd 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -20,7 +20,6 @@
package org.apache.james.queue.rabbitmq.view.cassandra;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
import javax.inject.Inject;
@@ -33,9 +32,10 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
-import org.apache.james.util.FluentFutureStream;
import org.apache.mailet.Mail;
+import reactor.core.publisher.Mono;
+
public class CassandraMailQueueView implements MailQueueView {
public static class Factory implements MailQueueView.Factory {
@@ -80,62 +80,54 @@ public class CassandraMailQueueView implements MailQueueView {
@Override
public void initialize(MailQueueName mailQueueName) {
- storeHelper.initializeBrowseStart(mailQueueName)
- .join();
+ storeHelper.initializeBrowseStart(mailQueueName).block();
}
@Override
public CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
- return storeHelper.storeMail(enqueuedItem);
+ return storeHelper.storeMail(enqueuedItem).toFuture();
}
@Override
public ManageableMailQueue.MailQueueIterator browse() {
return new CassandraMailQueueBrowser.CassandraMailQueueIterator(
cassandraMailQueueBrowser.browse(mailQueueName)
- .join()
+ .toIterable()
.iterator());
}
@Override
public long getSize() {
- return cassandraMailQueueBrowser.browseReferences(mailQueueName)
- .join()
- .count();
+ return cassandraMailQueueBrowser.browseReferences(mailQueueName).count().block();
}
@Override
- public CompletableFuture<Long> delete(DeleteCondition deleteCondition) {
+ public long delete(DeleteCondition deleteCondition) {
if (deleteCondition instanceof DeleteCondition.WithName) {
DeleteCondition.WithName nameDeleteCondition = (DeleteCondition.WithName) deleteCondition;
-
- return delete(MailKey.of(nameDeleteCondition.getName())).thenApply(any -> 1L);
+ return delete(MailKey.of(nameDeleteCondition.getName())).map(any -> 1L).block();
}
-
return browseThenDelete(deleteCondition);
}
- private CompletableFuture<Long> browseThenDelete(DeleteCondition deleteCondition) {
- CompletableFuture<Long> result = cassandraMailQueueBrowser.browseReferences(mailQueueName)
+ private long browseThenDelete(DeleteCondition deleteCondition) {
+ return cassandraMailQueueBrowser.browseReferences(mailQueueName)
.map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
.filter(mailReference -> deleteCondition.shouldBeDeleted(mailReference.getMail()))
- .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName),
- FluentFutureStream::unboxFuture)
- .completableFuture()
- .thenApply(Stream::count);
-
- result.thenRunAsync(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName));
-
- return result;
+ .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName).then(Mono.just(mailReference)))
+ .count() // one element per completed deletion; flatMap subscribes to each considerDeleted Mono
+ .doOnTerminate(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
+ .block();
}
- private CompletableFuture<Void> delete(MailKey mailKey) {
+ private Mono<Void> delete(MailKey mailKey) {
return cassandraMailQueueMailDelete.considerDeleted(mailKey, mailQueueName);
}
@Override
public CompletableFuture<Boolean> isPresent(Mail mail) {
return cassandraMailQueueMailDelete.isDeleted(mail, mailQueueName)
- .thenApply(bool -> !bool);
+ .map(bool -> !bool)
+ .toFuture();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
index c5feefc..a1986e1 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
@@ -27,8 +27,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV
import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.QUEUE_NAME;
import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.TABLE_NAME;
-import java.util.concurrent.CompletableFuture;
-
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -37,6 +35,7 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
public class DeletedMailsDAO {
@@ -64,20 +63,21 @@ public class DeletedMailsDAO {
.and(eq(MAIL_KEY, bindMarker(MAIL_KEY))));
}
- CompletableFuture<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) {
- return executor.executeVoid(insertOne.bind()
+ Mono<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) {
+ return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind()
.setString(QUEUE_NAME, mailQueueName.asString())
- .setString(MAIL_KEY, mailKey.getMailKey()));
+ .setString(MAIL_KEY, mailKey.getMailKey())));
}
- CompletableFuture<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) {
- return executor.executeReturnExists(
+ Mono<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) {
+ return Mono.fromCompletionStage(executor.executeReturnExists(
selectOne.bind()
.setString(QUEUE_NAME, mailQueueName.asString())
- .setString(MAIL_KEY, mailKey.getMailKey()));
+ .setString(MAIL_KEY, mailKey.getMailKey())));
}
- CompletableFuture<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) {
- return isDeleted(mailQueueName, mailKey).thenApply(b -> !b);
+ Mono<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) {
+ return isDeleted(mailQueueName, mailKey)
+ .map(b -> !b);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
index fef9ba1..eb1e9cf 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
@@ -47,8 +47,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlice
import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
import java.util.Date;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
import javax.inject.Inject;
@@ -64,6 +62,10 @@ import org.apache.mailet.Mail;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
public class EnqueuedMailsDAO {
@@ -73,10 +75,12 @@ public class EnqueuedMailsDAO {
private final CassandraUtils cassandraUtils;
private final CassandraTypesProvider cassandraTypesProvider;
private final BlobId.Factory blobFactory;
+ private Scheduler scheduler;
@Inject
EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, CassandraTypesProvider cassandraTypesProvider,
BlobId.Factory blobIdFactory) {
+ this.scheduler = Schedulers.parallel();
this.executor = new CassandraAsyncExecutor(session);
this.cassandraUtils = cassandraUtils;
this.cassandraTypesProvider = cassandraTypesProvider;
@@ -114,13 +118,13 @@ public class EnqueuedMailsDAO {
.value(PER_RECIPIENT_SPECIFIC_HEADERS, bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS)));
}
- CompletableFuture<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) {
+ Mono<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) {
EnqueuedItem enqueuedItem = enqueuedItemWithSlicing.getEnqueuedItem();
EnqueuedItemWithSlicingContext.SlicingContext slicingContext = enqueuedItemWithSlicing.getSlicingContext();
Mail mail = enqueuedItem.getMail();
MimeMessagePartsId mimeMessagePartsId = enqueuedItem.getPartsId();
- return executor.executeVoid(insert.bind()
+ return Mono.fromCompletionStage(executor.executeVoid(insert.bind()
.setString(QUEUE_NAME, enqueuedItem.getMailQueueName().asString())
.setTimestamp(TIME_RANGE_START, Date.from(slicingContext.getTimeRangeStart()))
.setInt(BUCKET_ID, slicingContext.getBucketId().getValue())
@@ -136,19 +140,20 @@ public class EnqueuedMailsDAO {
.setString(REMOTE_HOST, mail.getRemoteHost())
.setTimestamp(LAST_UPDATED, mail.getLastUpdated())
.setMap(ATTRIBUTES, toRawAttributeMap(mail))
- .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders())));
+ .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders()))));
}
- CompletableFuture<Stream<EnqueuedItemWithSlicingContext>> selectEnqueuedMails(
+ Flux<EnqueuedItemWithSlicingContext> selectEnqueuedMails(
MailQueueName queueName, Slice slice, BucketId bucketId) {
- return executor.execute(
+ return Mono.fromCompletionStage(executor.execute(
selectFrom.bind()
.setString(QUEUE_NAME, queueName.asString())
.setTimestamp(TIME_RANGE_START, Date.from(slice.getStartSliceInstant()))
- .setInt(BUCKET_ID, bucketId.getValue()))
- .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
- .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory)));
+ .setInt(BUCKET_ID, bucketId.getValue())))
+ .map(cassandraUtils::convertToStream)
+ .flatMapMany(Flux::fromStream)
+ .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory));
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
index d35b6d7..2f60736 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
@@ -69,17 +69,6 @@ public class BucketedSlices {
return new Slice(sliceStartInstant);
}
- public static Stream<Slice> allSlicesTill(Slice firstSlice, Instant endAt, Duration windowSize) {
- long sliceCount = calculateSliceCount(firstSlice, endAt, windowSize);
- long startAtSeconds = firstSlice.getStartSliceInstant().getEpochSecond();
- long sliceWindowSizeInSecond = windowSize.getSeconds();
-
- return LongStream.range(0, sliceCount)
- .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition)
- .mapToObj(Instant::ofEpochSecond)
- .map(Slice::of);
- }
-
private static long calculateSliceCount(Slice firstSlice, Instant endAt, Duration windowSize) {
long startAtSeconds = firstSlice.getStartSliceInstant().getEpochSecond();
long endAtSeconds = endAt.getEpochSecond();
@@ -104,6 +93,17 @@ public class BucketedSlices {
return startSliceInstant;
}
+ public Stream<Slice> allSlicesTill(Instant endAt, Duration windowSize) {
+ long sliceCount = calculateSliceCount(this, endAt, windowSize);
+ long startAtSeconds = this.getStartSliceInstant().getEpochSecond();
+ long sliceWindowSizeInSecond = windowSize.getSeconds();
+
+ return LongStream.range(0, sliceCount)
+ .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition)
+ .mapToObj(Instant::ofEpochSecond)
+ .map(Slice::of);
+ }
+
@Override
public final boolean equals(Object o) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
index ef844c6..c427d44 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Instant;
-import java.util.Optional;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -31,6 +30,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.publisher.Mono;
+
class BrowseStartDAOTest {
private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1");
@@ -50,37 +51,37 @@ class BrowseStartDAOTest {
@Test
void findBrowseStartShouldReturnEmptyWhenTableDoesntContainQueueName() {
- testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+ testee.updateBrowseStart(OUT_GOING_1, NOW).block();
- Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
- assertThat(firstEnqueuedItemFromQueue2)
+ Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2);
+ assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block())
.isEmpty();
}
@Test
void findBrowseStartShouldReturnInstantWhenTableContainsQueueName() {
- testee.updateBrowseStart(OUT_GOING_1, NOW).join();
- testee.updateBrowseStart(OUT_GOING_2, NOW).join();
+ testee.updateBrowseStart(OUT_GOING_1, NOW).block();
+ testee.updateBrowseStart(OUT_GOING_2, NOW).block();
- Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
- assertThat(firstEnqueuedItemFromQueue2)
+ Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2);
+ assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block())
.isNotEmpty();
}
@Test
void updateFirstEnqueuedTimeShouldWork() {
- testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+ testee.updateBrowseStart(OUT_GOING_1, NOW).block();
- assertThat(testee.selectOne(OUT_GOING_1).join())
+ assertThat(testee.selectOne(OUT_GOING_1).flux().collectList().block())
.isNotEmpty();
}
@Test
void insertInitialBrowseStartShouldInsertFirstInstant() {
- testee.insertInitialBrowseStart(OUT_GOING_1, NOW).join();
- testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).join();
+ testee.insertInitialBrowseStart(OUT_GOING_1, NOW).block();
+ testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).block();
- assertThat(testee.findBrowseStart(OUT_GOING_1).join())
+ assertThat(testee.findBrowseStart(OUT_GOING_1).flux().collectList().block())
.contains(NOW);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
index 7eadf9c..d9a451a 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -25,11 +25,11 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
-import org.apache.james.blob.api.HashBlobId;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
@@ -38,6 +38,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.Eventsourcin
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
+
public class CassandraMailQueueViewTestFactory {
public static CassandraMailQueueView.Factory factory(Clock clock, ThreadLocalRandom random, Session session,
@@ -69,7 +71,9 @@ public class CassandraMailQueueViewTestFactory {
public static boolean isInitialized(Session session, MailQueueName mailQueueName) {
BrowseStartDAO browseStartDao = new BrowseStartDAO(session);
return browseStartDao.findBrowseStart(mailQueueName)
- .thenApply(Optional::isPresent)
- .join();
+ .map(Optional::ofNullable)
+ .switchIfEmpty(Mono.just(Optional.empty()))
+ .block()
+ .isPresent();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
index d9dc69a..e21804f 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
@@ -50,58 +50,58 @@ class DeletedMailsDAOTest {
void markAsDeletedShouldWork() {
Boolean isDeletedBeforeMark = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeletedBeforeMark).isFalse();
- testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+ testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block();
Boolean isDeletedAfterMark = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeletedAfterMark).isTrue();
}
@Test
void checkDeletedShouldReturnFalseWhenTableDoesntContainBothMailQueueAndMailKey() {
- testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).join();
+ testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).block();
Boolean isDeleted = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeleted).isFalse();
}
@Test
void checkDeletedShouldReturnFalseWhenTableContainsMailQueueButNotMailKey() {
- testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).join();
+ testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).block();
Boolean isDeleted = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeleted).isFalse();
}
@Test
void checkDeletedShouldReturnFalseWhenTableContainsMailKeyButNotMailQueue() {
- testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).join();
+ testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).block();
Boolean isDeleted = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeleted).isFalse();
}
@Test
void checkDeletedShouldReturnTrueWhenTableContainsMailItem() {
- testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+ testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block();
Boolean isDeleted = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeleted).isTrue();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
index 3d44db1..13b9e00 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
@@ -25,7 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.SoftAssertions.assertSoftly;
import java.time.Instant;
-import java.util.stream.Stream;
+import java.util.List;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -86,11 +86,11 @@ class EnqueuedMailsDaoTest {
.build())
.slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
.build())
- .join();
+ .block();
- Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee
+ List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee
.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
- .join();
+ .collectList().block();
assertThat(selectedEnqueuedMails).hasSize(1);
}
@@ -108,7 +108,7 @@ class EnqueuedMailsDaoTest {
.build())
.slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
.build())
- .join();
+ .block();
testee.insert(EnqueuedItemWithSlicingContext.builder()
.enqueuedItem(EnqueuedItem.builder()
@@ -121,10 +121,10 @@ class EnqueuedMailsDaoTest {
.build())
.slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE + 1), NOW))
.build())
- .join();
+ .block();
- Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
- .join();
+ List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
+ .collectList().block();
assertThat(selectedEnqueuedMails)
.hasSize(1)
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
index 842216c..1fb6916 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
@@ -64,13 +64,13 @@ class BucketedSlicesTest {
@Test
void allSlicesTillShouldReturnOnlyFirstSliceWhenEndAtInTheSameInterval() {
- assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW))
+ assertThat(FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW))
.containsOnly(FIRST_SLICE);
}
@Test
void allSlicesTillShouldReturnAllSlicesBetweenStartAndEndAt() {
- Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlices = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
assertThat(allSlices)
.containsExactly(
@@ -81,9 +81,9 @@ class BucketedSlicesTest {
@Test
void allSlicesTillShouldReturnSameSlicesWhenEndAtsAreInTheSameInterval() {
- Stream<Slice> allSlicesEndAtTheStartOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW);
- Stream<Slice> allSlicesEndAtTheMiddleOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW);
- Stream<Slice> allSlicesEndAtTheEndWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlicesEndAtTheStartOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlicesEndAtTheMiddleOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlicesEndAtTheEndWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
Slice [] allSlicesInThreeHours = {
FIRST_SLICE,
@@ -102,7 +102,7 @@ class BucketedSlicesTest {
@Test
void allSlicesTillShouldReturnEmptyIfEndAtBeforeStartSlice() {
- Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE_NEXT_TWO_HOUR, FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlices = FIRST_SLICE_NEXT_TWO_HOUR.allSlicesTill(FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW);
assertThat(allSlices).isEmpty();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[2/3] james-project git commit: JAMES-2619 remove code in NamedThreadFactory to rely on Guava
Posted by bt...@apache.org.
JAMES-2619 remove code in NamedThreadFactory to rely on Guava
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b0ebfb66
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b0ebfb66
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b0ebfb66
Branch: refs/heads/master
Commit: b0ebfb66209524cc2406ea508443a1a720da5a4a
Parents: 16e4d9f
Author: Matthieu Baechler <ma...@apache.org>
Authored: Wed Dec 5 10:49:34 2018 +0100
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Dec 6 15:03:57 2018 +0700
----------------------------------------------------------------------
.../JMXEnabledScheduledThreadPoolExecutor.java | 2 +-
.../JMXEnabledThreadPoolExecutor.java | 4 +--
.../util/concurrent/NamedThreadFactory.java | 33 +++++++++-----------
...ledOrderedMemoryAwareThreadPoolExecutor.java | 4 +--
4 files changed, 19 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/b0ebfb66/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java b/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
index 9c3f499..a51e152 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
@@ -50,7 +50,7 @@ public class JMXEnabledScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
}
public JMXEnabledScheduledThreadPoolExecutor(int corePoolSize, String jmxPath, String name) {
- super(corePoolSize, new NamedThreadFactory(name));
+ super(corePoolSize, NamedThreadFactory.withName(name));
this.jmxPath = jmxPath;
registerMBean();
http://git-wip-us.apache.org/repos/asf/james-project/blob/b0ebfb66/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java b/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
index 2a42464..c80112a 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrent/JMXEnabledThreadPoolExecutor.java
@@ -153,7 +153,7 @@ public class JMXEnabledThreadPoolExecutor extends ThreadPoolExecutor implements
*
*/
public static JMXEnabledThreadPoolExecutor newCachedThreadPool(String jmxPath, String name) {
- return new JMXEnabledThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory(name), jmxPath);
+ return new JMXEnabledThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), NamedThreadFactory.withName(name), jmxPath);
}
@@ -174,6 +174,6 @@ public class JMXEnabledThreadPoolExecutor extends ThreadPoolExecutor implements
}
public static JMXEnabledThreadPoolExecutor newFixedThreadPool(String jmxPath, String name, int nThreads) {
- return newFixedThreadPool(jmxPath, nThreads, new NamedThreadFactory(name));
+ return newFixedThreadPool(jmxPath, nThreads, NamedThreadFactory.withName(name));
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/b0ebfb66/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java b/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java
index 8aeaaf5..f982349 100644
--- a/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java
+++ b/server/container/util/src/main/java/org/apache/james/util/concurrent/NamedThreadFactory.java
@@ -19,35 +19,30 @@
package org.apache.james.util.concurrent;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
-/**
- * ThreadPool which use name and a counter for thread names
- */
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
public class NamedThreadFactory implements ThreadFactory {
public final String name;
- private final AtomicLong count = new AtomicLong();
- private final int priority;
+ private final ThreadFactory threadFactory;
- public NamedThreadFactory(String name, int priority) {
- if (priority > Thread.MAX_PRIORITY || priority < Thread.MIN_PRIORITY) {
- throw new IllegalArgumentException("Priority must be <= " + Thread.MAX_PRIORITY + " and >=" + Thread.MIN_PRIORITY);
- }
- this.name = name;
- this.priority = priority;
+ public static NamedThreadFactory withClassName(Class<?> clazz) {
+ return new NamedThreadFactory(clazz.getName());
}
- public NamedThreadFactory(String name) {
- this(name, Thread.NORM_PRIORITY);
+ public static NamedThreadFactory withName(String name) {
+ return new NamedThreadFactory(name);
+ }
+
+ private NamedThreadFactory(String name) {
+ this.name = name;
+ this.threadFactory = new ThreadFactoryBuilder().setNameFormat(name + "-%d").build();
}
@Override
public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName(name + "-" + count.incrementAndGet());
- t.setPriority(priority);
- return t;
+ return threadFactory.newThread(r);
}
public String getName() {
@@ -56,7 +51,7 @@ public class NamedThreadFactory implements ThreadFactory {
@Override
public String toString() {
- return "NamedTreadFactory: " + getName();
+ return "NamedThreadFactory: " + getName();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/b0ebfb66/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java
index e59a8fb..f0d64df 100644
--- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java
+++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/JMXEnabledOrderedMemoryAwareThreadPoolExecutor.java
@@ -44,13 +44,13 @@ public class JMXEnabledOrderedMemoryAwareThreadPoolExecutor extends OrderedMemor
private String mbeanName;
public JMXEnabledOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, String jmxPath, String name) {
- super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS, new NamedThreadFactory(name));
+ super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS, NamedThreadFactory.withName(name));
this.jmxPath = jmxPath;
registerMBean();
}
public JMXEnabledOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, String jmxPath, String name) {
- super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, new NamedThreadFactory(name));
+ super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, NamedThreadFactory.withName(name));
this.jmxPath = jmxPath;
registerMBean();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org