You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:24 UTC
[09/12] james-project git commit: JAMES-2630 Migrate CassandraAsyncExecutor.executeReturnApplied consumers to Reactor
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 23297ee..dab5496 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -46,7 +46,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -76,8 +75,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
-import org.apache.james.util.CompletableFutureUtil;
-import org.apache.james.util.FluentFutureStream;
import org.apache.james.util.streams.JamesCollectors;
import org.apache.james.util.streams.Limit;
@@ -95,6 +92,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Bytes;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraMessageDAO {
public static final long DEFAULT_LONG_VALUE = 0L;
@@ -234,42 +233,36 @@ public class CassandraMessageDAO {
.collect(Guavate.toImmutableList());
}
- public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
- return CompletableFutureUtil.chainAll(
- limit.applyOnStream(messageIds.stream().distinct())
- .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())),
- ids -> rowToMessages(fetchType, ids))
- .thenApply(stream -> stream.flatMap(Function.identity()));
+ public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
+ return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct())
+ .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())))
+ .flatMap(ids -> rowToMessages(fetchType, ids));
}
- private CompletableFuture<Stream<MessageResult>> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) {
- return FluentFutureStream.of(
- ids.stream()
- .map(id -> retrieveRow(id, fetchType)
- .thenCompose((ResultSet resultSet) -> message(resultSet, id, fetchType))))
- .completableFuture();
+ private Flux<MessageResult> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) {
+ return Flux.fromIterable(ids)
+ .flatMap(id -> retrieveRow(id, fetchType)
+ .flatMap(resultSet -> message(resultSet, id, fetchType)));
}
- private CompletableFuture<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
+ private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId();
- return cassandraAsyncExecutor.execute(retrieveSelect(fetchType)
+ return cassandraAsyncExecutor.executeReactor(retrieveSelect(fetchType)
.bind()
.setUUID(MESSAGE_ID, cassandraMessageId.get()));
}
- private CompletableFuture<MessageResult>
+ private Mono<MessageResult>
message(ResultSet rows,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) {
ComposedMessageId messageId = messageIdWithMetaData.getComposedMessageId();
if (rows.isExhausted()) {
- return CompletableFuture.completedFuture(notFound(messageIdWithMetaData));
+ return Mono.just(notFound(messageIdWithMetaData));
}
Row row = rows.one();
- CompletableFuture<byte[]> contentFuture = buildContentRetriever(fetchType).apply(row);
-
- return contentFuture.thenApply(content -> {
+ return buildContentRetriever(fetchType, row).map(content -> {
MessageWithoutAttachment messageWithoutAttachment =
new MessageWithoutAttachment(
messageId.getMessageId(),
@@ -341,37 +334,37 @@ public class CassandraMessageDAO {
.setUUID(MESSAGE_ID, messageId.get()));
}
- private Function<Row, CompletableFuture<byte[]>> buildContentRetriever(FetchType fetchType) {
+ private Mono<byte[]> buildContentRetriever(FetchType fetchType, Row row) {
switch (fetchType) {
case Full:
- return this::getFullContent;
+ return getFullContent(row);
case Headers:
- return this::getHeaderContent;
+ return getHeaderContent(row);
case Body:
- return row -> getBodyContent(row)
- .thenApply(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
+ return getBodyContent(row)
+ .map(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
case Metadata:
- return row -> CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY);
+ return Mono.just(EMPTY_BYTE_ARRAY);
default:
throw new RuntimeException("Unknown FetchType " + fetchType);
}
}
- private CompletableFuture<byte[]> getFullContent(Row row) {
+ private Mono<byte[]> getFullContent(Row row) {
return getHeaderContent(row)
- .thenCombine(getBodyContent(row), Bytes::concat);
+ .zipWith(getBodyContent(row), Bytes::concat);
}
- private CompletableFuture<byte[]> getBodyContent(Row row) {
+ private Mono<byte[]> getBodyContent(Row row) {
return getFieldContent(BODY_CONTENT, row);
}
- private CompletableFuture<byte[]> getHeaderContent(Row row) {
+ private Mono<byte[]> getHeaderContent(Row row) {
return getFieldContent(HEADER_CONTENT, row);
}
- private CompletableFuture<byte[]> getFieldContent(String field, Row row) {
- return blobStore.readBytes(blobIdFactory.from(row.getString(field)));
+ private Mono<byte[]> getFieldContent(String field, Row row) {
+ return Mono.fromFuture(blobStore.readBytes(blobIdFactory.from(row.getString(field))));
}
public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 51365e6..fe93143 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -67,6 +67,7 @@ import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
public class CassandraMessageIdDAO {
@@ -169,8 +170,8 @@ public class CassandraMessageIdDAO {
.and(lte(IMAP_UID, bindMarker(IMAP_UID_LTE))));
}
- public CompletableFuture<Void> delete(CassandraId mailboxId, MessageUid uid) {
- return cassandraAsyncExecutor.executeVoid(delete.bind()
+ public Mono<Void> delete(CassandraId mailboxId, MessageUid uid) {
+ return cassandraAsyncExecutor.executeVoidReactor(delete.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(IMAP_UID, uid.asLong()));
}
@@ -193,10 +194,10 @@ public class CassandraMessageIdDAO {
.setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags())));
}
- public CompletableFuture<Void> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
+ public Mono<Void> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
Flags flags = composedMessageIdWithMetaData.getFlags();
- return cassandraAsyncExecutor.executeVoid(update.bind()
+ return cassandraAsyncExecutor.executeVoidReactor(update.bind()
.setLong(MOD_SEQ, composedMessageIdWithMetaData.getModSeq())
.setBool(ANSWERED, flags.contains(Flag.ANSWERED))
.setBool(DELETED, flags.contains(Flag.DELETED))
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 880ac16..0a7c0bc 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -24,14 +24,14 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import java.util.stream.Stream;
import javax.mail.Flags;
import org.apache.commons.lang3.tuple.Pair;
+
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
-import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -49,7 +49,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
import org.apache.james.mailbox.store.mail.ModSeqProvider;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.FluentFutureStream;
+import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
import org.apache.james.util.streams.JamesCollectors;
import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
@@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.Multimap;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class CassandraMessageIdMapper implements MessageIdMapper {
@@ -99,47 +101,37 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
}
private Stream<SimpleMailboxMessage> findAsStream(Collection<MessageId> messageIds, FetchType fetchType) {
- return FluentFutureStream.of(
- messageIds.stream()
- .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())),
- FluentFutureStream::unboxStream)
- .collect(Guavate.toImmutableList())
- .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
- .thenApply(stream -> stream
+ return Flux.fromStream(messageIds.stream())
+ .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()))
+ .collectList()
+ .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
.filter(CassandraMessageDAO.MessageResult::isFound)
- .map(CassandraMessageDAO.MessageResult::message))
- .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType))
- .thenCompose(this::filterMessagesWithExistingMailbox)
- .join()
- .sorted(Comparator.comparing(MailboxMessage::getUid));
- }
-
- private CompletableFuture<Stream<SimpleMailboxMessage>> filterMessagesWithExistingMailbox(Stream<SimpleMailboxMessage> stream) {
- return FluentFutureStream.of(stream.map(this::keepMessageIfMailboxExists), FluentFutureStream::unboxOptional)
- .completableFuture();
+ .map(CassandraMessageDAO.MessageResult::message)
+ .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType))
+ .flatMap(this::keepMessageIfMailboxExists)
+ .collectSortedList(Comparator.comparing(MailboxMessage::getUid))
+ .block()
+ .stream();
}
- private CompletableFuture<Optional<SimpleMailboxMessage>> keepMessageIfMailboxExists(SimpleMailboxMessage message) {
+ private Mono<SimpleMailboxMessage> keepMessageIfMailboxExists(SimpleMailboxMessage message) {
CassandraId cassandraId = (CassandraId) message.getMailboxId();
return mailboxDAO.retrieveMailbox(cassandraId)
- .thenApply(optional -> {
- if (!optional.isPresent()) {
+ .map(any -> message)
+ .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> {
LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.",
cassandraId,
message.getMailboxId());
- return Optional.empty();
- }
-
- return Optional.of(message);
- });
+ }));
}
@Override
public List<MailboxId> findMailboxes(MessageId messageId) {
- return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()).join()
+ return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())
.map(ComposedMessageIdWithMetaData::getComposedMessageId)
.map(ComposedMessageId::getMailboxId)
- .collect(Guavate.toImmutableList());
+ .collectList()
+ .block();
}
@Override
@@ -182,114 +174,115 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
@Override
public void delete(MessageId messageId, Collection<MailboxId> mailboxIds) {
- deleteAsFuture(messageId, mailboxIds).join();
+ deleteAsMono(messageId, mailboxIds).block();
}
- public CompletableFuture<Void> deleteAsFuture(MessageId messageId, Collection<MailboxId> mailboxIds) {
+ public Mono<Void> deleteAsMono(MessageId messageId, Collection<MailboxId> mailboxIds) {
CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId;
- return mailboxIds.stream()
- .map(mailboxId -> retrieveAndDeleteIndices(cassandraMessageId, Optional.of((CassandraId) mailboxId)))
- .reduce((f1, f2) -> CompletableFuture.allOf(f1, f2))
- .orElse(CompletableFuture.completedFuture(null));
+ return Flux.fromStream(mailboxIds.stream())
+ .flatMap(mailboxId -> retrieveAndDeleteIndices(cassandraMessageId, Optional.of((CassandraId) mailboxId)))
+ .then();
}
@Override
public void delete(Multimap<MessageId, MailboxId> ids) {
- ids.asMap()
- .entrySet()
- .stream()
- .collect(JamesCollectors.chunker(cassandraConfiguration.getExpungeChunkSize()))
- .forEach(chunk ->
- FluentFutureStream.of(chunk.stream()
- .map(entry -> deleteAsFuture(entry.getKey(), entry.getValue())))
- .join());
+ Flux.fromIterable(ids.asMap()
+ .entrySet())
+ .limitRate(cassandraConfiguration.getExpungeChunkSize())
+ .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()))
+ .then()
+ .block();
}
- private CompletableFuture<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
+ private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
return imapUidDAO.retrieve(messageId, mailboxId)
- .thenCompose(composedMessageIds -> composedMessageIds
- .map(this::deleteIds)
- .reduce((f1, f2) -> CompletableFuture.allOf(f1, f2))
- .orElse(CompletableFuture.completedFuture(null)));
+ .flatMap(this::deleteIds)
+ .then();
}
@Override
public void delete(MessageId messageId) {
CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId;
retrieveAndDeleteIndices(cassandraMessageId, Optional.empty())
- .join();
+ .block();
}
- private CompletableFuture<Void> deleteIds(ComposedMessageIdWithMetaData metaData) {
+ private Mono<Void> deleteIds(ComposedMessageIdWithMetaData metaData) {
CassandraMessageId messageId = (CassandraMessageId) metaData.getComposedMessageId().getMessageId();
CassandraId mailboxId = (CassandraId) metaData.getComposedMessageId().getMailboxId();
- return CompletableFuture.allOf(
- imapUidDAO.delete(messageId, mailboxId),
- messageIdDAO.delete(mailboxId, metaData.getComposedMessageId().getUid()))
- .thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(metaData, mailboxId).toFuture());
+ return Flux.merge(
+ imapUidDAO.delete(messageId, mailboxId),
+ messageIdDAO.delete(mailboxId, metaData.getComposedMessageId().getUid()))
+ .then(indexTableHandler.updateIndexOnDelete(metaData, mailboxId));
}
@Override
public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
- return mailboxIds.stream()
+ return Flux.fromIterable(mailboxIds)
.distinct()
.map(mailboxId -> (CassandraId) mailboxId)
- .filter(mailboxId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(mailboxId))
- .join()
- .findAny()
- .isPresent())
+ .filter(mailboxId -> haveMetaData(messageId, mailboxId))
.flatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
- .map(this::updateCounts)
- .map(Mono::block)
- .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight));
+ .flatMap(this::updateCounts)
+ .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight))
+ .block();
}
- private Stream<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+ private boolean haveMetaData(MessageId messageId, CassandraId mailboxId) {
+ return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(mailboxId))
+ .hasElements()
+ .block();
+ }
+
+ private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
try {
- Pair<Flags, ComposedMessageIdWithMetaData> pair = new FunctionRunnerWithRetry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
- .executeAndRetrieveObject(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId));
- ComposedMessageIdWithMetaData composedMessageIdWithMetaData = pair.getRight();
- Flags oldFlags = pair.getLeft();
- return Stream.of(Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(),
- UpdatedFlags.builder()
- .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid())
- .modSeq(composedMessageIdWithMetaData.getModSeq())
- .oldFlags(oldFlags)
- .newFlags(composedMessageIdWithMetaData.getFlags())
- .build()));
- } catch (LightweightTransactionException e) {
- throw new RuntimeException(e);
+ return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
+ .single()
+ .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
+ .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
} catch (MailboxDeleteDuringUpdateException e) {
LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
- return Stream.of();
+ return Mono.empty();
}
}
+ private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) {
+ return Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(),
+ UpdatedFlags.builder()
+ .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid())
+ .modSeq(composedMessageIdWithMetaData.getModSeq())
+ .oldFlags(oldFlags)
+ .newFlags(composedMessageIdWithMetaData.getFlags())
+ .build());
+ }
+
private Mono<Pair<MailboxId, UpdatedFlags>> updateCounts(Pair<MailboxId, UpdatedFlags> pair) {
CassandraId cassandraId = (CassandraId) pair.getLeft();
return indexTableHandler.updateIndexOnFlagsUpdate(cassandraId, pair.getRight())
- .then(Mono.just(pair));
+ .thenReturn(pair);
}
- private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+ private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
try {
return updateFlags(mailboxId, messageId, newState, updateMode);
} catch (MailboxException e) {
LOGGER.error("Error while updating flags on mailbox: {}", mailboxId);
- return Optional.empty();
+ return Mono.empty();
}
}
- private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+ private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
CassandraId cassandraId = (CassandraId) mailboxId;
ComposedMessageIdWithMetaData oldComposedId = imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
- .join()
- .findFirst()
+ .next()
+ .blockOptional()
.orElseThrow(MailboxDeleteDuringUpdateException::new);
+
Flags newFlags = new FlagsUpdateCalculator(newState, updateMode).buildNewFlags(oldComposedId.getFlags());
if (identicalFlags(oldComposedId, newFlags)) {
- return Optional.of(Pair.of(oldComposedId.getFlags(), oldComposedId));
+ return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId));
}
+
ComposedMessageIdWithMetaData newComposedId = new ComposedMessageIdWithMetaData(
oldComposedId.getComposedMessageId(),
newFlags,
@@ -302,15 +295,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
return oldComposedId.getFlags().equals(newFlags);
}
- private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) {
+ private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) {
return imapUidDAO.updateMetadata(newComposedId, oldComposedId.getModSeq())
- .thenCompose(updateSuccess -> Optional.of(updateSuccess)
- .filter(b -> b)
- .map((Boolean any) -> messageIdDAO.updateMetadata(newComposedId).thenApply(v -> updateSuccess))
- .orElse(CompletableFuture.completedFuture(updateSuccess)))
- .thenApply(success -> Optional.of(success)
- .filter(b -> b)
- .map(any -> Pair.of(oldComposedId.getFlags(), newComposedId)))
- .join();
+ .filter(FunctionalUtils.toPredicate(Function.identity()))
+ .flatMap(any -> messageIdDAO.updateMetadata(newComposedId)
+ .thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId)));
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
index 169e32d..be5686a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
@@ -42,7 +42,6 @@ import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.TABLE_
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
import javax.inject.Inject;
import javax.mail.Flags;
@@ -64,6 +63,8 @@ import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraMessageIdToImapUidDAO {
@@ -148,8 +149,8 @@ public class CassandraMessageIdToImapUidDAO {
.and(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
}
- public CompletableFuture<Void> delete(CassandraMessageId messageId, CassandraId mailboxId) {
- return cassandraAsyncExecutor.executeVoid(delete.bind()
+ public Mono<Void> delete(CassandraMessageId messageId, CassandraId mailboxId) {
+ return cassandraAsyncExecutor.executeVoidReactor(delete.bind()
.setUUID(MESSAGE_ID, messageId.get())
.setUUID(MAILBOX_ID, mailboxId.asUuid()));
}
@@ -172,7 +173,7 @@ public class CassandraMessageIdToImapUidDAO {
.setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags())));
}
- public CompletableFuture<Boolean> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, long oldModSeq) {
+ public Mono<Boolean> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, long oldModSeq) {
ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
Flags flags = composedMessageIdWithMetaData.getFlags();
return cassandraAsyncExecutor.executeReturnApplied(update.bind()
@@ -191,10 +192,10 @@ public class CassandraMessageIdToImapUidDAO {
.setLong(MOD_SEQ_CONDITION, oldModSeq));
}
- public CompletableFuture<Stream<ComposedMessageIdWithMetaData>> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
+ public Flux<ComposedMessageIdWithMetaData> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
return selectStatement(messageId, mailboxId)
- .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
- .map(this::toComposedMessageIdWithMetadata));
+ .flatMapMany(cassandraUtils::convertToFlux)
+ .map(this::toComposedMessageIdWithMetadata);
}
private ComposedMessageIdWithMetaData toComposedMessageIdWithMetadata(Row row) {
@@ -208,12 +209,12 @@ public class CassandraMessageIdToImapUidDAO {
.build();
}
- private CompletableFuture<ResultSet> selectStatement(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
+ private Mono<ResultSet> selectStatement(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
return mailboxId
- .map(cassandraId -> cassandraAsyncExecutor.execute(select.bind()
+ .map(cassandraId -> cassandraAsyncExecutor.executeReactor(select.bind()
.setUUID(MESSAGE_ID, messageId.get())
.setUUID(MAILBOX_ID, cassandraId.asUuid())))
- .orElseGet(() -> cassandraAsyncExecutor.execute(selectAll.bind()
+ .orElseGet(() -> cassandraAsyncExecutor.executeReactor(selectAll.bind()
.setUUID(MESSAGE_ID, messageId.get())));
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index f2b468f..d2c4006 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import javax.mail.Flags;
import javax.mail.Flags.Flag;
@@ -50,6 +50,7 @@ import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.util.OptionalUtils;
+import org.apache.james.util.ReactorUtils;
import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,8 +154,8 @@ public class CassandraMessageMapper implements MessageMapper {
CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
MessageUid uid = composedMessageId.getUid();
return Flux.merge(
- Mono.fromCompletionStage(imapUidDAO.delete(messageId, mailboxId)),
- Mono.fromCompletionStage(messageIdDAO.delete(mailboxId, uid)))
+ imapUidDAO.delete(messageId, mailboxId),
+ messageIdDAO.delete(mailboxId, uid))
.then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
}
@@ -162,9 +163,9 @@ public class CassandraMessageMapper implements MessageMapper {
public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max))
- .map(SimpleMailboxMessage -> (MailboxMessage) SimpleMailboxMessage)
+ .map(simpleMailboxMessage -> (MailboxMessage) simpleMailboxMessage)
.collectSortedList(Comparator.comparing(MailboxMessage::getUid))
- .flatMapMany(Flux::fromIterable)
+ .flatMapIterable(Function.identity())
.toIterable()
.iterator();
}
@@ -176,12 +177,10 @@ public class CassandraMessageMapper implements MessageMapper {
}
private Flux<SimpleMailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
- return Mono.fromCompletionStage(messageDAO.retrieveMessages(messageIds, fetchType, limit))
- .map(stream -> stream
- .filter(CassandraMessageDAO.MessageResult::isFound)
- .map(CassandraMessageDAO.MessageResult::message))
- .flatMap(stream -> Mono.fromCompletionStage(attachmentLoader.addAttachmentToMessages(stream, fetchType)))
- .flatMapMany(Flux::fromStream);
+ return messageDAO.retrieveMessages(messageIds, fetchType, limit)
+ .filter(CassandraMessageDAO.MessageResult::isFound)
+ .map(CassandraMessageDAO.MessageResult::message)
+ .flatMap(stream -> attachmentLoader.addAttachmentToMessage(stream, fetchType));
}
@Override
@@ -196,9 +195,7 @@ public class CassandraMessageMapper implements MessageMapper {
public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
return firstUnseenDAO.retrieveFirstUnread(mailboxId)
- .map(Optional::of)
- .defaultIfEmpty(Optional.empty())
- .block()
+ .blockOptional()
.orElse(null);
}
@@ -216,9 +213,8 @@ public class CassandraMessageMapper implements MessageMapper {
private Flux<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) {
return retrieveComposedId(mailboxId, messageUid)
.flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
- .flatMap(idWithMetadata ->
- Mono.fromCompletionStage(messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited())))
- .flatMapMany(Flux::fromStream)
+ .flatMapMany(idWithMetadata ->
+ messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
.filter(CassandraMessageDAO.MessageResult::isFound)
.map(CassandraMessageDAO.MessageResult::message)
.map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));
@@ -262,13 +258,13 @@ public class CassandraMessageMapper implements MessageMapper {
}
private MailboxMessage addUidAndModseq(MailboxMessage message, CassandraId mailboxId) throws MailboxException {
- CompletableFuture<Optional<MessageUid>> uidFuture = uidProvider.nextUid(mailboxId);
- CompletableFuture<Optional<Long>> modseqFuture = modSeqProvider.nextModSeq(mailboxId);
- CompletableFuture.allOf(uidFuture, modseqFuture).join();
+ final Mono<MessageUid> messageUidMono = uidProvider.nextUid(mailboxId).cache();
+ final Mono<Long> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId).cache();
+ Flux.merge(messageUidMono, nextModSeqMono).then().block(); // subscribe now: without block() the merge is never executed and the two lookups run one after the other
- message.setUid(uidFuture.join()
+ message.setUid(messageUidMono.blockOptional()
.orElseThrow(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
- message.setModSeq(modseqFuture.join()
+ message.setModSeq(nextModSeqMono.blockOptional()
.orElseThrow(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
return message;
@@ -323,8 +319,8 @@ public class CassandraMessageMapper implements MessageMapper {
}
private Mono<Long> computeNewModSeq(CassandraId mailboxId) {
- return Mono.fromCompletionStage(modSeqProvider.nextModSeq(mailboxId))
- .map(value -> value.orElseThrow(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid())));
+ return modSeqProvider.nextModSeq(mailboxId) // if the provider completes empty, fail explicitly (as the removed orElseThrow did)
+ .switchIfEmpty(Mono.defer(() -> Mono.error(new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid()))));
}
private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
@@ -429,11 +425,10 @@ public class CassandraMessageMapper implements MessageMapper {
.modSeq(newModSeq)
.flags(newFlags)
.build();
- return Mono.fromCompletionStage(imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq()))
+ return imapUidDAO.updateMetadata(newMetadata, oldMetadata.getModSeq())
.flatMap(success -> {
if (success) {
- return Mono.fromCompletionStage(messageIdDAO.updateMetadata(newMetadata))
- .then(Mono.just(true));
+ return messageIdDAO.updateMetadata(newMetadata).thenReturn(true);
} else {
return Mono.just(false);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index 63e045b..6f798c5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -29,30 +29,33 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTab
import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.NEXT_MODSEQ;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
+import java.time.Duration;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.function.Function;
import java.util.function.Supplier;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.store.mail.ModSeqProvider;
import org.apache.james.mailbox.store.mail.model.Mailbox;
+import org.apache.james.util.FunctionalUtils;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import reactor.core.publisher.Mono;
public class CassandraModSeqProvider implements ModSeqProvider {
public static final String MOD_SEQ_CONDITION = "modSeqCondition";
+ private final long maxModSeqRetries;
public static class ExceptionRelay extends RuntimeException {
private final MailboxException underlying;
@@ -81,7 +84,6 @@ public class CassandraModSeqProvider implements ModSeqProvider {
private static final ModSeq FIRST_MODSEQ = new ModSeq(0);
private final CassandraAsyncExecutor cassandraAsyncExecutor;
- private final FunctionRunnerWithRetry runner;
private final PreparedStatement select;
private final PreparedStatement update;
private final PreparedStatement insert;
@@ -89,17 +91,12 @@ public class CassandraModSeqProvider implements ModSeqProvider {
@Inject
public CassandraModSeqProvider(Session session, CassandraConfiguration cassandraConfiguration) {
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
- this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getModSeqMaxRetry());
+ this.maxModSeqRetries = cassandraConfiguration.getModSeqMaxRetry();
this.insert = prepareInsert(session);
this.update = prepareUpdate(session);
this.select = prepareSelect(session);
}
- @VisibleForTesting
- public CassandraModSeqProvider(Session session) {
- this(session, CassandraConfiguration.DEFAULT_CONFIGURATION);
- }
-
private PreparedStatement prepareInsert(Session session) {
return session.prepare(insertInto(TABLE_NAME)
.value(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ))
@@ -125,81 +122,79 @@ public class CassandraModSeqProvider implements ModSeqProvider {
@Override
public long nextModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return nextModSeq(mailboxId).join()
+ return nextModSeq(mailboxId)
+ .blockOptional()
.orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
}
@Override
public long nextModSeq(MailboxSession session, MailboxId mailboxId) throws MailboxException {
return nextModSeq((CassandraId) mailboxId)
- .join()
+ .blockOptional()
.orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
}
@Override
public long highestModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
- return unbox(() -> findHighestModSeq((CassandraId) mailbox.getMailboxId()).join().getValue());
+ return highestModSeq(mailboxSession, mailbox.getMailboxId());
}
@Override
public long highestModSeq(MailboxSession mailboxSession, MailboxId mailboxId) throws MailboxException {
- return unbox(() -> findHighestModSeq((CassandraId) mailboxId).join().getValue());
+ return unbox(() -> findHighestModSeq((CassandraId) mailboxId).block().orElse(FIRST_MODSEQ).getValue());
}
- private CompletableFuture<ModSeq> findHighestModSeq(CassandraId mailboxId) {
- return cassandraAsyncExecutor.executeSingleRow(
+ private Mono<Optional<ModSeq>> findHighestModSeq(CassandraId mailboxId) {
+ return cassandraAsyncExecutor.executeSingleRowOptionalReactor(
select.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid()))
- .thenApply(optional -> optional.map(row -> new ModSeq(row.getLong(NEXT_MODSEQ)))
- .orElse(FIRST_MODSEQ));
+ .map(maybeRow -> maybeRow.map(row -> new ModSeq(row.getLong(NEXT_MODSEQ))));
}
- private CompletableFuture<Optional<ModSeq>> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) {
+ private Mono<ModSeq> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) {
ModSeq nextModSeq = modSeq.next();
return cassandraAsyncExecutor.executeReturnApplied(
insert.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(NEXT_MODSEQ, nextModSeq.getValue()))
- .thenApply(success -> successToModSeq(nextModSeq, success));
+ .flatMap(success -> successToModSeq(nextModSeq, success));
}
- private CompletableFuture<Optional<ModSeq>> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
+ private Mono<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
ModSeq nextModSeq = modSeq.next();
return cassandraAsyncExecutor.executeReturnApplied(
update.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(NEXT_MODSEQ, nextModSeq.getValue())
.setLong(MOD_SEQ_CONDITION, modSeq.getValue()))
- .thenApply(success -> successToModSeq(nextModSeq, success));
+ .flatMap(success -> successToModSeq(nextModSeq, success));
}
- private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
- if (success) {
- return Optional.of(modSeq);
- }
- return Optional.empty();
+ private Mono<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
+ return Mono.just(success)
+ .filter(FunctionalUtils.toPredicate(Function.identity()))
+ .map(any -> modSeq);
}
-
- public CompletableFuture<Optional<Long>> nextModSeq(CassandraId mailboxId) {
+
+ public Mono<Long> nextModSeq(CassandraId mailboxId) {
return findHighestModSeq(mailboxId)
- .thenCompose(modSeq -> {
- if (modSeq.isFirst()) {
- return tryInsertModSeq(mailboxId, FIRST_MODSEQ);
- }
- return tryUpdateModSeq(mailboxId, modSeq);
- }).thenCompose(firstInsert -> {
- if (firstInsert.isPresent()) {
- return CompletableFuture.completedFuture(firstInsert);
- }
- return handleRetries(mailboxId);
- })
- .thenApply(optional -> optional.map(ModSeq::getValue));
- }
-
- private CompletableFuture<Optional<ModSeq>> handleRetries(CassandraId mailboxId) {
- return runner.executeAsyncAndRetrieveObject(
- () -> findHighestModSeq(mailboxId)
- .thenCompose(newModSeq -> tryUpdateModSeq(mailboxId, newModSeq)));
+ .flatMap(maybeHighestModSeq -> maybeHighestModSeq
+ .map(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq))
+ .orElseGet(() -> tryInsertModSeq(mailboxId, FIRST_MODSEQ)))
+ .switchIfEmpty(handleRetries(mailboxId))
+ .map(ModSeq::getValue);
+ }
+
+ private Mono<ModSeq> handleRetries(CassandraId mailboxId) {
+ return tryFindThenUpdateOnce(mailboxId)
+ .single()
+ .retryBackoff(maxModSeqRetries, Duration.ofMillis(2));
+ }
+
+ private Mono<ModSeq> tryFindThenUpdateOnce(CassandraId mailboxId) {
+ return Mono.defer(() -> findHighestModSeq(mailboxId)
+ .flatMap(Mono::justOrEmpty)
+ .flatMap(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq)));
}
private static class ModSeq {
@@ -216,9 +211,12 @@ public class CassandraModSeqProvider implements ModSeqProvider {
public long getValue() {
return value;
}
-
- public boolean isFirst() {
- return value == FIRST_MODSEQ.value;
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("value", value)
+ .toString();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index b402bf8..bd3baac 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -30,13 +30,11 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.
import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -47,13 +45,13 @@ import org.apache.james.mailbox.store.mail.model.Mailbox;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
-import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
public class CassandraUidProvider implements UidProvider {
private static final String CONDITION = "Condition";
private final CassandraAsyncExecutor executor;
- private final FunctionRunnerWithRetry runner;
+ private final long maxUidRetries;
private final PreparedStatement insertStatement;
private final PreparedStatement updateStatement;
private final PreparedStatement selectStatement;
@@ -61,17 +59,12 @@ public class CassandraUidProvider implements UidProvider {
@Inject
public CassandraUidProvider(Session session, CassandraConfiguration cassandraConfiguration) {
this.executor = new CassandraAsyncExecutor(session);
- this.runner = new FunctionRunnerWithRetry(cassandraConfiguration.getUidMaxRetry());
+ this.maxUidRetries = cassandraConfiguration.getUidMaxRetry();
this.selectStatement = prepareSelect(session);
this.updateStatement = prepareUpdate(session);
this.insertStatement = prepareInsert(session);
}
- @VisibleForTesting
- public CassandraUidProvider(Session session) {
- this(session, CassandraConfiguration.DEFAULT_CONFIGURATION);
- }
-
private PreparedStatement prepareSelect(Session session) {
return session.prepare(select(NEXT_UID)
.from(TABLE_NAME)
@@ -101,66 +94,56 @@ public class CassandraUidProvider implements UidProvider {
public MessageUid nextUid(MailboxSession session, MailboxId mailboxId) throws MailboxException {
CassandraId cassandraId = (CassandraId) mailboxId;
return nextUid(cassandraId)
- .join()
- .orElseThrow(() -> new MailboxException("Error during Uid update"));
+ .blockOptional()
+ .orElseThrow(() -> new MailboxException("Error during Uid update"));
}
- public CompletableFuture<Optional<MessageUid>> nextUid(CassandraId cassandraId) {
- return findHighestUid(cassandraId)
- .thenCompose(optional -> {
- if (optional.isPresent()) {
- return tryUpdateUid(cassandraId, optional);
- }
- return tryInsert(cassandraId);
- })
- .thenCompose(optional -> {
- if (optional.isPresent()) {
- return CompletableFuture.completedFuture(optional);
- }
- return runner.executeAsyncAndRetrieveObject(
- () -> findHighestUid(cassandraId)
- .thenCompose(readUid -> tryUpdateUid(cassandraId, readUid)));
- });
+ public Mono<MessageUid> nextUid(CassandraId cassandraId) {
+ Mono<MessageUid> updateUid = findHighestUid(cassandraId)
+ .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid));
+
+ return updateUid
+ .switchIfEmpty(tryInsert(cassandraId))
+ .switchIfEmpty(updateUid)
+ .single()
+ .retry(maxUidRetries);
}
@Override
public Optional<MessageUid> lastUid(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
- return findHighestUid((CassandraId) mailbox.getMailboxId()).join();
+ return findHighestUid((CassandraId) mailbox.getMailboxId())
+ .blockOptional();
}
- private CompletableFuture<Optional<MessageUid>> findHighestUid(CassandraId mailboxId) {
- return executor.executeSingleRow(
+ private Mono<MessageUid> findHighestUid(CassandraId mailboxId) {
+ return Mono.defer(() -> executor.executeSingleRowReactor(
selectStatement.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid()))
- .thenApply(optional -> optional.map(row -> MessageUid.of(row.getLong(NEXT_UID))));
+ .map(row -> MessageUid.of(row.getLong(NEXT_UID))));
}
- private CompletableFuture<Optional<MessageUid>> tryUpdateUid(CassandraId mailboxId, Optional<MessageUid> uid) {
- if (uid.isPresent()) {
- MessageUid nextUid = uid.get().next();
- return executor.executeReturnApplied(
+ private Mono<MessageUid> tryUpdateUid(CassandraId mailboxId, MessageUid uid) {
+ MessageUid nextUid = uid.next();
+ return Mono.defer(() -> executor.executeReturnApplied(
updateStatement.bind()
- .setUUID(MAILBOX_ID, mailboxId.asUuid())
- .setLong(CONDITION, uid.get().asLong())
- .setLong(NEXT_UID, nextUid.asLong()))
- .thenApply(success -> successToUid(nextUid, success));
- } else {
- return tryInsert(mailboxId);
- }
+ .setUUID(MAILBOX_ID, mailboxId.asUuid())
+ .setLong(CONDITION, uid.asLong())
+ .setLong(NEXT_UID, nextUid.asLong()))
+ .flatMap(success -> successToUid(nextUid, success)));
}
- private CompletableFuture<Optional<MessageUid>> tryInsert(CassandraId mailboxId) {
- return executor.executeReturnApplied(
+ private Mono<MessageUid> tryInsert(CassandraId mailboxId) {
+ return Mono.defer(() -> executor.executeReturnApplied(
insertStatement.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid()))
- .thenApply(success -> successToUid(MessageUid.MIN_VALUE, success));
+ .flatMap(success -> successToUid(MessageUid.MIN_VALUE, success)));
}
- private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
+ private Mono<MessageUid> successToUid(MessageUid uid, Boolean success) {
if (success) {
- return Optional.of(uid);
+ return Mono.just(uid);
}
- return Optional.empty();
+ return Mono.empty();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
index 7f93150..4bfeafb 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
@@ -27,10 +27,8 @@ import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRight
import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRightsTable.TABLE_NAME;
import static org.apache.james.mailbox.cassandra.table.CassandraUserMailboxRightsTable.USER_NAME;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
@@ -44,13 +42,14 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.exception.UnsupportedRightException;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MailboxACL.Rfc4314Rights;
-import org.apache.james.util.FluentFutureStream;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.github.fge.lambdas.Throwing;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraUserMailboxRightsDAO {
@@ -98,31 +97,32 @@ public class CassandraUserMailboxRightsDAO {
.where(eq(USER_NAME, bindMarker(USER_NAME))));
}
- public CompletableFuture<Void> update(CassandraId cassandraId, ACLDiff aclDiff) {
+ public Mono<Void> update(CassandraId cassandraId, ACLDiff aclDiff) {
PositiveUserACLDiff userACLDiff = new PositiveUserACLDiff(aclDiff);
- return CompletableFuture.allOf(
+ return Flux.merge(
addAll(cassandraId, userACLDiff.addedEntries()),
removeAll(cassandraId, userACLDiff.removedEntries()),
- addAll(cassandraId, userACLDiff.changedEntries()));
+ addAll(cassandraId, userACLDiff.changedEntries()))
+ .then();
}
- private CompletableFuture<Stream<Void>> removeAll(CassandraId cassandraId, Stream<MailboxACL.Entry> removedEntries) {
- return FluentFutureStream.of(removedEntries
- .map(entry -> cassandraAsyncExecutor.executeVoid(
+ private Mono<Void> removeAll(CassandraId cassandraId, Stream<MailboxACL.Entry> removedEntries) {
+ return Flux.fromStream(removedEntries)
+ .flatMap(entry -> cassandraAsyncExecutor.executeVoidReactor(
delete.bind()
.setString(USER_NAME, entry.getKey().getName())
- .setUUID(MAILBOX_ID, cassandraId.asUuid()))))
- .completableFuture();
+ .setUUID(MAILBOX_ID, cassandraId.asUuid())))
+ .then();
}
- private CompletableFuture<Stream<Void>> addAll(CassandraId cassandraId, Stream<MailboxACL.Entry> addedEntries) {
- return FluentFutureStream.of(addedEntries
- .map(entry -> cassandraAsyncExecutor.executeVoid(
+ private Mono<Void> addAll(CassandraId cassandraId, Stream<MailboxACL.Entry> addedEntries) {
+ return Flux.fromStream(addedEntries)
+ .flatMap(entry -> cassandraAsyncExecutor.executeVoidReactor(
insert.bind()
.setString(USER_NAME, entry.getKey().getName())
.setUUID(MAILBOX_ID, cassandraId.asUuid())
- .setString(RIGHTS, entry.getValue().serialize()))))
- .completableFuture();
+ .setString(RIGHTS, entry.getValue().serialize())))
+ .then();
}
public CompletableFuture<Optional<Rfc4314Rights>> retrieve(String userName, CassandraId mailboxId) {
@@ -134,14 +134,12 @@ public class CassandraUserMailboxRightsDAO {
rowOptional.map(Throwing.function(row -> Rfc4314Rights.fromSerializedRfc4314Rights(row.getString(RIGHTS)))));
}
- public CompletableFuture<Map<CassandraId, Rfc4314Rights>> listRightsForUser(String userName) {
- return cassandraAsyncExecutor.execute(
+ public Flux<Pair<CassandraId, Rfc4314Rights>> listRightsForUser(String userName) {
+ return cassandraAsyncExecutor.executeReactor(
selectUser.bind()
.setString(USER_NAME, userName))
- .thenApply(cassandraUtils::convertToStream)
- .thenApply(row ->
- row.map(Throwing.function(this::toPair))
- .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)));
+ .flatMapMany(cassandraUtils::convertToFlux)
+ .map(Throwing.function(this::toPair));
}
private Pair<CassandraId, Rfc4314Rights> toPair(Row row) throws UnsupportedRightException {
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
index d43c8a6..2ab5e56 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentMessageIdCreation.java
@@ -25,10 +25,13 @@ import org.apache.james.backends.cassandra.migration.Migration;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO.MessageIdAttachmentIds;
+import org.apache.james.mailbox.model.MessageId;
import org.apache.james.task.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
public class AttachmentMessageIdCreation implements Migration {
private static final Logger LOGGER = LoggerFactory.getLogger(AttachmentMessageIdCreation.class);
private final CassandraMessageDAO cassandraMessageDAO;
@@ -56,10 +59,11 @@ public class AttachmentMessageIdCreation implements Migration {
private Result createIndex(MessageIdAttachmentIds message) {
try {
- message.getAttachmentId()
- .forEach(attachmentId -> attachmentMessageIdDAO
- .storeAttachmentForMessageId(attachmentId, message.getMessageId())
- .join());
+ MessageId messageId = message.getMessageId();
+ Flux.fromIterable(message.getAttachmentId())
+ .flatMap(attachmentId -> attachmentMessageIdDAO.storeAttachmentForMessageId(attachmentId, messageId))
+ .then()
+ .block();
return Result.COMPLETED;
} catch (Exception e) {
LOGGER.error("Error while creation attachmentId -> messageIds index", e);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
index 148d395..bebf83d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
@@ -61,7 +61,7 @@ public class AttachmentV2Migration implements Migration {
try {
blobStore.save(attachment.getBytes())
.thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
- .thenCompose(attachmentDAOV2::storeAttachment)
+ .thenCompose(daoAttachement -> attachmentDAOV2.storeAttachment(daoAttachement).toFuture())
.thenCompose(any -> attachmentDAOV1.deleteAttachment(attachment.getAttachmentId()))
.join();
return Result.COMPLETED;
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
index 70f4a8e..c4f3fd2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
@@ -80,9 +80,9 @@ public class MailboxPathV2Migration implements Migration {
public Result migrate(CassandraIdAndPath idAndPath) {
try {
- daoV2.save(idAndPath.getMailboxPath(), idAndPath.getCassandraId()).join();
+ daoV2.save(idAndPath.getMailboxPath(), idAndPath.getCassandraId()).block();
- daoV1.delete(idAndPath.getMailboxPath()).join();
+ daoV1.delete(idAndPath.getMailboxPath()).block();
return Result.COMPLETED;
} catch (Exception e) {
LOGGER.error("Error while performing migration for path {}", idAndPath.getMailboxPath(), e);
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
index 1a4a254..0032ef6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
@@ -63,7 +63,7 @@ public class MailboxMergingTaskRunner {
return moveMessages(oldMailboxId, newMailboxId, mailboxSession, context)
.onComplete(
() -> mergeRights(oldMailboxId, newMailboxId),
- () -> mailboxDAO.delete(oldMailboxId).join());
+ () -> mailboxDAO.delete(oldMailboxId).block());
}
private Task.Result moveMessages(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxSession session, MailboxMergingTask.Context context) {
@@ -88,12 +88,12 @@ public class MailboxMergingTaskRunner {
private void mergeRights(CassandraId oldMailboxId, CassandraId newMailboxId) {
try {
- MailboxACL oldAcl = cassandraACLMapper.getACL(oldMailboxId).join();
- MailboxACL newAcl = cassandraACLMapper.getACL(newMailboxId).join();
+ MailboxACL oldAcl = cassandraACLMapper.getACL(oldMailboxId).block();
+ MailboxACL newAcl = cassandraACLMapper.getACL(newMailboxId).block();
MailboxACL finalAcl = newAcl.union(oldAcl);
cassandraACLMapper.setACL(newMailboxId, finalAcl);
- rightsDAO.update(oldMailboxId, ACLDiff.computeDiff(oldAcl, MailboxACL.EMPTY)).join();
+ rightsDAO.update(oldMailboxId, ACLDiff.computeDiff(oldAcl, MailboxACL.EMPTY)).block();
} catch (MailboxException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
index d4b073d..4161e8d 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
@@ -23,21 +23,17 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collection;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import org.apache.james.mailbox.model.Attachment;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.Cid;
import org.apache.james.mailbox.model.MessageAttachment;
-import org.assertj.core.data.MapEntry;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
public class AttachmentLoaderTest {
@@ -53,15 +49,15 @@ public class AttachmentLoaderTest {
@Test
public void getAttachmentsShouldWorkWithDuplicatedAttachments() {
AttachmentId attachmentId = AttachmentId.from("1");
- Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId);
Attachment attachment = Attachment.builder()
.attachmentId(attachmentId)
.bytes("attachment".getBytes())
.type("type")
.build();
- when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
- .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment)));
+
+ when(attachmentMapper.getAttachmentsAsMono(attachmentId))
+ .thenReturn(Mono.just(attachment));
Optional<String> name = Optional.of("name1");
Optional<Cid> cid = Optional.empty();
@@ -69,7 +65,7 @@ public class AttachmentLoaderTest {
MessageAttachmentRepresentation attachmentRepresentation = new MessageAttachmentRepresentation(attachmentId, name, cid, isInlined);
Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of(attachmentRepresentation, attachmentRepresentation))
- .join();
+ .block();
MessageAttachment expectedAttachment = new MessageAttachment(attachment, name, cid, isInlined);
assertThat(attachments).hasSize(2)
@@ -79,15 +75,15 @@ public class AttachmentLoaderTest {
@Test
public void getAttachmentsShouldWorkWithDuplicatedIds() {
AttachmentId attachmentId = AttachmentId.from("1");
- Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId);
Attachment attachment = Attachment.builder()
.attachmentId(attachmentId)
.bytes("attachment".getBytes())
.type("type")
.build();
- when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
- .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment)));
+
+ when(attachmentMapper.getAttachmentsAsMono(attachmentId))
+ .thenReturn(Mono.just(attachment));
Optional<String> name1 = Optional.of("name1");
Optional<String> name2 = Optional.of("name2");
@@ -97,7 +93,7 @@ public class AttachmentLoaderTest {
MessageAttachmentRepresentation attachmentRepresentation2 = new MessageAttachmentRepresentation(attachmentId, name2, cid, isInlined);
Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of(attachmentRepresentation1, attachmentRepresentation2))
- .join();
+ .block();
assertThat(attachments).hasSize(2)
.containsOnly(new MessageAttachment(attachment, name1, cid, isInlined),
@@ -108,7 +104,6 @@ public class AttachmentLoaderTest {
public void getAttachmentsShouldReturnMultipleAttachmentWhenSeveralAttachmentsRepresentation() {
AttachmentId attachmentId1 = AttachmentId.from("1");
AttachmentId attachmentId2 = AttachmentId.from("2");
- Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId1, attachmentId2);
Attachment attachment1 = Attachment.builder()
.attachmentId(attachmentId1)
@@ -120,8 +115,11 @@ public class AttachmentLoaderTest {
.bytes("attachment2".getBytes())
.type("type")
.build();
- when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
- .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment1, attachment2)));
+
+ when(attachmentMapper.getAttachmentsAsMono(attachmentId1))
+ .thenReturn(Mono.just(attachment1));
+ when(attachmentMapper.getAttachmentsAsMono(attachmentId2))
+ .thenReturn(Mono.just(attachment2));
Optional<String> name1 = Optional.of("name1");
Optional<String> name2 = Optional.of("name2");
@@ -131,7 +129,7 @@ public class AttachmentLoaderTest {
MessageAttachmentRepresentation attachmentRepresentation2 = new MessageAttachmentRepresentation(attachmentId2, name2, cid, isInlined);
Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of(attachmentRepresentation1, attachmentRepresentation2))
- .join();
+ .block();
assertThat(attachments).hasSize(2)
.containsOnly(new MessageAttachment(attachment1, name1, cid, isInlined),
@@ -141,61 +139,19 @@ public class AttachmentLoaderTest {
@Test
public void getAttachmentsShouldReturnEmptyByDefault() {
AttachmentId attachmentId = AttachmentId.from("1");
- Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId);
Attachment attachment = Attachment.builder()
.attachmentId(attachmentId)
.bytes("attachment".getBytes())
.type("type")
.build();
- when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
- .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment)));
+
+ when(attachmentMapper.getAttachmentsAsMono(attachmentId))
+ .thenReturn(Mono.just(attachment));
Collection<MessageAttachment> attachments = testee.getAttachments(ImmutableList.of())
- .join();
+ .block();
assertThat(attachments).isEmpty();
}
-
- @Test
- public void attachmentsByIdShouldReturnMapWhenExist() {
- AttachmentId attachmentId = AttachmentId.from("1");
- AttachmentId attachmentId2 = AttachmentId.from("2");
- Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId, attachmentId2);
-
- Attachment attachment = Attachment.builder()
- .attachmentId(attachmentId)
- .bytes("attachment".getBytes())
- .type("type")
- .build();
- Attachment attachment2 = Attachment.builder()
- .attachmentId(attachmentId2)
- .bytes("attachment2".getBytes())
- .type("type")
- .build();
- when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
- .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment, attachment2)));
-
- Map<AttachmentId, Attachment> attachmentsById = testee.attachmentsById(attachmentIds)
- .join();
-
- assertThat(attachmentsById).hasSize(2)
- .containsOnly(MapEntry.entry(attachmentId, attachment), MapEntry.entry(attachmentId2, attachment2));
- }
-
- @Test
- public void attachmentsByIdShouldReturnEmptyMapWhenAttachmentsDontExists() {
- AttachmentId attachmentId = AttachmentId.from("1");
- AttachmentId attachmentId2 = AttachmentId.from("2");
- Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId, attachmentId2);
-
- when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
- .thenReturn(CompletableFuture.completedFuture(ImmutableList.of()));
-
- Map<AttachmentId, Attachment> attachmentsById = testee.attachmentsById(attachmentIds)
- .join();
-
- assertThat(attachmentsById).hasSize(0);
- }
-
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
index d9f7c89..8b94cdb 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
@@ -78,12 +78,12 @@ class CassandraACLMapperTest {
.value(CassandraACLTable.ACL, "{\"entries\":{\"bob\":invalid}}")
.value(CassandraACLTable.VERSION, 1));
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY);
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY);
}
@Test
void retrieveACLWhenNoACLStoredShouldReturnEmptyACL() {
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY);
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY);
}
@Test
@@ -94,7 +94,7 @@ class CassandraACLMapperTest {
cassandraACLMapper.updateACL(MAILBOX_ID,
MailboxACL.command().key(key).rights(rights).asAddition());
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join())
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
.isEqualTo(new MailboxACL().union(key, rights));
}
@@ -107,7 +107,7 @@ class CassandraACLMapperTest {
MailboxACL.EntryKey keyAlice = new MailboxACL.EntryKey("alice", MailboxACL.NameType.user, false);
cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(keyAlice).rights(rights).asAddition());
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join())
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
.isEqualTo(new MailboxACL().union(keyBob, rights).union(keyAlice, rights));
}
@@ -119,7 +119,7 @@ class CassandraACLMapperTest {
cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition());
cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asRemoval());
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY);
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY);
}
@Test
@@ -130,7 +130,7 @@ class CassandraACLMapperTest {
cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition());
cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).noRights().asReplacement());
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(MailboxACL.EMPTY);
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(MailboxACL.EMPTY);
}
@Test
@@ -140,7 +140,7 @@ class CassandraACLMapperTest {
cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asReplacement());
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(new MailboxACL().union(key, rights));
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(new MailboxACL().union(key, rights));
}
@Test
@@ -155,11 +155,11 @@ class CassandraACLMapperTest {
cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition());
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join()).isEqualTo(new MailboxACL().union(key, rights));
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()).isEqualTo(new MailboxACL().union(key, rights));
}
@Test
- void twoConcurrentUpdatesWhenNoACEStoredShouldReturnACEWithTwoEntries(CassandraCluster cassandra) throws Exception {
+ void twoConcurrentUpdatesWhenNoACLStoredShouldReturnACLWithTwoEntries(CassandraCluster cassandra) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
MailboxACL.EntryKey keyBob = new MailboxACL.EntryKey("bob", MailboxACL.NameType.user, false);
MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read);
@@ -168,12 +168,12 @@ class CassandraACLMapperTest {
Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown);
awaitAll(future1, future2);
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join())
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
.isEqualTo(new MailboxACL().union(keyBob, rights).union(keyAlice, rights));
}
@Test
- void twoConcurrentUpdatesWhenStoredShouldReturnACEWithTwoEntries(CassandraCluster cassandra) throws Exception {
+ void twoConcurrentUpdatesWhenStoredShouldReturnACLWithTwoEntries(CassandraCluster cassandra) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
MailboxACL.EntryKey keyBenwa = new MailboxACL.EntryKey("benwa", MailboxACL.NameType.user, false);
MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read);
@@ -185,7 +185,7 @@ class CassandraACLMapperTest {
Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown);
awaitAll(future1, future2);
- assertThat(cassandraACLMapper.getACL(MAILBOX_ID).join())
+ assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
.isEqualTo(new MailboxACL().union(keyBob, rights).union(keyAlice, rights).union(keyBenwa, rights));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java
index f82375e..6cd0b2f 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOTest.java
@@ -55,7 +55,7 @@ class CassandraAttachmentDAOTest {
@Test
void getAttachmentShouldReturnEmptyWhenAbsent() {
- Optional<Attachment> attachment = testee.getAttachment(ATTACHMENT_ID).join();
+ Optional<Attachment> attachment = testee.getAttachment(ATTACHMENT_ID).blockOptional();
assertThat(attachment).isEmpty();
}
@@ -98,7 +98,7 @@ class CassandraAttachmentDAOTest {
.build();
testee.storeAttachment(attachment).join();
- Optional<Attachment> actual = testee.getAttachment(ATTACHMENT_ID).join();
+ Optional<Attachment> actual = testee.getAttachment(ATTACHMENT_ID).blockOptional();
assertThat(actual).contains(attachment);
}
@@ -114,7 +114,7 @@ class CassandraAttachmentDAOTest {
testee.deleteAttachment(attachment.getAttachmentId()).join();
- assertThat(testee.getAttachment(attachment.getAttachmentId()).join())
+ assertThat(testee.getAttachment(attachment.getAttachmentId()).blockOptional())
.isEmpty();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java
index 6e3b705..2bf90c7 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2Test.java
@@ -52,7 +52,7 @@ class CassandraAttachmentDAOV2Test {
@Test
void getAttachmentShouldReturnEmptyWhenAbsent() {
- Optional<DAOAttachment> attachment = testee.getAttachment(ATTACHMENT_ID).join();
+ Optional<DAOAttachment> attachment = testee.getAttachment(ATTACHMENT_ID).blockOptional();
assertThat(attachment).isEmpty();
}
@@ -66,9 +66,9 @@ class CassandraAttachmentDAOV2Test {
.build();
BlobId blobId = BLOB_ID_FACTORY.from("blobId");
DAOAttachment daoAttachment = CassandraAttachmentDAOV2.from(attachment, blobId);
- testee.storeAttachment(daoAttachment).join();
+ testee.storeAttachment(daoAttachment).block();
- Optional<DAOAttachment> actual = testee.getAttachment(ATTACHMENT_ID).join();
+ Optional<DAOAttachment> actual = testee.getAttachment(ATTACHMENT_ID).blockOptional();
assertThat(actual).contains(daoAttachment);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
index 862ea95..10492c6 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentFallbackTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -100,7 +101,7 @@ class CassandraAttachmentFallbackTest {
.build();
BlobId blobId = blobsDAO.save(attachment.getBytes()).join();
- attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).join();
+ attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block();
attachmentDAO.storeAttachment(otherAttachment).join();
assertThat(attachmentMapper.getAttachment(ATTACHMENT_ID_1))
@@ -135,7 +136,7 @@ class CassandraAttachmentFallbackTest {
.build();
BlobId blobId = blobsDAO.save(attachment.getBytes()).join();
- attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).join();
+ attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block();
attachmentDAO.storeAttachment(otherAttachment).join();
assertThat(attachmentMapper.getAttachments(ImmutableList.of(ATTACHMENT_ID_1)))
@@ -170,10 +171,11 @@ class CassandraAttachmentFallbackTest {
.build();
BlobId blobId = blobsDAO.save(attachment.getBytes()).join();
- attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).join();
+ attachmentDAOV2.storeAttachment(CassandraAttachmentDAOV2.from(attachment, blobId)).block();
attachmentDAO.storeAttachment(otherAttachment).join();
- assertThat(attachmentMapper.getAttachments(ImmutableList.of(ATTACHMENT_ID_1, ATTACHMENT_ID_2)))
- .containsExactly(attachment, otherAttachment);
+ List<Attachment> attachments = attachmentMapper.getAttachments(ImmutableList.of(ATTACHMENT_ID_1, ATTACHMENT_ID_2));
+ assertThat(attachments)
+ .containsExactlyInAnyOrder(attachment, otherAttachment);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org