You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:22 UTC
[07/12] james-project git commit: JAMES-2630 Migrate
CassandraAsyncExecutor.executeReturnApplied consumers to Reactor
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java
index 2fd0426..df5dccc 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAOTest.java
@@ -49,15 +49,15 @@ class CassandraActiveScriptDAOTest {
@Test
void getActiveSctiptInfoShouldReturnEmptyByDefault() {
- assertThat(activeScriptDAO.getActiveSctiptInfo(USER).join().isPresent())
+ assertThat(activeScriptDAO.getActiveSctiptInfo(USER).blockOptional().isPresent())
.isFalse();
}
@Test
void getActiveSctiptInfoShouldReturnStoredName() {
- activeScriptDAO.activate(USER, SCRIPT_NAME).join();
+ activeScriptDAO.activate(USER, SCRIPT_NAME).block();
- Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join();
+ Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional();
assertThat(actual.isPresent()).isTrue();
assertThat(actual.get().getName()).isEqualTo(SCRIPT_NAME);
@@ -65,30 +65,30 @@ class CassandraActiveScriptDAOTest {
@Test
void activateShouldAllowRename() {
- activeScriptDAO.activate(USER, SCRIPT_NAME).join();
+ activeScriptDAO.activate(USER, SCRIPT_NAME).block();
- activeScriptDAO.activate(USER, NEW_SCRIPT_NAME).join();
+ activeScriptDAO.activate(USER, NEW_SCRIPT_NAME).block();
- Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join();
+ Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional();
assertThat(actual.isPresent()).isTrue();
assertThat(actual.get().getName()).isEqualTo(NEW_SCRIPT_NAME);
}
@Test
void unactivateShouldAllowRemovingActiveScript() {
- activeScriptDAO.activate(USER, SCRIPT_NAME).join();
+ activeScriptDAO.activate(USER, SCRIPT_NAME).block();
- activeScriptDAO.unactivate(USER).join();
+ activeScriptDAO.unactivate(USER).block();
- Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join();
+ Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional();
assertThat(actual.isPresent()).isFalse();
}
@Test
void unactivateShouldWorkWhenNoneStore() {
- activeScriptDAO.unactivate(USER).join();
+ activeScriptDAO.unactivate(USER).block();
- Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).join();
+ Optional<ActiveScriptInfo> actual = activeScriptDAO.getActiveSctiptInfo(USER).blockOptional();
assertThat(actual.isPresent()).isFalse();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java
index 5d38f61..5e3150d 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveDAOTest.java
@@ -70,54 +70,54 @@ class CassandraSieveDAOTest {
@Test
void getScriptShouldReturnEmptyByDefault() {
- assertThat(sieveDAO.getScript(USER, SCRIPT_NAME).join().isPresent())
- .isFalse();
+ assertThat(sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional())
+ .isEmpty();
}
@Test
void getScriptShouldReturnStoredScript() {
- sieveDAO.insertScript(USER, SCRIPT).join();
+ sieveDAO.insertScript(USER, SCRIPT).block();
- Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+ Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
assertThat(actual).contains(SCRIPT);
}
@Test
void insertScriptShouldUpdateContent() {
- sieveDAO.insertScript(USER, SCRIPT).join();
+ sieveDAO.insertScript(USER, SCRIPT).block();
- sieveDAO.insertScript(USER, SCRIPT_NEW_CONTENT).join();
+ sieveDAO.insertScript(USER, SCRIPT_NEW_CONTENT).block();
- Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+ Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
assertThat(actual).contains(SCRIPT_NEW_CONTENT);
}
@Test
void insertScriptShouldUpdateActivate() {
- sieveDAO.insertScript(USER, SCRIPT).join();
+ sieveDAO.insertScript(USER, SCRIPT).block();
- sieveDAO.insertScript(USER, ACTIVE_SCRIPT).join();
+ sieveDAO.insertScript(USER, ACTIVE_SCRIPT).block();
- Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+ Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
assertThat(actual).contains(ACTIVE_SCRIPT);
}
@Test
void deleteScriptInCassandraShouldWork() {
- sieveDAO.insertScript(USER, SCRIPT).join();
+ sieveDAO.insertScript(USER, SCRIPT).block();
- sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).join();
+ sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).block();
- Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+ Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
assertThat(actual).isEmpty();
}
@Test
void deleteScriptInCassandraShouldWorkWhenNoneStore() {
- sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).join();
+ sieveDAO.deleteScriptInCassandra(USER, SCRIPT_NAME).block();
- Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).join();
+ Optional<Script> actual = sieveDAO.getScript(USER, SCRIPT_NAME).blockOptional();
assertThat(actual).isEmpty();
}
@@ -130,8 +130,8 @@ class CassandraSieveDAOTest {
@Test
void listScriptsShouldReturnSingleStoredValue() {
- sieveDAO.insertScript(USER, SCRIPT).join();
- sieveDAO.insertScript(USER, SCRIPT2).join();
+ sieveDAO.insertScript(USER, SCRIPT).block();
+ sieveDAO.insertScript(USER, SCRIPT2).block();
List<ScriptSummary> scriptSummaryList = sieveDAO.listScripts(USER).join();
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java
index adca5b0..7cacb55 100644
--- a/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java
+++ b/server/data/data-cassandra/src/test/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAOTest.java
@@ -116,7 +116,7 @@ class CassandraSieveQuotaDAOTest {
void spaceUsedByShouldReturnStoredValue() {
long spaceUsed = 18L;
- sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join();
+ sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block();
assertThat(sieveQuotaDAO.spaceUsedBy(USER).join()).isEqualTo(spaceUsed);
}
@@ -125,8 +125,8 @@ class CassandraSieveQuotaDAOTest {
void updateSpaceUsedShouldBeAdditive() {
long spaceUsed = 18L;
- sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join();
- sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join();
+ sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block();
+ sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block();
assertThat(sieveQuotaDAO.spaceUsedBy(USER).join()).isEqualTo(2 * spaceUsed);
}
@@ -135,8 +135,8 @@ class CassandraSieveQuotaDAOTest {
void updateSpaceUsedShouldWorkWithNegativeValues() {
long spaceUsed = 18L;
- sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).join();
- sieveQuotaDAO.updateSpaceUsed(USER, -1 * spaceUsed).join();
+ sieveQuotaDAO.updateSpaceUsed(USER, spaceUsed).block();
+ sieveQuotaDAO.updateSpaceUsed(USER, -1 * spaceUsed).block();
assertThat(sieveQuotaDAO.spaceUsedBy(USER).join()).isEqualTo(0L);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index 6655798..6f005e8 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -22,7 +22,6 @@ package org.apache.james.mailrepository.cassandra;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
@@ -33,10 +32,11 @@ import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepository;
import org.apache.james.mailrepository.api.MailRepositoryUrl;
import org.apache.james.util.CompletableFutureUtil;
-import org.apache.james.util.FluentFutureStream;
import org.apache.mailet.Mail;
import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraMailRepository implements MailRepository {
@@ -60,28 +60,28 @@ public class CassandraMailRepository implements MailRepository {
public MailKey store(Mail mail) throws MessagingException {
MailKey mailKey = MailKey.forMail(mail);
- mimeMessageStore.save(mail.getMessage())
+ Mono.fromFuture(mimeMessageStore.save(mail.getMessage())
.thenCompose(Throwing.function(parts -> mailDAO.store(url, mail,
parts.getHeaderBlobId(),
- parts.getBodyBlobId())))
- .thenCompose(any -> keysDAO.store(url, mailKey))
- .thenCompose(this::increaseSizeIfStored)
- .join();
+ parts.getBodyBlobId()))))
+ .then(keysDAO.store(url, mailKey))
+ .flatMap(this::increaseSizeIfStored)
+ .block();
return mailKey;
}
- private CompletionStage<Void> increaseSizeIfStored(Boolean isStored) {
+ private Mono<Void> increaseSizeIfStored(Boolean isStored) {
if (isStored) {
return countDAO.increment(url);
}
- return CompletableFuture.completedFuture(null);
+ return Mono.empty();
}
@Override
public Iterator<MailKey> list() {
return keysDAO.list(url)
- .join()
+ .toIterable()
.iterator();
}
@@ -108,33 +108,34 @@ public class CassandraMailRepository implements MailRepository {
@Override
public void remove(Mail mail) {
- removeAsync(MailKey.forMail(mail)).join();
+ removeAsync(MailKey.forMail(mail)).block();
}
@Override
public void remove(Collection<Mail> toRemove) {
- FluentFutureStream.of(toRemove.stream()
+ Flux.fromIterable(toRemove)
.map(MailKey::forMail)
- .map(this::removeAsync))
- .join();
+ .flatMap(this::removeAsync)
+ .then()
+ .block();
}
@Override
public void remove(MailKey key) {
- removeAsync(key).join();
+ removeAsync(key).block();
}
- private CompletableFuture<Void> removeAsync(MailKey key) {
+ private Mono<Void> removeAsync(MailKey key) {
return keysDAO.remove(url, key)
- .thenCompose(this::decreaseSizeIfDeleted)
- .thenCompose(any -> mailDAO.remove(url, key));
+ .flatMap(this::decreaseSizeIfDeleted)
+ .then(mailDAO.remove(url, key));
}
- private CompletionStage<Void> decreaseSizeIfDeleted(Boolean isDeleted) {
+ private Mono<Void> decreaseSizeIfDeleted(Boolean isDeleted) {
if (isDeleted) {
return countDAO.decrement(url);
}
- return CompletableFuture.completedFuture(null);
+ return Mono.empty();
}
@Override
@@ -145,9 +146,9 @@ public class CassandraMailRepository implements MailRepository {
@Override
public void removeAll() {
keysDAO.list(url)
- .thenCompose(stream -> FluentFutureStream.of(stream.map(this::removeAsync))
- .completableFuture())
- .join();
+ .flatMap(this::removeAsync)
+ .then()
+ .block();
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java
index cc2d4ea..540aa1a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAO.java
@@ -40,6 +40,7 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
public class CassandraMailRepositoryCountDAO {
@@ -75,13 +76,13 @@ public class CassandraMailRepositoryCountDAO {
.where(eq(REPOSITORY_NAME, bindMarker(REPOSITORY_NAME))));
}
- public CompletableFuture<Void> increment(MailRepositoryUrl url) {
- return executor.executeVoid(increment.bind()
+ public Mono<Void> increment(MailRepositoryUrl url) {
+ return executor.executeVoidReactor(increment.bind()
.setString(REPOSITORY_NAME, url.asString()));
}
- public CompletableFuture<Void> decrement(MailRepositoryUrl url) {
- return executor.executeVoid(decrement.bind()
+ public Mono<Void> decrement(MailRepositoryUrl url) {
+ return executor.executeVoidReactor(decrement.bind()
.setString(REPOSITORY_NAME, url.asString()));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
index 93bffec..8bd5902 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAO.java
@@ -28,9 +28,6 @@ import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.KEYS
import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.MAIL_KEY;
import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.REPOSITORY_NAME;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -40,6 +37,8 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraMailRepositoryKeysDAO {
@@ -80,20 +79,20 @@ public class CassandraMailRepositoryKeysDAO {
.value(MAIL_KEY, bindMarker(MAIL_KEY)));
}
- public CompletableFuture<Boolean> store(MailRepositoryUrl url, MailKey key) {
+ public Mono<Boolean> store(MailRepositoryUrl url, MailKey key) {
return executor.executeReturnApplied(insertKey.bind()
.setString(REPOSITORY_NAME, url.asString())
.setString(MAIL_KEY, key.asString()));
}
- public CompletableFuture<Stream<MailKey>> list(MailRepositoryUrl url) {
- return executor.execute(listKeys.bind()
+ public Flux<MailKey> list(MailRepositoryUrl url) {
+ return executor.executeReactor(listKeys.bind()
.setString(REPOSITORY_NAME, url.asString()))
- .thenApply(cassandraUtils::convertToStream)
- .thenApply(stream -> stream.map(row -> new MailKey(row.getString(MAIL_KEY))));
+ .flatMapMany(cassandraUtils::convertToFlux)
+ .map(row -> new MailKey(row.getString(MAIL_KEY)));
}
- public CompletableFuture<Boolean> remove(MailRepositoryUrl url, MailKey key) {
+ public Mono<Boolean> remove(MailRepositoryUrl url, MailKey key) {
return executor.executeReturnApplied(deleteKey.bind()
.setString(REPOSITORY_NAME, url.asString())
.setString(MAIL_KEY, key.asString()));
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
index 7c3596b..6e884b4 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
@@ -84,6 +84,7 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Mono;
public class CassandraMailRepositoryMailDAO implements CassandraMailRepositoryMailDaoAPI {
@@ -161,8 +162,8 @@ public class CassandraMailRepositoryMailDAO implements CassandraMailRepositoryMa
}
@Override
- public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) {
- return executor.executeVoid(deleteMail.bind()
+ public Mono<Void> remove(MailRepositoryUrl url, MailKey key) {
+ return executor.executeVoidReactor(deleteMail.bind()
.setString(REPOSITORY_NAME, url.asString())
.setString(MAIL_KEY, key.asString()));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
index 435bcf1..bf49097 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoAPI.java
@@ -31,10 +31,12 @@ import org.apache.james.mailrepository.api.MailRepositoryUrl;
import org.apache.james.server.core.MailImpl;
import org.apache.mailet.Mail;
+import reactor.core.publisher.Mono;
+
public interface CassandraMailRepositoryMailDaoAPI {
CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException;
- CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key);
+ Mono<Void> remove(MailRepositoryUrl url, MailKey key);
CompletableFuture<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
index 36e8e28..5b4edfe 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java
@@ -79,6 +79,7 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Mono;
public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepositoryMailDaoAPI {
@@ -151,8 +152,9 @@ public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepository
);
}
- public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) {
- return executor.executeVoid(deleteMail.bind()
+ @Override
+ public Mono<Void> remove(MailRepositoryUrl url, MailKey key) {
+ return executor.executeVoidReactor(deleteMail.bind()
.setString(REPOSITORY_NAME, url.asString())
.setString(MAIL_KEY, key.asString()));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
index 8b01a38..f83766a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MergingCassandraMailRepositoryMailDao.java
@@ -32,6 +32,8 @@ import org.apache.james.util.OptionalUtils;
import org.apache.mailet.Mail;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class MergingCassandraMailRepositoryMailDao implements CassandraMailRepositoryMailDaoAPI {
@@ -51,8 +53,8 @@ public class MergingCassandraMailRepositoryMailDao implements CassandraMailRepos
}
@Override
- public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) {
- return CompletableFuture.allOf(v1.remove(url, key), v2.remove(url, key));
+ public Mono<Void> remove(MailRepositoryUrl url, MailKey key) {
+ return Flux.merge(v1.remove(url, key), v2.remove(url, key)).then();
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java
index 98b6fa2..32eb13d 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryCountDAOTest.java
@@ -50,7 +50,7 @@ class CassandraMailRepositoryCountDAOTest {
@Test
void getCountShouldReturnOneWhenIncrementedOneTime() {
- testee.increment(URL).join();
+ testee.increment(URL).block();
assertThat(testee.getCount(URL).join())
.isEqualTo(1L);
@@ -58,7 +58,7 @@ class CassandraMailRepositoryCountDAOTest {
@Test
void incrementShouldNotAffectOtherUrls() {
- testee.increment(URL).join();
+ testee.increment(URL).block();
assertThat(testee.getCount(URL2).join())
.isEqualTo(0L);
@@ -66,8 +66,8 @@ class CassandraMailRepositoryCountDAOTest {
@Test
void incrementCanBeAppliedSeveralTime() {
- testee.increment(URL).join();
- testee.increment(URL).join();
+ testee.increment(URL).block();
+ testee.increment(URL).block();
assertThat(testee.getCount(URL).join())
.isEqualTo(2L);
@@ -75,11 +75,11 @@ class CassandraMailRepositoryCountDAOTest {
@Test
void decrementShouldDecreaseCount() {
- testee.increment(URL).join();
- testee.increment(URL).join();
- testee.increment(URL).join();
+ testee.increment(URL).block();
+ testee.increment(URL).block();
+ testee.increment(URL).block();
- testee.decrement(URL).join();
+ testee.decrement(URL).block();
assertThat(testee.getCount(URL).join())
.isEqualTo(2L);
@@ -87,7 +87,7 @@ class CassandraMailRepositoryCountDAOTest {
@Test
void decrementCanLeadToNegativeCount() {
- testee.decrement(URL).join();
+ testee.decrement(URL).block();
assertThat(testee.getCount(URL).join())
.isEqualTo(-1L);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
index 3c457af..db7f98a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryKeysDAOTest.java
@@ -23,7 +23,9 @@ import static org.assertj.core.api.Assertions.assertThat;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepositoryUrl;
import org.junit.jupiter.api.BeforeEach;
@@ -36,9 +38,11 @@ class CassandraMailRepositoryKeysDAOTest {
static final MailKey KEY_1 = new MailKey("key1");
static final MailKey KEY_2 = new MailKey("key2");
static final MailKey KEY_3 = new MailKey("key3");
+ static final CassandraModule MODULE = CassandraModule.aggregateModules(CassandraMailRepositoryModule.MODULE,
+ CassandraSchemaVersionModule.MODULE);
@RegisterExtension
- static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailRepositoryModule.MODULE);
+ static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
CassandraMailRepositoryKeysDAO testee;
@@ -49,75 +53,75 @@ class CassandraMailRepositoryKeysDAOTest {
@Test
void listShouldBeEmptyByDefault() {
- assertThat(testee.list(URL).join())
+ assertThat(testee.list(URL).collectList().block())
.isEmpty();
}
@Test
void listShouldReturnEmptyByDefault() {
- testee.store(URL, KEY_1).join();
+ testee.store(URL, KEY_1).block();
- assertThat(testee.list(URL).join())
+ assertThat(testee.list(URL).collectList().block())
.containsOnly(KEY_1);
}
@Test
void listShouldNotReturnElementsOfOtherRepositories() {
- testee.store(URL, KEY_1).join();
+ testee.store(URL, KEY_1).block();
- assertThat(testee.list(URL2).join())
+ assertThat(testee.list(URL2).collectList().block())
.isEmpty();
}
@Test
void listShouldReturnSeveralElements() {
- testee.store(URL, KEY_1).join();
- testee.store(URL, KEY_2).join();
- testee.store(URL, KEY_3).join();
+ testee.store(URL, KEY_1).block();
+ testee.store(URL, KEY_2).block();
+ testee.store(URL, KEY_3).block();
- assertThat(testee.list(URL).join())
+ assertThat(testee.list(URL).collectList().block())
.containsOnly(KEY_1, KEY_2, KEY_3);
}
@Test
void listShouldNotReturnRemovedElements() {
- testee.store(URL, KEY_1).join();
- testee.store(URL, KEY_2).join();
- testee.store(URL, KEY_3).join();
+ testee.store(URL, KEY_1).block();
+ testee.store(URL, KEY_2).block();
+ testee.store(URL, KEY_3).block();
- testee.remove(URL, KEY_2).join();
+ testee.remove(URL, KEY_2).block();
- assertThat(testee.list(URL).join())
+ assertThat(testee.list(URL).collectList().block())
.containsOnly(KEY_1, KEY_3);
}
@Test
void removeShouldBeIdempotent() {
- testee.remove(URL, KEY_2).join();
+ testee.remove(URL, KEY_2).block();
}
@Test
void removeShouldNotAffectOtherRepositories() {
- testee.store(URL, KEY_1).join();
+ testee.store(URL, KEY_1).block();
- testee.remove(URL2, KEY_2).join();
+ testee.remove(URL2, KEY_2).block();
- assertThat(testee.list(URL).join())
+ assertThat(testee.list(URL).collectList().block())
.containsOnly(KEY_1);
}
@Test
void removeShouldReturnTrueWhenKeyDeleted() {
- testee.store(URL, KEY_1).join();
+ testee.store(URL, KEY_1).block();
- boolean isDeleted = testee.remove(URL, KEY_1).join();
+ boolean isDeleted = testee.remove(URL, KEY_1).block();
assertThat(isDeleted).isTrue();
}
@Test
void removeShouldReturnFalseWhenKeyNotDeleted() {
- boolean isDeleted = testee.remove(URL2, KEY_2).join();
+ boolean isDeleted = testee.remove(URL2, KEY_2).block();
assertThat(isDeleted).isFalse();
}
@@ -125,16 +129,16 @@ class CassandraMailRepositoryKeysDAOTest {
@Test
void storeShouldReturnTrueWhenNotPreviouslyStored() {
- boolean isStored = testee.store(URL, KEY_1).join();
+ boolean isStored = testee.store(URL, KEY_1).block();
assertThat(isStored).isTrue();
}
@Test
void storeShouldReturnFalseWhenPreviouslyStored() {
- testee.store(URL, KEY_1).join();
+ testee.store(URL, KEY_1).block();
- boolean isStored = testee.store(URL, KEY_1).join();
+ boolean isStored = testee.store(URL, KEY_1).block();
assertThat(isStored).isFalse();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
index 2377b6b..f597eff 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
@@ -104,7 +104,7 @@ class CassandraMailRepositoryMailDAOTest {
blobIdBody)
.join();
- testee.remove(URL, KEY_1).join();
+ testee.remove(URL, KEY_1).block();
assertThat(testee.read(URL, KEY_1).join())
.isEmpty();
@@ -374,7 +374,7 @@ class CassandraMailRepositoryMailDAOTest {
blobIdBody2)
.join();
- testee.remove(URL, KEY_1).join();
+ testee.remove(URL, KEY_1).block();
Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v1Entry = v1.read(URL, KEY_1).join();
Optional<CassandraMailRepositoryMailDaoAPI.MailDTO> v2Entry = v2.read(URL, KEY_1).join();
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
index ca2eb2c..b39ed2d 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryWithFakeImplementationsTest.java
@@ -33,6 +33,7 @@ import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.Store;
@@ -56,17 +57,19 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Mono;
@ExtendWith(CassandraMailRepositoryWithFakeImplementationsTest.MailRepositoryCassandraClusterExtension.class)
class CassandraMailRepositoryWithFakeImplementationsTest {
- static final MailRepositoryUrl URL = MailRepositoryUrl.from("proto://url");
- static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+ private static final MailRepositoryUrl URL = MailRepositoryUrl.from("proto://url");
+ private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
static class MailRepositoryCassandraClusterExtension extends CassandraClusterExtension {
public MailRepositoryCassandraClusterExtension() {
super(CassandraModule.aggregateModules(
CassandraMailRepositoryModule.MODULE,
- CassandraBlobModule.MODULE));
+ CassandraBlobModule.MODULE,
+ CassandraSchemaVersionModule.MODULE));
}
@Override
@@ -122,7 +125,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
.isInstanceOf(RuntimeException.class)
.hasMessage("java.lang.RuntimeException: Expected failure while saving");
- assertThat(keysDAO.list(URL).join()).isEmpty();
+ assertThat(keysDAO.list(URL).collectList().block()).isEmpty();
}
}
@@ -155,9 +158,9 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
}
@Override
- public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) {
- return CompletableFuture.supplyAsync(() -> {
- throw new RuntimeException("Expected failure while remeving mail parts");
+ public Mono<Void> remove(MailRepositoryUrl url, MailKey key) {
+ return Mono.fromCallable(() -> {
+ throw new RuntimeException("Expected failure while removing mail parts");
});
}
@@ -186,7 +189,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
.isInstanceOf(RuntimeException.class)
.hasMessage("java.lang.RuntimeException: Expected failure while storing mail parts");
- assertThat(keysDAO.list(URL).join()).isEmpty();
+ assertThat(keysDAO.list(URL).collectList().block()).isEmpty();
}
@Test
@@ -234,8 +237,8 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
}
@Override
- public CompletableFuture<Boolean> store(MailRepositoryUrl url, MailKey key) {
- return CompletableFuture.supplyAsync(() -> {
+ public Mono<Boolean> store(MailRepositoryUrl url, MailKey key) {
+ return Mono.fromCallable(() -> {
throw new RuntimeException("Expected failure while storing keys");
});
}
@@ -255,7 +258,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
assertThatThrownBy(() -> cassandraMailRepository.store(mail))
.isInstanceOf(RuntimeException.class)
- .hasMessage("java.lang.RuntimeException: Expected failure while storing keys");
+ .hasMessage("Expected failure while storing keys");
assertThat(countDAO.getCount(URL).join()).isEqualTo(0);
}
@@ -274,7 +277,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
assertThatThrownBy(() -> cassandraMailRepository.store(mail))
.isInstanceOf(RuntimeException.class)
- .hasMessage("java.lang.RuntimeException: Expected failure while storing keys");
+ .hasMessage("Expected failure while storing keys");
ResultSet resultSet = cassandra.getConf().execute(select()
.from(BlobTable.TABLE_NAME));
@@ -295,7 +298,7 @@ class CassandraMailRepositoryWithFakeImplementationsTest {
assertThatThrownBy(() -> cassandraMailRepository.store(mail))
.isInstanceOf(RuntimeException.class)
- .hasMessage("java.lang.RuntimeException: Expected failure while storing keys");
+ .hasMessage("Expected failure while storing keys");
ResultSet resultSet = cassandra.getConf().execute(select()
.from(MailRepositoryTable.CONTENT_TABLE_NAME));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org