You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/12/06 08:05:40 UTC
[3/3] james-project git commit: JAMES-2550 Mailqueue with reactor
JAMES-2550 Mailqueue with reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f121dd8d
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f121dd8d
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f121dd8d
Branch: refs/heads/master
Commit: f121dd8d413321db9a85a5cce42b572da392c2b5
Parents: e96b5cb
Author: Benoit Tellier <bt...@linagora.com>
Authored: Tue Dec 4 16:10:33 2018 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Dec 6 15:04:29 2018 +0700
----------------------------------------------------------------------
pom.xml | 7 +++
server/queue/queue-rabbitmq/pom.xml | 9 +++
.../apache/james/queue/rabbitmq/Dequeuer.java | 2 +-
.../james/queue/rabbitmq/RabbitMQMailQueue.java | 4 +-
.../queue/rabbitmq/view/api/MailQueueView.java | 2 +-
.../rabbitmq/view/cassandra/BrowseStartDAO.java | 26 ++++----
.../cassandra/CassandraMailQueueBrowser.java | 63 +++++++++-----------
.../cassandra/CassandraMailQueueMailDelete.java | 31 +++++-----
.../cassandra/CassandraMailQueueMailStore.java | 7 ++-
.../view/cassandra/CassandraMailQueueView.java | 42 ++++++-------
.../view/cassandra/DeletedMailsDAO.java | 20 +++----
.../view/cassandra/EnqueuedMailsDAO.java | 25 ++++----
.../view/cassandra/model/BucketedSlices.java | 22 +++----
.../view/cassandra/BrowseStartDAOTest.java | 27 +++++----
.../CassandraMailQueueViewTestFactory.java | 10 +++-
.../view/cassandra/DeletedMailsDAOTest.java | 22 +++----
.../view/cassandra/EnqueuedMailsDaoTest.java | 16 ++---
.../cassandra/model/BucketedSlicesTest.java | 12 ++--
18 files changed, 178 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 300a18b..c4d85bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -666,6 +666,13 @@
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-bom</artifactId>
+ <version>Bismuth-RELEASE</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ <dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-cassandra</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 14564ff..ab4ffff 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -171,6 +171,15 @@
<version>${feign.version}</version>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 6caef63..76e9838 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -103,7 +103,7 @@ class Dequeuer {
if (success) {
dequeueMetric.increment();
rabbitClient.ack(deliveryTag);
- mailQueueView.delete(DeleteCondition.withName(mail.getName())).join();
+ mailQueueView.delete(DeleteCondition.withName(mail.getName()));
} else {
rabbitClient.nack(deliveryTag);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 0909469..1873313 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -93,12 +93,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
@Override
public long clear() {
- return mailQueueView.delete(DeleteCondition.all()).join();
+ return mailQueueView.delete(DeleteCondition.all());
}
@Override
public long remove(Type type, String value) {
- return mailQueueView.delete(DeleteCondition.from(type, value)).join();
+ return mailQueueView.delete(DeleteCondition.from(type, value));
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index a291d61..12ca723 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -36,7 +36,7 @@ public interface MailQueueView {
CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem);
- CompletableFuture<Long> delete(DeleteCondition deleteCondition);
+ long delete(DeleteCondition deleteCondition);
CompletableFuture<Boolean> isPresent(Mail mail);
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
index 4425c46..9552f5c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
@@ -31,8 +31,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV
import java.time.Instant;
import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
@@ -43,6 +41,7 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
public class BrowseStartDAO {
@@ -79,28 +78,29 @@ public class BrowseStartDAO {
.value(QUEUE_NAME, bindMarker(QUEUE_NAME)));
}
- CompletableFuture<Optional<Instant>> findBrowseStart(MailQueueName queueName) {
+ Mono<Instant> findBrowseStart(MailQueueName queueName) {
return selectOne(queueName)
- .thenApply(optional -> optional.map(this::getBrowseStart));
+ .map(this::getBrowseStart);
}
- CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
- return executor.executeVoid(updateOne.bind()
+ Mono<Void> updateBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
+ return Mono.fromCompletionStage(executor.executeVoid(updateOne.bind()
.setTimestamp(BROWSE_START, Date.from(sliceStart))
- .setString(QUEUE_NAME, mailQueueName.asString()));
+ .setString(QUEUE_NAME, mailQueueName.asString())));
}
- CompletableFuture<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
- return executor.executeVoid(insertOne.bind()
+ Mono<Void> insertInitialBrowseStart(MailQueueName mailQueueName, Instant sliceStart) {
+ return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind()
.setTimestamp(BROWSE_START, Date.from(sliceStart))
- .setString(QUEUE_NAME, mailQueueName.asString()));
+ .setString(QUEUE_NAME, mailQueueName.asString())));
}
@VisibleForTesting
- CompletableFuture<Optional<Row>> selectOne(MailQueueName queueName) {
- return executor.executeSingleRow(
+ Mono<Row> selectOne(MailQueueName queueName) {
+ return Mono.fromCompletionStage(executor.executeSingleRow(
selectOne.bind()
- .setString(QUEUE_NAME, queueName.asString()));
+ .setString(QUEUE_NAME, queueName.asString())))
+ .flatMap(Mono::justOrEmpty);
}
private Instant getBrowseStart(Row row) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index a5a9bb9..4279c15 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -21,16 +21,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId;
import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
-import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice.allSlicesTill;
import java.time.Clock;
import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
+import java.util.List;
import javax.inject.Inject;
import javax.mail.MessagingException;
@@ -44,12 +40,14 @@ import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
-import org.apache.james.util.FluentFutureStream;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class CassandraMailQueueBrowser {
@@ -101,23 +99,24 @@ public class CassandraMailQueueBrowser {
this.clock = clock;
}
- CompletableFuture<Stream<ManageableMailQueue.MailQueueItemView>> browse(MailQueueName queueName) {
+ Flux<ManageableMailQueue.MailQueueItemView> browse(MailQueueName queueName) {
return browseReferences(queueName)
- .map(this::toMailFuture, FluentFutureStream::unboxFuture)
- .map(ManageableMailQueue.MailQueueItemView::new)
- .completableFuture();
+ .flatMapSequential(this::toMailFuture)
+ .map(ManageableMailQueue.MailQueueItemView::new);
}
- FluentFutureStream<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
- return FluentFutureStream.of(browseStartDao.findBrowseStart(queueName)
- .thenApply(this::allSlicesStartingAt))
- .map(slice -> browseSlice(queueName, slice), FluentFutureStream::unboxFluentFuture);
+ Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
+ return browseStartDao.findBrowseStart(queueName)
+ .flatMapMany(this::allSlicesStartingAt)
+ .flatMapSequential(slice -> browseSlice(queueName, slice))
+ .flatMapSequential(Flux::fromIterable)
+ .subscribeOn(Schedulers.parallel());
}
- private CompletableFuture<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
+ private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
- return mimeMessageStore.read(enqueuedItem.getPartsId())
- .thenApply(mimeMessage -> toMail(enqueuedItem, mimeMessage));
+ return Mono.fromCompletionStage(mimeMessageStore.read(enqueuedItem.getPartsId()))
+ .map(mimeMessage -> toMail(enqueuedItem, mimeMessage));
}
private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
@@ -132,31 +131,25 @@ public class CassandraMailQueueBrowser {
return mail;
}
- private FluentFutureStream<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
- return FluentFutureStream.of(
+ private Mono<List<EnqueuedItemWithSlicingContext>> browseSlice(MailQueueName queueName, Slice slice) {
+ return
allBucketIds()
- .map(bucketId ->
- browseBucket(queueName, slice, bucketId).completableFuture()),
- FluentFutureStream::unboxStream)
- .sorted(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+ .flatMap(bucketId -> browseBucket(queueName, slice, bucketId))
+ .collectSortedList(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
}
- private FluentFutureStream<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
- return FluentFutureStream.of(
- enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId))
- .thenFilter(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
+ private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
+ return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
+ .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey()));
}
- private Stream<Slice> allSlicesStartingAt(Optional<Instant> maybeBrowseStart) {
- return maybeBrowseStart
- .map(Slice::of)
- .map(startSlice -> allSlicesTill(startSlice, clock.instant(), configuration.getSliceWindow()))
- .orElse(Stream.empty());
+ private Flux<Slice> allSlicesStartingAt(Instant browseStart) {
+ return Flux.fromStream(Slice.of(browseStart).allSlicesTill(clock.instant(), configuration.getSliceWindow()));
}
- private Stream<BucketId> allBucketIds() {
- return IntStream
+ private Flux<BucketId> allBucketIds() {
+ return Flux
.range(0, configuration.getBucketCount())
- .mapToObj(BucketId::of);
+ .map(BucketId::of);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index b94b1ed..57732cb 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -20,10 +20,7 @@
package org.apache.james.queue.rabbitmq.view.cassandra;
import java.time.Instant;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Stream;
import javax.inject.Inject;
@@ -32,6 +29,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
import org.apache.mailet.Mail;
+import reactor.core.publisher.Mono;
+
public class CassandraMailQueueMailDelete {
private final DeletedMailsDAO deletedMailsDao;
@@ -53,42 +52,40 @@ public class CassandraMailQueueMailDelete {
this.random = random;
}
- CompletableFuture<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
+ Mono<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) {
return considerDeleted(MailKey.fromMail(mail), mailQueueName);
}
- CompletableFuture<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
+ Mono<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) {
return deletedMailsDao
.markAsDeleted(mailQueueName, mailKey)
- .thenRunAsync(() -> maybeUpdateBrowseStart(mailQueueName));
+ .doOnTerminate(() -> maybeUpdateBrowseStart(mailQueueName));
}
- CompletableFuture<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
+ Mono<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) {
return deletedMailsDao.isDeleted(mailQueueName, MailKey.fromMail(mail));
}
- CompletableFuture<Void> updateBrowseStart(MailQueueName mailQueueName) {
- return findNewBrowseStart(mailQueueName)
- .thenCompose(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart));
+ void updateBrowseStart(MailQueueName mailQueueName) {
+ Mono<Instant> newBrowseStart = findNewBrowseStart(mailQueueName);
+ updateNewBrowseStart(mailQueueName, newBrowseStart);
}
private void maybeUpdateBrowseStart(MailQueueName mailQueueName) {
if (shouldUpdateBrowseStart()) {
- updateBrowseStart(mailQueueName).join();
+ updateBrowseStart(mailQueueName);
}
}
- private CompletableFuture<Optional<Instant>> findNewBrowseStart(MailQueueName mailQueueName) {
+ private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
return cassandraMailQueueBrowser.browseReferences(mailQueueName)
.map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart())
- .completableFuture()
- .thenApply(Stream::findFirst);
+ .next();
}
- private CompletableFuture<Void> updateNewBrowseStart(MailQueueName mailQueueName, Optional<Instant> maybeNewBrowseStart) {
+ private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Mono<Instant> maybeNewBrowseStart) {
return maybeNewBrowseStart
- .map(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant))
- .orElse(CompletableFuture.completedFuture(null));
+ .flatMap(newBrowseStartInstant -> browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant));
}
private boolean shouldUpdateBrowseStart() {
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
index 41c28a9..1dd07f5 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
@@ -21,7 +21,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
import java.time.Clock;
import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
@@ -32,6 +31,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Bucke
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.mailet.Mail;
+import reactor.core.publisher.Mono;
+
public class CassandraMailQueueMailStore {
private final EnqueuedMailsDAO enqueuedMailsDao;
@@ -50,13 +51,13 @@ public class CassandraMailQueueMailStore {
this.clock = clock;
}
- CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
+ Mono<Void> storeMail(EnqueuedItem enqueuedItem) {
EnqueuedItemWithSlicingContext enqueuedItemAndSlicing = addSliceContext(enqueuedItem);
return enqueuedMailsDao.insert(enqueuedItemAndSlicing);
}
- CompletableFuture<Void> initializeBrowseStart(MailQueueName mailQueueName) {
+ Mono<Void> initializeBrowseStart(MailQueueName mailQueueName) {
return browseStartDao
.insertInitialBrowseStart(mailQueueName, currentSliceStartInstant());
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index 4980f46..a0ca3cd 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -20,7 +20,6 @@
package org.apache.james.queue.rabbitmq.view.cassandra;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
import javax.inject.Inject;
@@ -33,9 +32,10 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
-import org.apache.james.util.FluentFutureStream;
import org.apache.mailet.Mail;
+import reactor.core.publisher.Mono;
+
public class CassandraMailQueueView implements MailQueueView {
public static class Factory implements MailQueueView.Factory {
@@ -80,62 +80,54 @@ public class CassandraMailQueueView implements MailQueueView {
@Override
public void initialize(MailQueueName mailQueueName) {
- storeHelper.initializeBrowseStart(mailQueueName)
- .join();
+ storeHelper.initializeBrowseStart(mailQueueName).block();
}
@Override
public CompletableFuture<Void> storeMail(EnqueuedItem enqueuedItem) {
- return storeHelper.storeMail(enqueuedItem);
+ return storeHelper.storeMail(enqueuedItem).toFuture();
}
@Override
public ManageableMailQueue.MailQueueIterator browse() {
return new CassandraMailQueueBrowser.CassandraMailQueueIterator(
cassandraMailQueueBrowser.browse(mailQueueName)
- .join()
+ .toIterable()
.iterator());
}
@Override
public long getSize() {
- return cassandraMailQueueBrowser.browseReferences(mailQueueName)
- .join()
- .count();
+ return cassandraMailQueueBrowser.browseReferences(mailQueueName).count().block();
}
@Override
- public CompletableFuture<Long> delete(DeleteCondition deleteCondition) {
+ public long delete(DeleteCondition deleteCondition) {
if (deleteCondition instanceof DeleteCondition.WithName) {
DeleteCondition.WithName nameDeleteCondition = (DeleteCondition.WithName) deleteCondition;
-
- return delete(MailKey.of(nameDeleteCondition.getName())).thenApply(any -> 1L);
+ return delete(MailKey.of(nameDeleteCondition.getName())).map(any -> 1L).block();
}
-
return browseThenDelete(deleteCondition);
}
- private CompletableFuture<Long> browseThenDelete(DeleteCondition deleteCondition) {
- CompletableFuture<Long> result = cassandraMailQueueBrowser.browseReferences(mailQueueName)
+ private long browseThenDelete(DeleteCondition deleteCondition) {
+ return cassandraMailQueueBrowser.browseReferences(mailQueueName)
.map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
.filter(mailReference -> deleteCondition.shouldBeDeleted(mailReference.getMail()))
- .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName),
- FluentFutureStream::unboxFuture)
- .completableFuture()
- .thenApply(Stream::count);
-
- result.thenRunAsync(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName));
-
- return result;
+ .map(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName))
+ .count()
+ .doOnTerminate(() -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
+ .block();
}
- private CompletableFuture<Void> delete(MailKey mailKey) {
+ private Mono<Void> delete(MailKey mailKey) {
return cassandraMailQueueMailDelete.considerDeleted(mailKey, mailQueueName);
}
@Override
public CompletableFuture<Boolean> isPresent(Mail mail) {
return cassandraMailQueueMailDelete.isDeleted(mail, mailQueueName)
- .thenApply(bool -> !bool);
+ .map(bool -> !bool)
+ .toFuture();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
index c5feefc..a1986e1 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java
@@ -27,8 +27,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV
import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.QUEUE_NAME;
import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.TABLE_NAME;
-import java.util.concurrent.CompletableFuture;
-
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -37,6 +35,7 @@ import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
public class DeletedMailsDAO {
@@ -64,20 +63,21 @@ public class DeletedMailsDAO {
.and(eq(MAIL_KEY, bindMarker(MAIL_KEY))));
}
- CompletableFuture<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) {
- return executor.executeVoid(insertOne.bind()
+ Mono<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) {
+ return Mono.fromCompletionStage(executor.executeVoid(insertOne.bind()
.setString(QUEUE_NAME, mailQueueName.asString())
- .setString(MAIL_KEY, mailKey.getMailKey()));
+ .setString(MAIL_KEY, mailKey.getMailKey())));
}
- CompletableFuture<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) {
- return executor.executeReturnExists(
+ Mono<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) {
+ return Mono.fromCompletionStage(executor.executeReturnExists(
selectOne.bind()
.setString(QUEUE_NAME, mailQueueName.asString())
- .setString(MAIL_KEY, mailKey.getMailKey()));
+ .setString(MAIL_KEY, mailKey.getMailKey())));
}
- CompletableFuture<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) {
- return isDeleted(mailQueueName, mailKey).thenApply(b -> !b);
+ Mono<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) {
+ return isDeleted(mailQueueName, mailKey)
+ .map(b -> !b);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
index fef9ba1..eb1e9cf 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java
@@ -47,8 +47,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlice
import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
import java.util.Date;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
import javax.inject.Inject;
@@ -64,6 +62,10 @@ import org.apache.mailet.Mail;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
public class EnqueuedMailsDAO {
@@ -73,10 +75,12 @@ public class EnqueuedMailsDAO {
private final CassandraUtils cassandraUtils;
private final CassandraTypesProvider cassandraTypesProvider;
private final BlobId.Factory blobFactory;
+ private Scheduler scheduler;
@Inject
EnqueuedMailsDAO(Session session, CassandraUtils cassandraUtils, CassandraTypesProvider cassandraTypesProvider,
BlobId.Factory blobIdFactory) {
+ this.scheduler = Schedulers.parallel();
this.executor = new CassandraAsyncExecutor(session);
this.cassandraUtils = cassandraUtils;
this.cassandraTypesProvider = cassandraTypesProvider;
@@ -114,13 +118,13 @@ public class EnqueuedMailsDAO {
.value(PER_RECIPIENT_SPECIFIC_HEADERS, bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS)));
}
- CompletableFuture<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) {
+ Mono<Void> insert(EnqueuedItemWithSlicingContext enqueuedItemWithSlicing) {
EnqueuedItem enqueuedItem = enqueuedItemWithSlicing.getEnqueuedItem();
EnqueuedItemWithSlicingContext.SlicingContext slicingContext = enqueuedItemWithSlicing.getSlicingContext();
Mail mail = enqueuedItem.getMail();
MimeMessagePartsId mimeMessagePartsId = enqueuedItem.getPartsId();
- return executor.executeVoid(insert.bind()
+ return Mono.fromCompletionStage(executor.executeVoid(insert.bind()
.setString(QUEUE_NAME, enqueuedItem.getMailQueueName().asString())
.setTimestamp(TIME_RANGE_START, Date.from(slicingContext.getTimeRangeStart()))
.setInt(BUCKET_ID, slicingContext.getBucketId().getValue())
@@ -136,19 +140,20 @@ public class EnqueuedMailsDAO {
.setString(REMOTE_HOST, mail.getRemoteHost())
.setTimestamp(LAST_UPDATED, mail.getLastUpdated())
.setMap(ATTRIBUTES, toRawAttributeMap(mail))
- .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders())));
+ .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders()))));
}
- CompletableFuture<Stream<EnqueuedItemWithSlicingContext>> selectEnqueuedMails(
+ Flux<EnqueuedItemWithSlicingContext> selectEnqueuedMails(
MailQueueName queueName, Slice slice, BucketId bucketId) {
- return executor.execute(
+ return Mono.fromCompletionStage(executor.execute(
selectFrom.bind()
.setString(QUEUE_NAME, queueName.asString())
.setTimestamp(TIME_RANGE_START, Date.from(slice.getStartSliceInstant()))
- .setInt(BUCKET_ID, bucketId.getValue()))
- .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
- .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory)));
+ .setInt(BUCKET_ID, bucketId.getValue())))
+ .map(cassandraUtils::convertToStream)
+ .flatMapMany(Flux::fromStream)
+ .map(row -> EnqueuedMailsDaoUtil.toEnqueuedMail(row, blobFactory));
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
index d35b6d7..2f60736 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
@@ -69,17 +69,6 @@ public class BucketedSlices {
return new Slice(sliceStartInstant);
}
- public static Stream<Slice> allSlicesTill(Slice firstSlice, Instant endAt, Duration windowSize) {
- long sliceCount = calculateSliceCount(firstSlice, endAt, windowSize);
- long startAtSeconds = firstSlice.getStartSliceInstant().getEpochSecond();
- long sliceWindowSizeInSecond = windowSize.getSeconds();
-
- return LongStream.range(0, sliceCount)
- .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition)
- .mapToObj(Instant::ofEpochSecond)
- .map(Slice::of);
- }
-
private static long calculateSliceCount(Slice firstSlice, Instant endAt, Duration windowSize) {
long startAtSeconds = firstSlice.getStartSliceInstant().getEpochSecond();
long endAtSeconds = endAt.getEpochSecond();
@@ -104,6 +93,17 @@ public class BucketedSlices {
return startSliceInstant;
}
+ public Stream<Slice> allSlicesTill(Instant endAt, Duration windowSize) {
+ long sliceCount = calculateSliceCount(this, endAt, windowSize);
+ long startAtSeconds = this.getStartSliceInstant().getEpochSecond();
+ long sliceWindowSizeInSecond = windowSize.getSeconds();
+
+ return LongStream.range(0, sliceCount)
+ .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition)
+ .mapToObj(Instant::ofEpochSecond)
+ .map(Slice::of);
+ }
+
@Override
public final boolean equals(Object o) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
index ef844c6..c427d44 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Instant;
-import java.util.Optional;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -31,6 +30,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.publisher.Mono;
+
class BrowseStartDAOTest {
private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1");
@@ -50,37 +51,37 @@ class BrowseStartDAOTest {
@Test
void findBrowseStartShouldReturnEmptyWhenTableDoesntContainQueueName() {
- testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+ testee.updateBrowseStart(OUT_GOING_1, NOW).block();
- Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
- assertThat(firstEnqueuedItemFromQueue2)
+ Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2);
+ assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block())
.isEmpty();
}
@Test
void findBrowseStartShouldReturnInstantWhenTableContainsQueueName() {
- testee.updateBrowseStart(OUT_GOING_1, NOW).join();
- testee.updateBrowseStart(OUT_GOING_2, NOW).join();
+ testee.updateBrowseStart(OUT_GOING_1, NOW).block();
+ testee.updateBrowseStart(OUT_GOING_2, NOW).block();
- Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join();
- assertThat(firstEnqueuedItemFromQueue2)
+ Mono<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2);
+ assertThat(firstEnqueuedItemFromQueue2.flux().collectList().block())
.isNotEmpty();
}
@Test
void updateFirstEnqueuedTimeShouldWork() {
- testee.updateBrowseStart(OUT_GOING_1, NOW).join();
+ testee.updateBrowseStart(OUT_GOING_1, NOW).block();
- assertThat(testee.selectOne(OUT_GOING_1).join())
+ assertThat(testee.selectOne(OUT_GOING_1).flux().collectList().block())
.isNotEmpty();
}
@Test
void insertInitialBrowseStartShouldInsertFirstInstant() {
- testee.insertInitialBrowseStart(OUT_GOING_1, NOW).join();
- testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).join();
+ testee.insertInitialBrowseStart(OUT_GOING_1, NOW).block();
+ testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).block();
- assertThat(testee.findBrowseStart(OUT_GOING_1).join())
+ assertThat(testee.findBrowseStart(OUT_GOING_1).flux().collectList().block())
.contains(NOW);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
index 7eadf9c..d9a451a 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -25,11 +25,11 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
-import org.apache.james.blob.api.HashBlobId;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
@@ -38,6 +38,8 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.Eventsourcin
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
+
public class CassandraMailQueueViewTestFactory {
public static CassandraMailQueueView.Factory factory(Clock clock, ThreadLocalRandom random, Session session,
@@ -69,7 +71,9 @@ public class CassandraMailQueueViewTestFactory {
public static boolean isInitialized(Session session, MailQueueName mailQueueName) {
BrowseStartDAO browseStartDao = new BrowseStartDAO(session);
return browseStartDao.findBrowseStart(mailQueueName)
- .thenApply(Optional::isPresent)
- .join();
+ .map(Optional::ofNullable)
+ .switchIfEmpty(Mono.just(Optional.empty()))
+ .block()
+ .isPresent();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
index d9dc69a..e21804f 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java
@@ -50,58 +50,58 @@ class DeletedMailsDAOTest {
void markAsDeletedShouldWork() {
Boolean isDeletedBeforeMark = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeletedBeforeMark).isFalse();
- testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+ testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block();
Boolean isDeletedAfterMark = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeletedAfterMark).isTrue();
}
@Test
void checkDeletedShouldReturnFalseWhenTableDoesntContainBothMailQueueAndMailKey() {
- testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).join();
+ testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).block();
Boolean isDeleted = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeleted).isFalse();
}
@Test
void checkDeletedShouldReturnFalseWhenTableContainsMailQueueButNotMailKey() {
- testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).join();
+ testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).block();
Boolean isDeleted = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeleted).isFalse();
}
@Test
void checkDeletedShouldReturnFalseWhenTableContainsMailKeyButNotMailQueue() {
- testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).join();
+ testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).block();
Boolean isDeleted = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeleted).isFalse();
}
@Test
void checkDeletedShouldReturnTrueWhenTableContainsMailItem() {
- testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join();
+ testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block();
Boolean isDeleted = testee
.isDeleted(OUT_GOING_1, MAIL_KEY_1)
- .join();
+ .block();
assertThat(isDeleted).isTrue();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
index 3d44db1..13b9e00 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java
@@ -25,7 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.SoftAssertions.assertSoftly;
import java.time.Instant;
-import java.util.stream.Stream;
+import java.util.List;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -86,11 +86,11 @@ class EnqueuedMailsDaoTest {
.build())
.slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
.build())
- .join();
+ .block();
- Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee
+ List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee
.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
- .join();
+ .collectList().block();
assertThat(selectedEnqueuedMails).hasSize(1);
}
@@ -108,7 +108,7 @@ class EnqueuedMailsDaoTest {
.build())
.slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE), NOW))
.build())
- .join();
+ .block();
testee.insert(EnqueuedItemWithSlicingContext.builder()
.enqueuedItem(EnqueuedItem.builder()
@@ -121,10 +121,10 @@ class EnqueuedMailsDaoTest {
.build())
.slicingContext(EnqueuedItemWithSlicingContext.SlicingContext.of(BucketId.of(BUCKET_ID_VALUE + 1), NOW))
.build())
- .join();
+ .block();
- Stream<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
- .join();
+ List<EnqueuedItemWithSlicingContext> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID)
+ .collectList().block();
assertThat(selectedEnqueuedMails)
.hasSize(1)
http://git-wip-us.apache.org/repos/asf/james-project/blob/f121dd8d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
index 842216c..1fb6916 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
@@ -64,13 +64,13 @@ class BucketedSlicesTest {
@Test
void allSlicesTillShouldReturnOnlyFirstSliceWhenEndAtInTheSameInterval() {
- assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW))
+ assertThat(FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW))
.containsOnly(FIRST_SLICE);
}
@Test
void allSlicesTillShouldReturnAllSlicesBetweenStartAndEndAt() {
- Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlices = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
assertThat(allSlices)
.containsExactly(
@@ -81,9 +81,9 @@ class BucketedSlicesTest {
@Test
void allSlicesTillShouldReturnSameSlicesWhenEndAtsAreInTheSameInterval() {
- Stream<Slice> allSlicesEndAtTheStartOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW);
- Stream<Slice> allSlicesEndAtTheMiddleOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW);
- Stream<Slice> allSlicesEndAtTheEndWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlicesEndAtTheStartOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlicesEndAtTheMiddleOfWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000), ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlicesEndAtTheEndWindow = FIRST_SLICE.allSlicesTill(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1), ONE_HOUR_SLICE_WINDOW);
Slice [] allSlicesInThreeHours = {
FIRST_SLICE,
@@ -102,7 +102,7 @@ class BucketedSlicesTest {
@Test
void allSlicesTillShouldReturnEmptyIfEndAtBeforeStartSlice() {
- Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE_NEXT_TWO_HOUR, FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW);
+ Stream<Slice> allSlices = FIRST_SLICE_NEXT_TWO_HOUR.allSlicesTill(FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW);
assertThat(allSlices).isEmpty();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org