You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:23 UTC
[08/12] james-project git commit: JAMES-2630 Migrate
CassandraAsyncExecutor.executeReturnApplied consumers to Reactor
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
index f7d7b5d..0b87025 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
@@ -29,12 +29,13 @@ import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.store.mail.model.Username;
-import org.apache.james.util.FluentFutureStream;
import org.apache.james.util.streams.JamesCollectors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.publisher.Flux;
+
class CassandraAttachmentOwnerDAOTest {
private static final AttachmentId ATTACHMENT_ID = AttachmentId.from("id1");
private static final Username OWNER_1 = Username.fromRawValue("owner1");
@@ -59,7 +60,7 @@ class CassandraAttachmentOwnerDAOTest {
@Test
void retrieveOwnersShouldReturnAddedOwner() {
- testee.addOwner(ATTACHMENT_ID, OWNER_1).join();
+ testee.addOwner(ATTACHMENT_ID, OWNER_1).block();
assertThat(testee.retrieveOwners(ATTACHMENT_ID).join())
.containsOnly(OWNER_1);
@@ -67,8 +68,8 @@ class CassandraAttachmentOwnerDAOTest {
@Test
void retrieveOwnersShouldReturnAddedOwners() {
- testee.addOwner(ATTACHMENT_ID, OWNER_1).join();
- testee.addOwner(ATTACHMENT_ID, OWNER_2).join();
+ testee.addOwner(ATTACHMENT_ID, OWNER_1).block();
+ testee.addOwner(ATTACHMENT_ID, OWNER_2).block();
assertThat(testee.retrieveOwners(ATTACHMENT_ID).join())
.containsOnly(OWNER_1, OWNER_2);
@@ -81,10 +82,10 @@ class CassandraAttachmentOwnerDAOTest {
IntStream.range(0, referenceCountExceedingPaging)
.boxed()
.collect(JamesCollectors.chunker(128))
- .forEach(chunk -> FluentFutureStream.of(
- chunk.stream()
- .map(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i))))
- .join());
+ .forEach(chunk -> Flux.fromIterable(chunk)
+ .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i)))
+ .then()
+ .block());
assertThat(testee.retrieveOwners(ATTACHMENT_ID).join())
.hasSize(referenceCountExceedingPaging);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
index 09dc1ee..4af1bdc 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
@@ -36,8 +36,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import com.github.steveash.guavate.Guavate;
-
class CassandraMailboxDAOTest {
private static final int UID_VALIDITY_1 = 145;
private static final int UID_VALIDITY_2 = 147;
@@ -70,30 +68,30 @@ class CassandraMailboxDAOTest {
@Test
void retrieveMailboxShouldReturnEmptyWhenNone() {
- assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).join())
+ assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).blockOptional())
.isEmpty();
}
@Test
void saveShouldAddAMailbox() {
- testee.save(mailbox1).join();
+ testee.save(mailbox1).block();
Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1)
- .join();
+ .blockOptional();
assertThat(readMailbox.isPresent()).isTrue();
assertThat(readMailbox.get()).isEqualToComparingFieldByField(mailbox1);
}
@Test
void saveShouldOverride() {
- testee.save(mailbox1).join();
+ testee.save(mailbox1).block();
mailbox2.setMailboxId(CASSANDRA_ID_1);
- testee.save(mailbox2).join();
+ testee.save(mailbox2).block();
Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1)
- .join();
+ .blockOptional();
assertThat(readMailbox.isPresent()).isTrue();
assertThat(readMailbox.get()).isEqualToComparingFieldByField(mailbox2);
}
@@ -101,47 +99,47 @@ class CassandraMailboxDAOTest {
@Test
void retrieveAllMailboxesShouldBeEmptyByDefault() {
List<SimpleMailbox> mailboxes = testee.retrieveAllMailboxes()
- .join()
- .collect(Guavate.toImmutableList());
+ .collectList()
+ .block();
assertThat(mailboxes).isEmpty();
}
@Test
void retrieveAllMailboxesShouldReturnSingleMailbox() {
- testee.save(mailbox1).join();
+ testee.save(mailbox1).block();
List<SimpleMailbox> mailboxes = testee.retrieveAllMailboxes()
- .join()
- .collect(Guavate.toImmutableList());
+ .collectList()
+ .block();
assertThat(mailboxes).containsOnly(mailbox1);
}
@Test
void retrieveAllMailboxesShouldReturnMultiMailboxes() {
- testee.save(mailbox1).join();
- testee.save(mailbox2).join();
+ testee.save(mailbox1).block();
+ testee.save(mailbox2).block();
List<SimpleMailbox> mailboxes = testee.retrieveAllMailboxes()
- .join()
- .collect(Guavate.toImmutableList());
+ .collectList()
+ .block();
assertThat(mailboxes).containsOnly(mailbox1, mailbox2);
}
@Test
void deleteShouldNotFailWhenMailboxIsAbsent() {
- testee.delete(CASSANDRA_ID_1).join();
+ testee.delete(CASSANDRA_ID_1).block();
}
@Test
void deleteShouldRemoveExistingMailbox() {
- testee.save(mailbox1).join();
+ testee.save(mailbox1).block();
- testee.delete(CASSANDRA_ID_1).join();
+ testee.delete(CASSANDRA_ID_1).block();
- assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).join())
+ assertThat(testee.retrieveMailbox(CASSANDRA_ID_1).blockOptional())
.isEmpty();
}
@@ -152,14 +150,14 @@ class CassandraMailboxDAOTest {
@Test
void updateShouldChangeMailboxPath() {
- testee.save(mailbox1).join();
+ testee.save(mailbox1).block();
testee.updatePath(CASSANDRA_ID_1, NEW_MAILBOX_PATH).join();
mailbox1.setNamespace(NEW_MAILBOX_PATH.getNamespace());
mailbox1.setUser(NEW_MAILBOX_PATH.getUser());
mailbox1.setName(NEW_MAILBOX_PATH.getName());
- Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1).join();
+ Optional<SimpleMailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1).blockOptional();
assertThat(readMailbox.isPresent()).isTrue();
assertThat(readMailbox.get()).isEqualToComparingFieldByField(mailbox1);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index 1c3fb37..4ffa3dd 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -113,7 +113,7 @@ public class CassandraMailboxMapperTest {
testee.save(newMailbox))
.isInstanceOf(TooLongMailboxNameException.class);
- assertThat(mailboxPathV2DAO.retrieveId(MAILBOX_PATH).join())
+ assertThat(mailboxPathV2DAO.retrieveId(MAILBOX_PATH).blockOptional())
.isPresent();
}
@@ -124,9 +124,9 @@ public class CassandraMailboxMapperTest {
@Test
public void deleteShouldDeleteMailboxAndMailboxPathFromV1Table() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
testee.delete(MAILBOX);
@@ -137,9 +137,9 @@ public class CassandraMailboxMapperTest {
@Test
public void deleteShouldDeleteMailboxAndMailboxPathFromV2Table() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
testee.delete(MAILBOX);
@@ -150,9 +150,9 @@ public class CassandraMailboxMapperTest {
@Test
public void findMailboxByPathShouldReturnMailboxWhenExistsInV1Table() throws Exception {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
Mailbox mailbox = testee.findMailboxByPath(MAILBOX_PATH);
@@ -162,9 +162,9 @@ public class CassandraMailboxMapperTest {
@Test
public void findMailboxByPathShouldReturnMailboxWhenExistsInV2Table() throws Exception {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
Mailbox mailbox = testee.findMailboxByPath(MAILBOX_PATH);
@@ -174,11 +174,11 @@ public class CassandraMailboxMapperTest {
@Test
public void findMailboxByPathShouldReturnMailboxWhenExistsInBothTables() throws Exception {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
Mailbox mailbox = testee.findMailboxByPath(MAILBOX_PATH);
@@ -188,11 +188,11 @@ public class CassandraMailboxMapperTest {
@Test
public void deleteShouldRemoveMailboxWhenInBothTables() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
testee.delete(MAILBOX);
@@ -203,9 +203,9 @@ public class CassandraMailboxMapperTest {
@Test
public void deleteShouldRemoveMailboxWhenInV1Tables() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
testee.delete(MAILBOX);
@@ -216,9 +216,9 @@ public class CassandraMailboxMapperTest {
@Test
public void deleteShouldRemoveMailboxWhenInV2Table() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
testee.delete(MAILBOX);
@@ -229,7 +229,7 @@ public class CassandraMailboxMapperTest {
@Test
public void findMailboxByPathShouldThrowWhenDoesntExistInBothTables() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
assertThatThrownBy(() -> testee.findMailboxByPath(MAILBOX_PATH))
.isInstanceOf(MailboxNotFoundException.class);
@@ -238,9 +238,9 @@ public class CassandraMailboxMapperTest {
@Test
public void findMailboxWithPathLikeShouldReturnMailboxesWhenExistsInV1Table() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
List<Mailbox> mailboxes = testee.findMailboxWithPathLike(new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD));
@@ -250,11 +250,11 @@ public class CassandraMailboxMapperTest {
@Test
public void findMailboxWithPathLikeShouldReturnMailboxesWhenExistsInBothTables() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
List<Mailbox> mailboxes = testee.findMailboxWithPathLike(new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD));
@@ -264,9 +264,9 @@ public class CassandraMailboxMapperTest {
@Test
public void findMailboxWithPathLikeShouldReturnMailboxesWhenExistsInV2Table() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
List<Mailbox> mailboxes = testee.findMailboxWithPathLike(new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD));
@@ -276,16 +276,16 @@ public class CassandraMailboxMapperTest {
@Test
public void hasChildrenShouldReturnChildWhenExistsInV1Table() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
CassandraId childMailboxId = CassandraId.timeBased();
MailboxPath childMailboxPath = MailboxPath.forUser(USER, "name.child");
Mailbox childMailbox = new SimpleMailbox(childMailboxPath, UID_VALIDITY, childMailboxId);
mailboxDAO.save(childMailbox)
- .join();
+ .block();
mailboxPathDAO.save(childMailboxPath, childMailboxId)
- .join();
+ .block();
boolean hasChildren = testee.hasChildren(MAILBOX, '.');
@@ -295,18 +295,18 @@ public class CassandraMailboxMapperTest {
@Test
public void hasChildrenShouldReturnChildWhenExistsInBothTables() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
CassandraId childMailboxId = CassandraId.timeBased();
MailboxPath childMailboxPath = MailboxPath.forUser(USER, "name.child");
Mailbox childMailbox = new SimpleMailbox(childMailboxPath, UID_VALIDITY, childMailboxId);
mailboxDAO.save(childMailbox)
- .join();
+ .block();
mailboxPathDAO.save(childMailboxPath, childMailboxId)
- .join();
+ .block();
boolean hasChildren = testee.hasChildren(MAILBOX, '.');
@@ -316,16 +316,16 @@ public class CassandraMailboxMapperTest {
@Test
public void hasChildrenShouldReturnChildWhenExistsInV2Table() {
mailboxDAO.save(MAILBOX)
- .join();
+ .block();
mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID)
- .join();
+ .block();
CassandraId childMailboxId = CassandraId.timeBased();
MailboxPath childMailboxPath = MailboxPath.forUser(USER, "name.child");
Mailbox childMailbox = new SimpleMailbox(childMailboxPath, UID_VALIDITY, childMailboxId);
mailboxDAO.save(childMailbox)
- .join();
+ .block();
mailboxPathV2DAO.save(childMailboxPath, childMailboxId)
- .join();
+ .block();
boolean hasChildren = testee.hasChildren(MAILBOX, '.');
@@ -334,11 +334,11 @@ public class CassandraMailboxMapperTest {
@Test
public void findMailboxWithPathLikeShouldRemoveDuplicatesAndKeepV2() {
- mailboxDAO.save(MAILBOX).join();
- mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID).join();
+ mailboxDAO.save(MAILBOX).block();
+ mailboxPathV2DAO.save(MAILBOX_PATH, MAILBOX_ID).block();
- mailboxDAO.save(MAILBOX_BIS).join();
- mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID_2).join();
+ mailboxDAO.save(MAILBOX_BIS).block();
+ mailboxPathDAO.save(MAILBOX_PATH, MAILBOX_ID_2).block();
assertThat(testee.findMailboxWithPathLike(
new MailboxPath(MailboxConstants.USER_NAMESPACE, USER, WILDCARD)))
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java
index 429b8af..dbf64f2 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImplTest.java
@@ -35,9 +35,9 @@ class CassandraMailboxPathDAOImplTest extends CassandraMailboxPathDAOTest {
@Test
void countAllShouldReturnEntryCount() {
- testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
- testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join();
- testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join();
+ testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
+ testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).block();
+ testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).block();
CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee;
@@ -55,9 +55,9 @@ class CassandraMailboxPathDAOImplTest extends CassandraMailboxPathDAOTest {
@Test
void readAllShouldReturnAllStoredData() {
- testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
- testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join();
- testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join();
+ testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
+ testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).block();
+ testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).block();
CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee;
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java
index 43592ae..acd6245 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOTest.java
@@ -34,8 +34,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import com.github.steveash.guavate.Guavate;
-
import nl.jqno.equalsverifier.EqualsVerifier;
public abstract class CassandraMailboxPathDAOTest {
@@ -70,43 +68,42 @@ public abstract class CassandraMailboxPathDAOTest {
@Test
void saveShouldInsertNewEntry() {
- assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join()).isTrue();
+ assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block()).isTrue();
- assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join())
+ assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional())
.contains(INBOX_ID_AND_PATH);
}
@Test
void saveOnSecondShouldBeFalse() {
- assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join()).isTrue();
- assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join()).isFalse();
+ assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block()).isTrue();
+ assertThat(testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block()).isFalse();
}
@Test
void retrieveIdShouldReturnEmptyWhenEmptyData() {
- assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join()
- .isPresent())
- .isFalse();
+ assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional())
+ .isEmpty();
}
@Test
void retrieveIdShouldReturnStoredData() {
- testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
+ testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
- assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join())
+ assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional())
.contains(INBOX_ID_AND_PATH);
}
@Test
void getUserMailboxesShouldReturnAllMailboxesOfUser() {
- testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
- testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join();
- testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join();
+ testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
+ testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).block();
+ testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).block();
List<CassandraIdAndPath> cassandraIds = testee
.listUserMailboxes(USER_INBOX_MAILBOXPATH.getNamespace(), USER_INBOX_MAILBOXPATH.getUser())
- .join()
- .collect(Guavate.toImmutableList());
+ .collectList()
+ .block();
assertThat(cassandraIds)
.hasSize(2)
@@ -115,16 +112,16 @@ public abstract class CassandraMailboxPathDAOTest {
@Test
void deleteShouldNotThrowWhenEmpty() {
- testee.delete(USER_INBOX_MAILBOXPATH).join();
+ testee.delete(USER_INBOX_MAILBOXPATH).block();
}
@Test
void deleteShouldDeleteTheExistingMailboxId() {
- testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
+ testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).block();
- testee.delete(USER_INBOX_MAILBOXPATH).join();
+ testee.delete(USER_INBOX_MAILBOXPATH).block();
- assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).join())
+ assertThat(testee.retrieveId(USER_INBOX_MAILBOXPATH).blockOptional())
.isEmpty();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
index 95df7fb..914d1e9 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.cassandra.mail;
import java.util.List;
import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MailboxSessionUtil;
import org.apache.james.mailbox.MessageUid;
@@ -55,7 +56,7 @@ public class CassandraMapperProvider implements MapperProvider {
public CassandraMapperProvider(CassandraCluster cassandra) {
this.cassandra = cassandra;
messageUidProvider = new MessageUidProvider();
- cassandraModSeqProvider = new CassandraModSeqProvider(this.cassandra.getConf());
+ cassandraModSeqProvider = new CassandraModSeqProvider(this.cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION);
mapperFactory = createMapperFactory();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index da90b78..4eabd41 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.mail.Flags;
@@ -64,6 +63,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Bytes;
import nl.jqno.equalsverifier.EqualsVerifier;
+import reactor.core.publisher.Flux;
class CassandraMessageDAOTest {
private static final int BODY_START = 16;
@@ -186,8 +186,8 @@ class CassandraMessageDAOTest {
.build();
}
- private MessageWithoutAttachment toMessage(CompletableFuture<Stream<CassandraMessageDAO.MessageResult>> readOptional) throws InterruptedException, java.util.concurrent.ExecutionException {
- return readOptional.join()
+ private MessageWithoutAttachment toMessage(Flux<CassandraMessageDAO.MessageResult> read) throws InterruptedException, java.util.concurrent.ExecutionException {
+ return read.toStream()
.map(CassandraMessageDAO.MessageResult::message)
.map(Pair::getLeft)
.findAny()
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
index 26947ab..67b9c90 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
@@ -57,7 +57,7 @@ class CassandraMessageIdDAOTest {
@Test
void deleteShouldNotThrowWhenRowDoesntExist() {
testee.delete(CassandraId.timeBased(), MessageUid.of(1))
- .join();
+ .block();
}
@Test
@@ -72,7 +72,7 @@ class CassandraMessageIdDAOTest {
.build())
.join();
- testee.delete(mailboxId, messageUid).join();
+ testee.delete(mailboxId, messageUid).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.isPresent()).isFalse();
@@ -98,7 +98,7 @@ class CassandraMessageIdDAOTest {
.build()))
.join();
- testee.delete(mailboxId, messageUid).join();
+ testee.delete(mailboxId, messageUid).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.isPresent()).isFalse();
@@ -142,7 +142,7 @@ class CassandraMessageIdDAOTest {
.flags(new Flags())
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId).join();
+ testee.updateMetadata(expectedComposedMessageId).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -167,7 +167,7 @@ class CassandraMessageIdDAOTest {
.flags(new Flags(Flag.ANSWERED))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId).join();
+ testee.updateMetadata(expectedComposedMessageId).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -192,7 +192,7 @@ class CassandraMessageIdDAOTest {
.flags(new Flags(Flag.DELETED))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId).join();
+ testee.updateMetadata(expectedComposedMessageId).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -217,7 +217,7 @@ class CassandraMessageIdDAOTest {
.flags(new Flags(Flag.DRAFT))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId).join();
+ testee.updateMetadata(expectedComposedMessageId).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -242,7 +242,7 @@ class CassandraMessageIdDAOTest {
.flags(new Flags(Flag.FLAGGED))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId).join();
+ testee.updateMetadata(expectedComposedMessageId).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -267,7 +267,7 @@ class CassandraMessageIdDAOTest {
.flags(new Flags(Flag.RECENT))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId).join();
+ testee.updateMetadata(expectedComposedMessageId).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -292,7 +292,7 @@ class CassandraMessageIdDAOTest {
.flags(new Flags(Flag.SEEN))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId).join();
+ testee.updateMetadata(expectedComposedMessageId).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -317,7 +317,7 @@ class CassandraMessageIdDAOTest {
.flags(new Flags(Flag.USER))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId).join();
+ testee.updateMetadata(expectedComposedMessageId).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.get()).isEqualTo(expectedComposedMessageId);
@@ -344,7 +344,7 @@ class CassandraMessageIdDAOTest {
.flags(flags)
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId).join();
+ testee.updateMetadata(expectedComposedMessageId).block();
Optional<ComposedMessageIdWithMetaData> message = testee.retrieve(mailboxId, messageUid).join();
assertThat(message.get()).isEqualTo(expectedComposedMessageId);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
index a200680..f563e5f 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAOTest.java
@@ -20,9 +20,9 @@ package org.apache.james.mailbox.cassandra.mail;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
import javax.mail.Flags;
import javax.mail.Flags.Flag;
@@ -42,12 +42,14 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import com.datastax.driver.core.utils.UUIDs;
-import com.github.steveash.guavate.Guavate;
class CassandraMessageIdToImapUidDAOTest {
+ public static final CassandraModule MODULE = CassandraModule.aggregateModules(
+ CassandraSchemaVersionModule.MODULE,
+ CassandraMessageModule.MODULE);
+
@RegisterExtension
- static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules(
- CassandraMessageModule.MODULE, CassandraSchemaVersionModule.MODULE));
+ static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE);
private CassandraMessageId.Factory messageIdFactory;
@@ -62,7 +64,7 @@ class CassandraMessageIdToImapUidDAOTest {
@Test
void deleteShouldNotThrowWhenRowDoesntExist() {
testee.delete(messageIdFactory.of(UUIDs.timeBased()), CassandraId.timeBased())
- .join();
+ .block();
}
@Test
@@ -77,10 +79,10 @@ class CassandraMessageIdToImapUidDAOTest {
.build())
.join();
- testee.delete(messageId, mailboxId).join();
+ testee.delete(messageId, mailboxId).block();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).isEmpty();
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).isEmpty();
}
@Test
@@ -103,15 +105,15 @@ class CassandraMessageIdToImapUidDAOTest {
.build()))
.join();
- testee.delete(messageId, mailboxId).join();
+ testee.delete(messageId, mailboxId).block();
ComposedMessageIdWithMetaData expectedComposedMessageId = ComposedMessageIdWithMetaData.builder()
.composedMessageId(new ComposedMessageId(mailboxId2, messageId, messageUid2))
.flags(new Flags())
.modSeq(1)
.build();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -132,8 +134,8 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags())
.modSeq(1)
.build();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -149,7 +151,7 @@ class CassandraMessageIdToImapUidDAOTest {
.build();
testee.insert(composedMessageIdWithFlags).join();
- Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 1).join();
+ Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 1).block();
assertThat(result).isTrue();
}
@@ -167,7 +169,7 @@ class CassandraMessageIdToImapUidDAOTest {
.build();
testee.insert(composedMessageIdWithFlags).join();
- Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 3).join();
+ Boolean result = testee.updateMetadata(composedMessageIdWithFlags, 3).block();
assertThat(result).isFalse();
}
@@ -191,10 +193,10 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags())
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId, 1).join();
+ testee.updateMetadata(expectedComposedMessageId, 1).block();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -216,10 +218,10 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags(Flag.ANSWERED))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId, 1).join();
+ testee.updateMetadata(expectedComposedMessageId, 1).block();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -241,10 +243,10 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags(Flag.DELETED))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId, 1).join();
+ testee.updateMetadata(expectedComposedMessageId, 1).block();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -266,10 +268,10 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags(Flag.DRAFT))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId, 1).join();
+ testee.updateMetadata(expectedComposedMessageId, 1).block();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -291,10 +293,10 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags(Flag.FLAGGED))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId, 1).join();
+ testee.updateMetadata(expectedComposedMessageId, 1).block();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -316,10 +318,10 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags(Flag.RECENT))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId, 1).join();
+ testee.updateMetadata(expectedComposedMessageId, 1).block();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -341,10 +343,11 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags(Flag.SEEN))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId, 1).join();
+ assertThat(testee.updateMetadata(expectedComposedMessageId, 1).block())
+ .isTrue();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -366,10 +369,10 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags(Flag.USER))
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId, 1).join();
+ testee.updateMetadata(expectedComposedMessageId, 1).block();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -393,10 +396,10 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(flags)
.modSeq(2)
.build();
- testee.updateMetadata(expectedComposedMessageId, 1).join();
+ testee.updateMetadata(expectedComposedMessageId, 1).block();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -416,9 +419,9 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags())
.modSeq(1)
.build();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).join();
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.of(mailboxId)).collectList().block();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId);
+ assertThat(messages).containsOnly(expectedComposedMessageId);
}
@Test
@@ -451,8 +454,8 @@ class CassandraMessageIdToImapUidDAOTest {
.flags(new Flags())
.modSeq(1)
.build();
- Stream<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).join();
+ List<ComposedMessageIdWithMetaData> messages = testee.retrieve(messageId, Optional.empty()).collectList().block();
- assertThat(messages.collect(Guavate.toImmutableList())).containsOnly(expectedComposedMessageId, expectedComposedMessageId2);
+ assertThat(messages).containsOnly(expectedComposedMessageId, expectedComposedMessageId2);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
index a0b691d..a8a2722 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
@@ -20,14 +20,19 @@ package org.apache.james.mailbox.cassandra.mail;
import static org.assertj.core.api.Assertions.assertThat;
+import java.time.Duration;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
import java.util.stream.LongStream;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.modules.CassandraModSeqModule;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -46,7 +51,7 @@ class CassandraModSeqProviderTest {
@BeforeEach
void setUp(CassandraCluster cassandra) {
- modSeqProvider = new CassandraModSeqProvider(cassandra.getConf());
+ modSeqProvider = new CassandraModSeqProvider(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION);
MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash");
mailbox = new SimpleMailbox(path, 1234);
mailbox.setMailboxId(CASSANDRA_ID);
@@ -77,13 +82,17 @@ class CassandraModSeqProviderTest {
}
@Test
- void nextModSeqShouldGenerateUniqueValuesWhenParallelCalls() {
- int nbEntries = 100;
- long nbValues = LongStream.range(0, nbEntries)
- .parallel()
- .map(Throwing.longUnaryOperator(x -> modSeqProvider.nextModSeq(null, mailbox)))
- .distinct()
- .count();
- assertThat(nbValues).isEqualTo(nbEntries);
+ void nextModSeqShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException {
+ int nbEntries = 10;
+
+ ConcurrentSkipListSet<Long> modSeqs = new ConcurrentSkipListSet<>();
+ ConcurrentTestRunner.builder()
+ .operation(
+ (threadNumber, step) -> modSeqs.add(modSeqProvider.nextModSeq(null, mailbox)))
+ .threadCount(10)
+ .operationCount(nbEntries)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ assertThat(modSeqs).hasSize(100);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
index 619ced7..5a3fe69 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
@@ -20,16 +20,21 @@ package org.apache.james.mailbox.cassandra.mail;
import static org.assertj.core.api.Assertions.assertThat;
+import java.time.Duration;
import java.util.Optional;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
import java.util.stream.LongStream;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.modules.CassandraUidModule;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -47,7 +52,7 @@ class CassandraUidProviderTest {
@BeforeEach
void setUp(CassandraCluster cassandra) {
- uidProvider = new CassandraUidProvider(cassandra.getConf());
+ uidProvider = new CassandraUidProvider(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION);
MailboxPath path = new MailboxPath("gsoc", "ieugen", "Trash");
mailbox = new SimpleMailbox(path, 1234);
mailbox.setMailboxId(CASSANDRA_ID);
@@ -77,14 +82,17 @@ class CassandraUidProviderTest {
}
@Test
- void nextUidShouldGenerateUniqueValuesWhenParallelCalls() {
+ void nextUidShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException {
+ int threadCount = 10;
int nbEntries = 100;
- long nbValues = LongStream.range(0, nbEntries)
- .parallel()
- .boxed()
- .map(Throwing.function(x -> uidProvider.nextUid(null, mailbox)))
- .distinct()
- .count();
- assertThat(nbValues).isEqualTo(nbEntries);
+
+ ConcurrentSkipListSet<MessageUid> messageUids = new ConcurrentSkipListSet<>();
+ ConcurrentTestRunner.builder()
+ .operation((threadNumber, step) -> messageUids.add(uidProvider.nextUid(null, mailbox)))
+ .threadCount(threadCount)
+ .operationCount(nbEntries / threadCount)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ assertThat(messageUids).hasSize(nbEntries);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
index 9730bda..1596e1c 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAOTest.java
@@ -57,7 +57,7 @@ class CassandraUserMailboxRightsDAOTest {
testee.update(MAILBOX_ID, ACLDiff.computeDiff(
MailboxACL.EMPTY,
new MailboxACL(new Entry(ENTRY_KEY, RIGHTS))))
- .join();
+ .block();
assertThat(testee.retrieve(USER_NAME, MAILBOX_ID).join())
.contains(RIGHTS);
@@ -68,12 +68,12 @@ class CassandraUserMailboxRightsDAOTest {
testee.update(MAILBOX_ID, ACLDiff.computeDiff(
MailboxACL.EMPTY,
new MailboxACL(new Entry(ENTRY_KEY, RIGHTS))))
- .join();
+ .block();
testee.update(MAILBOX_ID, ACLDiff.computeDiff(
new MailboxACL(new Entry(ENTRY_KEY, RIGHTS)),
new MailboxACL(new Entry(ENTRY_KEY, OTHER_RIGHTS))))
- .join();
+ .block();
assertThat(testee.retrieve(USER_NAME, MAILBOX_ID).join())
.contains(OTHER_RIGHTS);
@@ -81,7 +81,7 @@ class CassandraUserMailboxRightsDAOTest {
@Test
void listRightsForUserShouldReturnEmptyWhenEmptyData() {
- assertThat(testee.listRightsForUser(USER_NAME).join())
+ assertThat(testee.listRightsForUser(USER_NAME).collectList().block())
.isEmpty();
}
@@ -90,13 +90,13 @@ class CassandraUserMailboxRightsDAOTest {
testee.update(MAILBOX_ID, ACLDiff.computeDiff(
MailboxACL.EMPTY,
new MailboxACL(new Entry(ENTRY_KEY, RIGHTS))))
- .join();
+ .block();
testee.update(MAILBOX_ID, ACLDiff.computeDiff(
new MailboxACL(new Entry(ENTRY_KEY, RIGHTS)),
MailboxACL.EMPTY))
- .join();
+ .block();
assertThat(testee.retrieve(USER_NAME, MAILBOX_ID).join())
.isEmpty();
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
index a298106..85616b1 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java
@@ -46,6 +46,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.publisher.Mono;
+
class AttachmentV2MigrationTest {
private static final AttachmentId ATTACHMENT_ID = AttachmentId.from("id1");
private static final AttachmentId ATTACHMENT_ID_2 = AttachmentId.from("id2");
@@ -107,9 +109,9 @@ class AttachmentV2MigrationTest {
migration.run();
- assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID).join())
+ assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID).blockOptional())
.contains(CassandraAttachmentDAOV2.from(attachment1, BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
- assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).join())
+ assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).blockOptional())
.contains(CassandraAttachmentDAOV2.from(attachment2, BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join())
.isEqualTo(attachment1.getBytes());
@@ -124,9 +126,9 @@ class AttachmentV2MigrationTest {
migration.run();
- assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID).join())
+ assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID).blockOptional())
.isEmpty();
- assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID_2).join())
+ assertThat(attachmentDAO.getAttachment(ATTACHMENT_ID_2).blockOptional())
.isEmpty();
}
@@ -190,7 +192,7 @@ class AttachmentV2MigrationTest {
.thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
when(blobsDAO.save(attachment2.getBytes()))
.thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())));
- when(attachmentDAOV2.storeAttachment(any())).thenReturn(CompletableFuture.completedFuture(null));
+ when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty());
when(attachmentDAO.deleteAttachment(any())).thenThrow(new RuntimeException());
assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
@@ -210,7 +212,7 @@ class AttachmentV2MigrationTest {
.thenReturn(CompletableFuture.completedFuture(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())));
when(blobsDAO.save(attachment2.getBytes()))
.thenThrow(new RuntimeException());
- when(attachmentDAOV2.storeAttachment(any())).thenReturn(CompletableFuture.completedFuture(null));
+ when(attachmentDAOV2.storeAttachment(any())).thenReturn(Mono.empty());
when(attachmentDAO.deleteAttachment(any())).thenReturn(CompletableFuture.completedFuture(null));
assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
index cf19f73..f17ab80 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
@@ -91,7 +91,7 @@ class MailboxPathV2MigrationTest {
void newValuesShouldBeSavedInMostRecentDAO() throws Exception {
mailboxMapper.save(MAILBOX_1);
- assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join())
+ assertThat(daoV2.retrieveId(MAILBOX_PATH_1).blockOptional())
.contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1));
}
@@ -99,33 +99,33 @@ class MailboxPathV2MigrationTest {
void newValuesShouldNotBeSavedInOldDAO() throws Exception {
mailboxMapper.save(MAILBOX_1);
- assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join())
+ assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional())
.isEmpty();
}
@Test
void readingOldValuesShouldMigrateThem() throws Exception {
- daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).join();
- mailboxDAO.save(MAILBOX_1).join();
+ daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).block();
+ mailboxDAO.save(MAILBOX_1).block();
mailboxMapper.findMailboxByPath(MAILBOX_PATH_1);
SoftAssertions softly = new SoftAssertions();
- softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join()).isEmpty();
- softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join())
+ softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional()).isEmpty();
+ softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).blockOptional())
.contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1));
softly.assertAll();
}
@Test
void migrationTaskShouldMoveDataToMostRecentDao() {
- daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).join();
+ daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).block();
new MailboxPathV2Migration(daoV1, daoV2).run();
SoftAssertions softly = new SoftAssertions();
- softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join()).isEmpty();
- softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join())
+ softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).blockOptional()).isEmpty();
+ softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).blockOptional())
.contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1));
softly.assertAll();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java
index 637cc69..9c43818 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMailboxManagerAttachmentTest.java
@@ -26,11 +26,16 @@ import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Stream;
+import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MailboxSessionUtil;
import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.model.Attachment;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.MessageAttachment;
import org.apache.james.mailbox.model.MessageRange;
@@ -149,10 +154,20 @@ public abstract class AbstractMailboxManagerAttachmentTest {
assertThat(messages.hasNext()).isTrue();
List<MessageAttachment> attachments = messages.next().getAttachments();
assertThat(attachments).hasSize(2);
- assertThat(attachmentMapper.getAttachment(attachments.get(0).getAttachmentId()).getStream())
- .hasSameContentAs(ClassLoader.getSystemResourceAsStream("eml/4037_014.jpg"));
- assertThat(attachmentMapper.getAttachment(attachments.get(1).getAttachmentId()).getStream())
- .hasSameContentAs(ClassLoader.getSystemResourceAsStream("eml/4037_015.jpg"));
+ ImmutableList<byte[]> attachmentContents = attachments
+ .stream()
+ .map(MessageAttachment::getAttachmentId)
+ .map(Throwing.function(attachmentMapper::getAttachment))
+ .map(Attachment::getBytes)
+ .collect(ImmutableList.toImmutableList());
+
+ ImmutableList<byte[]> files = Stream.of("eml/4037_014.jpg", "eml/4037_015.jpg")
+ .map(ClassLoader::getSystemResourceAsStream)
+ .map(Throwing.function(IOUtils::toByteArray))
+ .collect(ImmutableList.toImmutableList());
+
+ assertThat(attachmentContents)
+ .containsExactlyInAnyOrder(files.get(0), files.get(1));
}
@Test
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/container/util/pom.xml
----------------------------------------------------------------------
diff --git a/server/container/util/pom.xml b/server/container/util/pom.xml
index fa76097..85c3a6e 100644
--- a/server/container/util/pom.xml
+++ b/server/container/util/pom.xml
@@ -116,6 +116,12 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>backport-util-concurrent</groupId>
+ <artifactId>backport-util-concurrent</artifactId>
+ <version>3.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
index bd1d177..830b01c 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/domainlist/cassandra/CassandraDomainList.java
@@ -112,7 +112,7 @@ public class CassandraDomainList extends AbstractDomainList {
public void addDomain(Domain domain) throws DomainListException {
boolean executed = executor.executeReturnApplied(insertStatement.bind()
.setString(DOMAIN, domain.asString()))
- .join();
+ .block();
if (!executed) {
throw new DomainListException(domain.name() + " already exists.");
}
@@ -122,7 +122,7 @@ public class CassandraDomainList extends AbstractDomainList {
public void removeDomain(Domain domain) throws DomainListException {
boolean executed = executor.executeReturnApplied(removeStatement.bind()
.setString(DOMAIN, domain.asString()))
- .join();
+ .block();
if (!executed) {
throw new DomainListException(domain.name() + " was not found");
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java
index 47c598d..10049e1 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraActiveScriptDAO.java
@@ -32,8 +32,6 @@ import static org.apache.james.sieve.cassandra.tables.CassandraSieveActiveTable.
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
@@ -44,6 +42,7 @@ import org.apache.james.sieverepository.api.ScriptName;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
public class CassandraActiveScriptDAO {
private final CassandraAsyncExecutor cassandraAsyncExecutor;
@@ -66,23 +65,23 @@ public class CassandraActiveScriptDAO {
.where(eq(USER_NAME, bindMarker(USER_NAME))));
}
- public CompletableFuture<Optional<ActiveScriptInfo>> getActiveSctiptInfo(User user) {
- return cassandraAsyncExecutor.executeSingleRow(
+ public Mono<ActiveScriptInfo> getActiveSctiptInfo(User user) {
+ return cassandraAsyncExecutor.executeSingleRowReactor(
selectActiveName.bind()
.setString(USER_NAME, user.asString()))
- .thenApply(rowOptional -> rowOptional.map(row -> new ActiveScriptInfo(
+ .map(row -> new ActiveScriptInfo(
new ScriptName(row.getString(SCRIPT_NAME)),
- ZonedDateTime.ofInstant(row.getTimestamp(DATE).toInstant(), ZoneOffset.UTC))));
+ ZonedDateTime.ofInstant(row.getTimestamp(DATE).toInstant(), ZoneOffset.UTC)));
}
- public CompletableFuture<Void> unactivate(User user) {
- return cassandraAsyncExecutor.executeVoid(
+ public Mono<Void> unactivate(User user) {
+ return cassandraAsyncExecutor.executeVoidReactor(
deleteActive.bind()
.setString(USER_NAME, user.asString()));
}
- public CompletableFuture<Void> activate(User user, ScriptName scriptName) {
- return cassandraAsyncExecutor.executeVoid(
+ public Mono<Void> activate(User user, ScriptName scriptName) {
+ return cassandraAsyncExecutor.executeVoidReactor(
insertActive.bind()
.setString(USER_NAME, user.asString())
.setString(SCRIPT_NAME, scriptName.getValue())
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
index 429ca4b..d3cf741 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveDAO.java
@@ -34,7 +34,6 @@ import static org.apache.james.sieve.cassandra.tables.CassandraSieveTable.TABLE_
import static org.apache.james.sieve.cassandra.tables.CassandraSieveTable.USER_NAME;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
@@ -50,6 +49,7 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Select;
import com.github.steveash.guavate.Guavate;
+import reactor.core.publisher.Mono;
public class CassandraSieveDAO {
@@ -98,8 +98,8 @@ public class CassandraSieveDAO {
.where(eq(USER_NAME, bindMarker(USER_NAME)));
}
- public CompletableFuture<Void> insertScript(User user, Script script) {
- return cassandraAsyncExecutor.executeVoid(
+ public Mono<Void> insertScript(User user, Script script) {
+ return cassandraAsyncExecutor.executeVoidReactor(
insertScriptStatement.bind()
.setString(USER_NAME, user.asString())
.setString(SCRIPT_NAME, script.getName().getValue())
@@ -120,7 +120,7 @@ public class CassandraSieveDAO {
.collect(Guavate.toImmutableList()));
}
- public CompletableFuture<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) {
+ public Mono<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) {
return cassandraAsyncExecutor.executeReturnApplied(
updateScriptActivationStatement.bind()
.setString(USER_NAME, user.asString())
@@ -128,24 +128,24 @@ public class CassandraSieveDAO {
.setBool(IS_ACTIVE, active));
}
- public CompletableFuture<Optional<Script>> getScript(User user, ScriptName name) {
- return getScriptRow(user, name).thenApply(opt -> opt.map(row -> Script.builder()
+ public Mono<Script> getScript(User user, ScriptName name) {
+ return getScriptRow(user, name).map(row -> Script.builder()
.content(row.getString(SCRIPT_CONTENT))
.isActive(row.getBool(IS_ACTIVE))
.name(name)
.size(row.getLong(SIZE))
- .build()));
+ .build());
}
- public CompletableFuture<Boolean> deleteScriptInCassandra(User user, ScriptName name) {
+ public Mono<Boolean> deleteScriptInCassandra(User user, ScriptName name) {
return cassandraAsyncExecutor.executeReturnApplied(
deleteScriptStatement.bind()
.setString(USER_NAME, user.asString())
.setString(SCRIPT_NAME, name.getValue()));
}
- private CompletableFuture<Optional<Row>> getScriptRow(User user, ScriptName name) {
- return cassandraAsyncExecutor.executeSingleRow(
+ private Mono<Row> getScriptRow(User user, ScriptName name) {
+ return cassandraAsyncExecutor.executeSingleRowReactor(
selectScriptStatement.bind()
.setString(USER_NAME, user.asString())
.setString(SCRIPT_NAME, name.getValue()));
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java
index b20ea30..1e99f1a 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveQuotaDAO.java
@@ -41,6 +41,7 @@ import org.apache.james.sieve.cassandra.tables.CassandraSieveSpaceTable;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
public class CassandraSieveQuotaDAO {
@@ -107,8 +108,8 @@ public class CassandraSieveQuotaDAO {
.orElse(0L));
}
- public CompletableFuture<Void> updateSpaceUsed(User user, long spaceUsed) {
- return cassandraAsyncExecutor.executeVoid(
+ public Mono<Void> updateSpaceUsed(User user, long spaceUsed) {
+ return cassandraAsyncExecutor.executeVoidReactor(
updateSpaceUsedStatement.bind()
.setLong(CassandraSieveSpaceTable.SPACE_USED, spaceUsed)
.setString(CassandraSieveSpaceTable.USER_NAME, user.asString()));
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java
index fa52b4a..7f4b559 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/sieve/cassandra/CassandraSieveRepository.java
@@ -25,6 +25,7 @@ import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import javax.inject.Inject;
@@ -43,7 +44,11 @@ import org.apache.james.sieverepository.api.exception.IsActiveException;
import org.apache.james.sieverepository.api.exception.QuotaExceededException;
import org.apache.james.sieverepository.api.exception.QuotaNotFoundException;
import org.apache.james.sieverepository.api.exception.ScriptNotFoundException;
-import org.apache.james.util.CompletableFutureUtil;
+
+import org.apache.james.util.FunctionalUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraSieveRepository implements SieveRepository {
@@ -60,7 +65,8 @@ public class CassandraSieveRepository implements SieveRepository {
@Override
public ZonedDateTime getActivationDateForActiveScript(User user) throws ScriptNotFoundException {
- return cassandraActiveScriptDAO.getActiveSctiptInfo(user).join()
+ return cassandraActiveScriptDAO.getActiveSctiptInfo(user)
+ .blockOptional()
.orElseThrow(ScriptNotFoundException::new)
.getActivationDate();
}
@@ -70,19 +76,20 @@ public class CassandraSieveRepository implements SieveRepository {
throwOnOverQuota(user, spaceThatWillBeUsedByNewScript(user, name, newSize));
}
- private void throwOnOverQuota(User user, CompletableFuture<Long> sizeDifference) throws QuotaExceededException {
+ private void throwOnOverQuota(User user, Mono<Long> sizeDifference) throws QuotaExceededException {
CompletableFuture<Optional<QuotaSize>> userQuotaFuture = cassandraSieveQuotaDAO.getQuota(user);
CompletableFuture<Optional<QuotaSize>> globalQuotaFuture = cassandraSieveQuotaDAO.getQuota();
CompletableFuture<Long> spaceUsedFuture = cassandraSieveQuotaDAO.spaceUsedBy(user);
new SieveQuota(spaceUsedFuture.join(), limitToUse(userQuotaFuture, globalQuotaFuture))
- .checkOverQuotaUponModification(sizeDifference.join());
+ .checkOverQuotaUponModification(sizeDifference.block());
}
- private CompletableFuture<Long> spaceThatWillBeUsedByNewScript(User user, ScriptName name, long scriptSize) {
+ private Mono<Long> spaceThatWillBeUsedByNewScript(User user, ScriptName name, long scriptSize) {
return cassandraSieveDAO.getScript(user, name)
- .thenApply(optional -> optional.map(Script::getSize).orElse(0L))
- .thenApply(sizeOfStoredScript -> scriptSize - sizeOfStoredScript);
+ .map(Script::getSize)
+ .switchIfEmpty(Mono.just(0L))
+ .map(sizeOfStoredScript -> scriptSize - sizeOfStoredScript);
}
private Optional<QuotaSize> limitToUse(CompletableFuture<Optional<QuotaSize>> userQuota, CompletableFuture<Optional<QuotaSize>> globalQuota) {
@@ -94,23 +101,24 @@ public class CassandraSieveRepository implements SieveRepository {
@Override
public void putScript(User user, ScriptName name, ScriptContent content) throws QuotaExceededException {
- CompletableFuture<Long> spaceUsed = spaceThatWillBeUsedByNewScript(user, name, content.length());
+ Mono<Long> spaceUsed = spaceThatWillBeUsedByNewScript(user, name, content.length());
throwOnOverQuota(user, spaceUsed);
- CompletableFuture.allOf(
- updateSpaceUsed(user, spaceUsed.join()),
+ Flux.merge(
+ updateSpaceUsed(user, spaceUsed.block()),
cassandraSieveDAO.insertScript(user,
Script.builder()
.name(name)
.content(content)
.isActive(false)
.build()))
- .join();
+ .then()
+ .block();
}
- private CompletableFuture<Void> updateSpaceUsed(User user, long spaceUsed) {
+ private Mono<Void> updateSpaceUsed(User user, long spaceUsed) {
if (spaceUsed == 0) {
- return CompletableFuture.completedFuture(null);
+ return Mono.empty();
}
return cassandraSieveQuotaDAO.updateSpaceUsed(user, spaceUsed);
}
@@ -124,10 +132,8 @@ public class CassandraSieveRepository implements SieveRepository {
public InputStream getActive(User user) throws ScriptNotFoundException {
return IOUtils.toInputStream(
cassandraActiveScriptDAO.getActiveSctiptInfo(user)
- .thenCompose(optionalActiveName -> optionalActiveName
- .map(activeScriptInfo -> cassandraSieveDAO.getScript(user, activeScriptInfo.getName()))
- .orElse(CompletableFuture.completedFuture(Optional.empty())))
- .join()
+ .flatMap(activeScriptInfo -> cassandraSieveDAO.getScript(user, activeScriptInfo.getName()))
+ .blockOptional()
.orElseThrow(ScriptNotFoundException::new)
.getContent()
.getValue(), StandardCharsets.UTF_8);
@@ -135,36 +141,33 @@ public class CassandraSieveRepository implements SieveRepository {
@Override
public void setActive(User user, ScriptName name) throws ScriptNotFoundException {
- CompletableFuture<Boolean> activateNewScript =
+ Mono<Boolean> activateNewScript =
unactivateOldScript(user)
- .thenCompose(any -> updateScriptActivation(user, name, true)
- .thenCompose(CompletableFutureUtil.composeIfTrue(
- () -> cassandraActiveScriptDAO.activate(user, name))));
+ .then(updateScriptActivation(user, name, true))
+ .filter(FunctionalUtils.toPredicate(Function.identity()))
+ .flatMap(any -> cassandraActiveScriptDAO.activate(user, name).thenReturn(any));
- if (!activateNewScript.join()) {
+ if (!activateNewScript.blockOptional().isPresent()) {
throw new ScriptNotFoundException();
}
}
- private CompletableFuture<Void> unactivateOldScript(User user) {
+ private Mono<Void> unactivateOldScript(User user) {
return cassandraActiveScriptDAO.getActiveSctiptInfo(user)
- .thenCompose(scriptNameOptional -> scriptNameOptional
- .map(activeScriptInfo -> updateScriptActivation(user, activeScriptInfo.getName(), false)
- .<Void>thenApply(any -> null))
- .orElse(CompletableFuture.completedFuture(null)));
+ .flatMap(activeScriptInfo -> updateScriptActivation(user, activeScriptInfo.getName(), false).then());
}
- private CompletableFuture<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) {
+ private Mono<Boolean> updateScriptActivation(User user, ScriptName scriptName, boolean active) {
if (!scriptName.equals(SieveRepository.NO_SCRIPT_NAME)) {
return cassandraSieveDAO.updateScriptActivation(user, scriptName, active);
}
- return cassandraActiveScriptDAO.unactivate(user).thenApply(any -> true);
+ return cassandraActiveScriptDAO.unactivate(user).thenReturn(true);
}
@Override
public InputStream getScript(User user, ScriptName name) throws ScriptNotFoundException {
return cassandraSieveDAO.getScript(user, name)
- .join()
+ .blockOptional()
.map(script -> IOUtils.toInputStream(script.getContent().getValue(), StandardCharsets.UTF_8))
.orElseThrow(ScriptNotFoundException::new);
}
@@ -172,13 +175,13 @@ public class CassandraSieveRepository implements SieveRepository {
@Override
public void deleteScript(User user, ScriptName name) throws ScriptNotFoundException, IsActiveException {
ensureIsNotActive(user, name);
- if (!cassandraSieveDAO.deleteScriptInCassandra(user, name).join()) {
+ if (!cassandraSieveDAO.deleteScriptInCassandra(user, name).switchIfEmpty(Mono.just(false)).block()) {
throw new ScriptNotFoundException();
}
}
private void ensureIsNotActive(User user, ScriptName name) throws IsActiveException {
- Optional<ScriptName> activeName = cassandraActiveScriptDAO.getActiveSctiptInfo(user).join().map(ActiveScriptInfo::getName);
+ Optional<ScriptName> activeName = cassandraActiveScriptDAO.getActiveSctiptInfo(user).blockOptional().map(ActiveScriptInfo::getName);
if (activeName.isPresent() && name.equals(activeName.get())) {
throw new IsActiveException();
}
@@ -186,22 +189,21 @@ public class CassandraSieveRepository implements SieveRepository {
@Override
public void renameScript(User user, ScriptName oldName, ScriptName newName) throws ScriptNotFoundException, DuplicateException {
- CompletableFuture<Boolean> scriptExistsFuture = cassandraSieveDAO.getScript(user, newName)
- .thenApply(Optional::isPresent);
- CompletableFuture<Optional<Script>> oldScriptFuture = cassandraSieveDAO.getScript(user, oldName);
+ Mono<Script> oldScript = cassandraSieveDAO.getScript(user, oldName).cache();
+ Mono<Boolean> newScriptExists = cassandraSieveDAO.getScript(user, newName).hasElement();
- oldScriptFuture.join();
- if (scriptExistsFuture.join()) {
+ oldScript.block();
+ if (newScriptExists.block()) {
throw new DuplicateException();
}
performScriptRename(user,
newName,
- oldScriptFuture.join().orElseThrow(ScriptNotFoundException::new));
+ oldScript.blockOptional().orElseThrow(ScriptNotFoundException::new));
}
private void performScriptRename(User user, ScriptName newName, Script oldScript) {
- CompletableFuture.allOf(
+ Flux.merge(
cassandraSieveDAO.insertScript(user,
Script.builder()
.copyOf(oldScript)
@@ -209,15 +211,14 @@ public class CassandraSieveRepository implements SieveRepository {
.build()),
cassandraSieveDAO.deleteScriptInCassandra(user, oldScript.getName()),
performActiveScriptRename(user, oldScript.getName(), newName))
- .join();
+ .then()
+ .block();
}
- private CompletableFuture<Void> performActiveScriptRename(User user, ScriptName oldName, ScriptName newName) {
+ private Mono<Void> performActiveScriptRename(User user, ScriptName oldName, ScriptName newName) {
return cassandraActiveScriptDAO.getActiveSctiptInfo(user)
- .thenCompose(optionalActivationInfo -> optionalActivationInfo
- .filter(activeScriptInfo -> activeScriptInfo.getName().equals(oldName))
- .map(name -> cassandraActiveScriptDAO.activate(user, newName))
- .orElse(CompletableFuture.completedFuture(null)));
+ .filter(activeScriptInfo -> activeScriptInfo.getName().equals(oldName))
+ .flatMap(name -> cassandraActiveScriptDAO.activate(user, newName));
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
----------------------------------------------------------------------
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
index 45504ab..0d6eee8 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/user/cassandra/CassandraUsersRepository.java
@@ -137,7 +137,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository {
.setString(PASSWORD, defaultUser.getHashedPassword())
.setString(ALGORITHM, defaultUser.getHashAlgorithm())
.setString(NAME, defaultUser.getUserName().toLowerCase(Locale.US)))
- .join();
+ .block();
if (!executed) {
throw new UsersRepositoryException("Unable to update user");
@@ -149,7 +149,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository {
boolean executed = executor.executeReturnApplied(
removeUserStatement.bind()
.setString(NAME, name))
- .join();
+ .block();
if (!executed) {
throw new UsersRepositoryException("unable to remove unknown user " + name);
@@ -202,7 +202,7 @@ public class CassandraUsersRepository extends AbstractUsersRepository {
.setString(REALNAME, user.getUserName())
.setString(PASSWORD, user.getHashedPassword())
.setString(ALGORITHM, user.getHashAlgorithm()))
- .join();
+ .block();
if (!executed) {
throw new AlreadyExistInUsersRepositoryException("User with username " + username + " already exist!");
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org