You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/01/28 14:53:25 UTC
[10/12] james-project git commit: JAMES-2630 Migrate
CassandraAsyncExecutor.executeReturnApplied consumers to Reactor
JAMES-2630 Migrate CassandraAsyncExecutor.executeReturnApplied consumers to Reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e4a737e5
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e4a737e5
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e4a737e5
Branch: refs/heads/master
Commit: e4a737e52c34c217503dee3e69cc9d0275937046
Parents: 06ee1e4
Author: Gautier DI FOLCO <gd...@linagora.com>
Authored: Tue Jan 8 16:41:38 2019 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Mon Jan 28 15:49:06 2019 +0100
----------------------------------------------------------------------
.../cassandra/utils/CassandraAsyncExecutor.java | 12 +-
.../utils/FunctionRunnerWithRetry.java | 78 --------
.../utils/FunctionRunnerWithRetryTest.java | 173 ------------------
.../eventstore/cassandra/EventStoreDao.java | 2 +-
.../james/mailbox/MailboxManagerStressTest.java | 2 +-
.../cassandra/mail/AttachmentLoader.java | 51 ++----
.../cassandra/mail/CassandraACLMapper.java | 138 ++++++---------
.../cassandra/mail/CassandraAttachmentDAO.java | 8 +-
.../mail/CassandraAttachmentDAOV2.java | 13 +-
.../mail/CassandraAttachmentMapper.java | 87 ++++-----
.../mail/CassandraAttachmentMessageIdDAO.java | 5 +-
.../mail/CassandraAttachmentOwnerDAO.java | 5 +-
.../cassandra/mail/CassandraMailboxDAO.java | 32 ++--
.../cassandra/mail/CassandraMailboxMapper.java | 147 +++++++---------
.../cassandra/mail/CassandraMailboxPathDAO.java | 19 +-
.../mail/CassandraMailboxPathDAOImpl.java | 47 ++---
.../mail/CassandraMailboxPathV2DAO.java | 47 +++--
.../cassandra/mail/CassandraMessageDAO.java | 61 +++----
.../cassandra/mail/CassandraMessageIdDAO.java | 9 +-
.../mail/CassandraMessageIdMapper.java | 176 +++++++++----------
.../mail/CassandraMessageIdToImapUidDAO.java | 21 +--
.../cassandra/mail/CassandraMessageMapper.java | 49 +++---
.../cassandra/mail/CassandraModSeqProvider.java | 98 +++++------
.../cassandra/mail/CassandraUidProvider.java | 81 ++++-----
.../mail/CassandraUserMailboxRightsDAO.java | 42 +++--
.../migration/AttachmentMessageIdCreation.java | 12 +-
.../mail/migration/AttachmentV2Migration.java | 2 +-
.../mail/migration/MailboxPathV2Migration.java | 4 +-
.../mail/task/MailboxMergingTaskRunner.java | 8 +-
.../cassandra/mail/AttachmentLoaderTest.java | 82 ++-------
.../cassandra/mail/CassandraACLMapperTest.java | 24 +--
.../mail/CassandraAttachmentDAOTest.java | 6 +-
.../mail/CassandraAttachmentDAOV2Test.java | 6 +-
.../mail/CassandraAttachmentFallbackTest.java | 12 +-
.../mail/CassandraAttachmentOwnerDAOTest.java | 17 +-
.../cassandra/mail/CassandraMailboxDAOTest.java | 44 +++--
.../mail/CassandraMailboxMapperTest.java | 88 +++++-----
.../mail/CassandraMailboxPathDAOImplTest.java | 12 +-
.../mail/CassandraMailboxPathDAOTest.java | 37 ++--
.../cassandra/mail/CassandraMapperProvider.java | 3 +-
.../cassandra/mail/CassandraMessageDAOTest.java | 6 +-
.../mail/CassandraMessageIdDAOTest.java | 24 +--
.../CassandraMessageIdToImapUidDAOTest.java | 95 +++++-----
.../mail/CassandraModSeqProviderTest.java | 27 ++-
.../mail/CassandraUidProviderTest.java | 26 ++-
.../mail/CassandraUserMailboxRightsDAOTest.java | 12 +-
.../migration/AttachmentV2MigrationTest.java | 14 +-
.../migration/MailboxPathV2MigrationTest.java | 18 +-
.../AbstractMailboxManagerAttachmentTest.java | 23 ++-
server/container/util/pom.xml | 6 +
.../cassandra/CassandraDomainList.java | 4 +-
.../cassandra/CassandraActiveScriptDAO.java | 19 +-
.../sieve/cassandra/CassandraSieveDAO.java | 20 +--
.../sieve/cassandra/CassandraSieveQuotaDAO.java | 5 +-
.../cassandra/CassandraSieveRepository.java | 91 +++++-----
.../cassandra/CassandraUsersRepository.java | 6 +-
.../cassandra/CassandraActiveScriptDAOTest.java | 22 +--
.../sieve/cassandra/CassandraSieveDAOTest.java | 34 ++--
.../cassandra/CassandraSieveQuotaDAOTest.java | 10 +-
.../cassandra/CassandraMailRepository.java | 47 ++---
.../CassandraMailRepositoryCountDAO.java | 9 +-
.../CassandraMailRepositoryKeysDAO.java | 17 +-
.../CassandraMailRepositoryMailDAO.java | 5 +-
.../CassandraMailRepositoryMailDaoAPI.java | 4 +-
.../CassandraMailRepositoryMailDaoV2.java | 6 +-
.../MergingCassandraMailRepositoryMailDao.java | 6 +-
.../CassandraMailRepositoryCountDAOTest.java | 18 +-
.../CassandraMailRepositoryKeysDAOTest.java | 54 +++---
.../CassandraMailRepositoryMailDAOTest.java | 4 +-
...ilRepositoryWithFakeImplementationsTest.java | 29 +--
70 files changed, 1031 insertions(+), 1390 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
index 899635e..361bb93 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/CassandraAsyncExecutor.java
@@ -46,11 +46,6 @@ public class CassandraAsyncExecutor {
return executeReactor(statement).toFuture();
}
-
- public CompletableFuture<Boolean> executeReturnApplied(Statement statement) {
- return executeReturnAppliedReactor(statement).toFuture();
- }
-
public CompletableFuture<Void> executeVoid(Statement statement) {
return executeVoidReactor(statement).toFuture();
}
@@ -67,9 +62,8 @@ public class CassandraAsyncExecutor {
}
- public Mono<Boolean> executeReturnAppliedReactor(Statement statement) {
- return executeReactor(statement)
- .map(ResultSet::one)
+ public Mono<Boolean> executeReturnApplied(Statement statement) {
+ return executeSingleRowReactor(statement)
.map(row -> row.getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED));
}
@@ -83,7 +77,7 @@ public class CassandraAsyncExecutor {
.flatMap(Mono::justOrEmpty);
}
- private Mono<Optional<Row>> executeSingleRowOptionalReactor(Statement statement) {
+ public Mono<Optional<Row>> executeSingleRowOptionalReactor(Statement statement) {
return executeReactor(statement)
.map(resultSet -> Optional.ofNullable(resultSet.one()));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
deleted file mode 100644
index 86e1aba..0000000
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetry.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.cassandra.utils;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
-import java.util.stream.IntStream;
-
-import com.google.common.base.Preconditions;
-
-public class FunctionRunnerWithRetry {
-
- public static class RelayingException extends RuntimeException {}
-
- @FunctionalInterface
- public interface OptionalSupplier<T> {
- Optional<T> getAsOptional();
- }
-
- private final int maxRetry;
-
- public FunctionRunnerWithRetry(int maxRetry) {
- Preconditions.checkArgument(maxRetry > 0);
- this.maxRetry = maxRetry;
- }
-
- public void execute(BooleanSupplier functionNotifyingSuccess) throws LightweightTransactionException {
- IntStream.range(0, maxRetry)
- .filter((x) -> functionNotifyingSuccess.getAsBoolean())
- .findFirst()
- .orElseThrow(() -> new LightweightTransactionException(maxRetry));
- }
-
- public <T> T executeAndRetrieveObject(OptionalSupplier<T> functionNotifyingSuccess) throws LightweightTransactionException {
- return IntStream.range(0, maxRetry)
- .mapToObj((x) -> functionNotifyingSuccess.getAsOptional())
- .filter(Optional::isPresent)
- .findFirst()
- .orElseThrow(() -> new LightweightTransactionException(maxRetry))
- .get();
- }
-
- public <T> CompletableFuture<Optional<T>> executeAsyncAndRetrieveObject(Supplier<CompletableFuture<Optional<T>>> futureSupplier) {
- return executeAsyncAndRetrieveObject(futureSupplier, 0);
- }
-
- public <T> CompletableFuture<Optional<T>> executeAsyncAndRetrieveObject(Supplier<CompletableFuture<Optional<T>>> futureSupplier, int tries) {
- if (tries >= maxRetry) {
- return CompletableFuture.completedFuture(Optional.empty());
- }
- return futureSupplier.get()
- .thenCompose(optional -> {
- if (optional.isPresent()) {
- return CompletableFuture.completedFuture(optional);
- }
- return executeAsyncAndRetrieveObject(futureSupplier, tries + 1);
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
deleted file mode 100644
index 7fe5813..0000000
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/utils/FunctionRunnerWithRetryTest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.cassandra.utils;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang.mutable.MutableInt;
-import org.junit.Test;
-
-public class FunctionRunnerWithRetryTest {
-
- private static final int MAX_RETRY = 10;
-
- @Test(expected = IllegalArgumentException.class)
- public void functionRunnerWithInvalidMaxRetryShouldFail() throws Exception {
- new FunctionRunnerWithRetry(-1);
- }
-
- @Test(expected = LightweightTransactionException.class)
- public void functionRunnerShouldFailIfTransactionCanNotBePerformed() throws Exception {
- final MutableInt value = new MutableInt(0);
- new FunctionRunnerWithRetry(MAX_RETRY).execute(
- () -> {
- value.increment();
- return false;
- }
- );
- assertThat(value.getValue()).isEqualTo(MAX_RETRY);
- }
-
- @Test
- public void functionRunnerShouldWorkOnFirstTry() throws Exception {
- final MutableInt value = new MutableInt(0);
- new FunctionRunnerWithRetry(MAX_RETRY).execute(
- () -> {
- value.increment();
- return true;
- }
- );
- assertThat(value.getValue()).isEqualTo(1);
- }
-
- @Test
- public void functionRunnerShouldWorkIfNotSucceededOnFirstTry() throws Exception {
- final MutableInt value = new MutableInt(0);
- new FunctionRunnerWithRetry(MAX_RETRY).execute(
- () -> {
- value.increment();
- return (Integer) value.getValue() == MAX_RETRY / 2;
- }
- );
- assertThat(value.getValue()).isEqualTo(MAX_RETRY / 2);
- }
-
- @Test
- public void functionRunnerShouldWorkIfNotSucceededOnMaxRetryReached() throws Exception {
- final MutableInt value = new MutableInt(0);
- new FunctionRunnerWithRetry(MAX_RETRY).execute(
- () -> {
- value.increment();
- return (Integer) value.getValue() == MAX_RETRY;
- }
- );
- assertThat(value.getValue()).isEqualTo(MAX_RETRY);
- }
-
- @Test
- public void asyncFunctionRunnerShouldWorkIfSucceedFirstTry() throws Exception {
- int value = 18;
-
- Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY)
- .executeAsyncAndRetrieveObject(
- () -> CompletableFuture.completedFuture(Optional.of(value)))
- .join();
-
- assertThat(result).contains(value);
- }
-
- @Test
- public void asyncFunctionRunnerShouldTryOnlyOnceIfSuccess() throws Exception {
- int value = 18;
- AtomicInteger times = new AtomicInteger(0);
-
- new FunctionRunnerWithRetry(MAX_RETRY)
- .executeAsyncAndRetrieveObject(
- () -> {
- times.incrementAndGet();
- return CompletableFuture.completedFuture(Optional.of(value));
- })
- .join();
-
- assertThat(times.get()).isEqualTo(1);
- }
-
- @Test
- public void asyncFunctionRunnerShouldRetrieveValueOnRetry() throws Exception {
- int value = 18;
- AtomicInteger times = new AtomicInteger(0);
-
- Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY)
- .executeAsyncAndRetrieveObject(
- () -> {
- int attemptCount = times.incrementAndGet();
- if (attemptCount == MAX_RETRY) {
- return CompletableFuture.completedFuture(Optional.of(value));
- } else {
- return CompletableFuture.completedFuture(Optional.empty());
- }
- })
- .join();
-
- assertThat(result).contains(value);
- }
-
- @Test
- public void asyncFunctionRunnerShouldMakeMaxRetryAttempts() throws Exception {
- int value = 18;
- AtomicInteger times = new AtomicInteger(0);
-
- new FunctionRunnerWithRetry(MAX_RETRY)
- .executeAsyncAndRetrieveObject(
- () -> {
- int attemptCount = times.incrementAndGet();
- if (attemptCount == MAX_RETRY) {
- return CompletableFuture.completedFuture(Optional.of(value));
- } else {
- return CompletableFuture.completedFuture(Optional.empty());
- }
- })
- .join();
-
- assertThat(times.get()).isEqualTo(MAX_RETRY);
- }
-
-
- @Test
- public void asyncFunctionRunnerShouldReturnEmptyIfAllFailed() throws Exception {
- AtomicInteger times = new AtomicInteger(0);
-
- Optional<Integer> result = new FunctionRunnerWithRetry(MAX_RETRY)
- .executeAsyncAndRetrieveObject(
- () -> {
- times.incrementAndGet();
- return CompletableFuture.completedFuture(Optional.<Integer>empty());
- })
- .join();
-
- assertThat(result).isEmpty();
- assertThat(times.get()).isEqualTo(MAX_RETRY);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
----------------------------------------------------------------------
diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
index e1e55c7..8f6a9d1 100644
--- a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
+++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
@@ -82,7 +82,7 @@ public class EventStoreDao {
public Mono<Boolean> appendAll(List<Event> events) {
BatchStatement batch = new BatchStatement();
events.forEach(event -> batch.add(insertEvent(event)));
- return cassandraAsyncExecutor.executeReturnAppliedReactor(batch);
+ return cassandraAsyncExecutor.executeReturnApplied(batch);
}
private BoundStatement insertEvent(Event event) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
index 97e0103..544e1fe 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerStressTest.java
@@ -81,7 +81,7 @@ public abstract class MailboxManagerStressTest<T extends MailboxManager> {
final AtomicBoolean fail = new AtomicBoolean(false);
final ConcurrentHashMap<MessageUid, Object> uids = new ConcurrentHashMap<>();
- // fire of 1000 append operations
+ // fire of append operations
for (int i = 0; i < APPEND_OPERATIONS; i++) {
pool.execute(() -> {
if (fail.get()) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
index bf486ed..d57e99a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
@@ -19,24 +19,18 @@
package org.apache.james.mailbox.cassandra.mail;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.mailbox.model.Attachment;
-import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.MessageAttachment;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.FluentFutureStream;
-import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class AttachmentLoader {
@@ -46,33 +40,23 @@ public class AttachmentLoader {
this.attachmentMapper = attachmentMapper;
}
- public CompletableFuture<Stream<SimpleMailboxMessage>> addAttachmentToMessages(Stream<Pair<MessageWithoutAttachment,
- Stream<MessageAttachmentRepresentation>>> messageRepresentations, MessageMapper.FetchType fetchType) {
+ public Mono<SimpleMailboxMessage> addAttachmentToMessage(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageRepresentation, MessageMapper.FetchType fetchType) {
if (fetchType == MessageMapper.FetchType.Body || fetchType == MessageMapper.FetchType.Full) {
- return FluentFutureStream.<SimpleMailboxMessage>of(
- messageRepresentations
- .map(pair -> getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
- .thenApply(attachments -> pair.getLeft().toMailboxMessage(attachments))))
- .completableFuture();
+ return getAttachments(messageRepresentation.getRight().collect(ImmutableList.toImmutableList()))
+ .map(attachments -> messageRepresentation.getLeft().toMailboxMessage(attachments));
} else {
- return CompletableFuture.completedFuture(messageRepresentations
- .map(pair -> pair.getLeft()
- .toMailboxMessage(ImmutableList.of())));
+ return Mono.just(messageRepresentation.getLeft().toMailboxMessage(ImmutableList.of()));
}
}
@VisibleForTesting
- CompletableFuture<List<MessageAttachment>> getAttachments(List<MessageAttachmentRepresentation> attachmentRepresentations) {
- CompletableFuture<Map<AttachmentId, Attachment>> attachmentsByIdFuture =
- attachmentsById(attachmentRepresentations.stream()
- .map(MessageAttachmentRepresentation::getAttachmentId)
- .collect(Guavate.toImmutableSet()));
-
- return attachmentsByIdFuture.thenApply(attachmentsById ->
- attachmentRepresentations.stream()
- .map(representation -> constructMessageAttachment(attachmentsById.get(representation.getAttachmentId()), representation))
- .collect(Guavate.toImmutableList()));
+ Mono<List<MessageAttachment>> getAttachments(List<MessageAttachmentRepresentation> attachmentRepresentations) {
+ return Flux.fromIterable(attachmentRepresentations)
+ .flatMap(attachmentRepresentation ->
+ attachmentMapper.getAttachmentsAsMono(attachmentRepresentation.getAttachmentId())
+ .map(attachment -> constructMessageAttachment(attachment, attachmentRepresentation)))
+ .collectList();
}
private MessageAttachment constructMessageAttachment(Attachment attachment, MessageAttachmentRepresentation messageAttachmentRepresentation) {
@@ -84,15 +68,4 @@ public class AttachmentLoader {
.build();
}
- @VisibleForTesting
- CompletableFuture<Map<AttachmentId, Attachment>> attachmentsById(Set<AttachmentId> attachmentIds) {
- if (attachmentIds.isEmpty()) {
- return CompletableFuture.completedFuture(ImmutableMap.of());
- }
- return attachmentMapper.getAttachmentsAsFuture(attachmentIds)
- .thenApply(attachments -> attachments
- .stream()
- .collect(Guavate.toImmutableMap(Attachment::getAttachmentId, Function.identity())));
- }
-
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index 7f2bb6a..dd6f6cd 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -27,16 +27,12 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
import java.io.IOException;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
-import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
import org.apache.james.mailbox.acl.ACLDiff;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
@@ -45,14 +41,15 @@ import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.exception.UnsupportedRightException;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.store.json.MailboxACLJsonConverter;
+import org.apache.james.util.FunctionalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.fasterxml.jackson.core.JsonProcessingException;
+import reactor.core.publisher.Mono;
public class CassandraACLMapper {
public static final int INITIAL_VALUE = 0;
@@ -65,7 +62,7 @@ public class CassandraACLMapper {
}
private final CassandraAsyncExecutor executor;
- private final int maxRetry;
+ private final int maxAclRetry;
private final CodeInjector codeInjector;
private final CassandraUserMailboxRightsDAO userMailboxRightsDAO;
private final PreparedStatement conditionalInsertStatement;
@@ -79,7 +76,7 @@ public class CassandraACLMapper {
public CassandraACLMapper(Session session, CassandraUserMailboxRightsDAO userMailboxRightsDAO, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) {
this.executor = new CassandraAsyncExecutor(session);
- this.maxRetry = cassandraConfiguration.getAclMaxRetry();
+ this.maxAclRetry = cassandraConfiguration.getAclMaxRetry();
this.codeInjector = codeInjector;
this.conditionalInsertStatement = prepareConditionalInsert(session);
this.conditionalUpdateStatement = prepareConditionalUpdate(session);
@@ -112,103 +109,84 @@ public class CassandraACLMapper {
.where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID))));
}
- public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) {
+ public Mono<MailboxACL> getACL(CassandraId cassandraId) {
return getStoredACLRow(cassandraId)
- .thenApply(resultSet -> getAcl(cassandraId, resultSet));
+ .map(row -> getAcl(cassandraId, row))
+ .switchIfEmpty(Mono.just(MailboxACL.EMPTY));
}
- private MailboxACL getAcl(CassandraId cassandraId, ResultSet resultSet) {
- if (resultSet.isExhausted()) {
- return MailboxACL.EMPTY;
- }
- String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
+ private MailboxACL getAcl(CassandraId cassandraId, Row row) {
+ String serializedACL = row.getString(CassandraACLTable.ACL);
return deserializeACL(cassandraId, serializedACL);
}
public ACLDiff updateACL(CassandraId cassandraId, MailboxACL.ACLCommand command) throws MailboxException {
MailboxACL replacement = MailboxACL.EMPTY.apply(command);
-
- ACLDiff aclDiff = updateAcl(cassandraId, aclWithVersion -> aclWithVersion.apply(command), replacement);
-
- return userMailboxRightsDAO.update(cassandraId, aclDiff)
- .thenApply(any -> aclDiff)
- .join();
+ return updateAcl(cassandraId, aclWithVersion -> aclWithVersion.apply(command), replacement)
+ .flatMap(aclDiff -> userMailboxRightsDAO.update(cassandraId, aclDiff)
+ .thenReturn(aclDiff))
+ .blockOptional()
+ .orElseThrow(() -> new MailboxException("Unable to update ACL"));
}
public ACLDiff setACL(CassandraId cassandraId, MailboxACL mailboxACL) throws MailboxException {
- ACLDiff aclDiff = updateAcl(cassandraId,
- acl -> new ACLWithVersion(acl.version, mailboxACL),
- mailboxACL);
-
- return userMailboxRightsDAO.update(cassandraId, aclDiff)
- .thenApply(any -> aclDiff)
- .join();
- }
-
- private ACLDiff updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException {
- try {
- return new FunctionRunnerWithRetry(maxRetry)
- .executeAndRetrieveObject(
- () -> {
- codeInjector.inject();
- return getAclWithVersion(cassandraId)
- .map(aclWithVersion ->
- updateStoredACL(cassandraId, aclTransformation.apply(aclWithVersion))
- .map(newACL -> ACLDiff.computeDiff(aclWithVersion.mailboxACL, newACL)))
- .orElseGet(() -> insertACL(cassandraId, replacement)
- .map(newACL -> ACLDiff.computeDiff(MailboxACL.EMPTY, newACL)));
- });
- } catch (LightweightTransactionException e) {
- throw new MailboxException("Exception during lightweight transaction", e);
- }
- }
-
- private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) {
- return executor.execute(
+ return updateAcl(cassandraId, acl -> new ACLWithVersion(acl.version, mailboxACL), mailboxACL)
+ .flatMap(aclDiff -> userMailboxRightsDAO.update(cassandraId, aclDiff)
+ .thenReturn(aclDiff))
+ .blockOptional()
+ .orElseThrow(() -> new MailboxException("Unable to update ACL"));
+ }
+
+ private Mono<ACLDiff> updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException {
+ return Mono.fromRunnable(() -> codeInjector.inject())
+ .then(Mono.defer(() -> getAclWithVersion(cassandraId)))
+ .flatMap(aclWithVersion ->
+ updateStoredACL(cassandraId, aclTransformation.apply(aclWithVersion))
+ .map(newACL -> ACLDiff.computeDiff(aclWithVersion.mailboxACL, newACL)))
+ .switchIfEmpty(insertACL(cassandraId, replacement)
+ .map(newACL -> ACLDiff.computeDiff(MailboxACL.EMPTY, newACL)))
+ .single()
+ .retry(maxAclRetry);
+ }
+
+ private Mono<Row> getStoredACLRow(CassandraId cassandraId) {
+ return executor.executeSingleRowReactor(
readStatement.bind()
.setUUID(CassandraACLTable.ID, cassandraId.asUuid()));
}
- private Optional<MailboxACL> updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
- try {
- return executor.executeReturnApplied(
- conditionalUpdateStatement.bind()
+ private Mono<MailboxACL> updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
+ return executor.executeReturnApplied(
+ conditionalUpdateStatement.bind()
+ .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+ .setString(CassandraACLTable.ACL, convertAclToJson(aclWithVersion.mailboxACL))
+ .setLong(CassandraACLTable.VERSION, aclWithVersion.version + 1)
+ .setLong(OLD_VERSION, aclWithVersion.version))
+ .filter(FunctionalUtils.toPredicate(Function.identity()))
+ .map(any -> aclWithVersion.mailboxACL);
+ }
+
+ private Mono<MailboxACL> insertACL(CassandraId cassandraId, MailboxACL acl) {
+ return Mono.defer(() -> executor.executeReturnApplied(
+ conditionalInsertStatement.bind()
.setUUID(CassandraACLTable.ID, cassandraId.asUuid())
- .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))
- .setLong(CassandraACLTable.VERSION, aclWithVersion.version + 1)
- .setLong(OLD_VERSION, aclWithVersion.version))
- .thenApply(Optional::of)
- .thenApply(optional -> optional.filter(b -> b).map(any -> aclWithVersion.mailboxACL))
- .join();
- } catch (JsonProcessingException exception) {
- throw new RuntimeException(exception);
- }
+ .setString(CassandraACLTable.ACL, convertAclToJson(acl))))
+ .filter(FunctionalUtils.toPredicate(Function.identity()))
+ .map(any -> acl);
}
- private Optional<MailboxACL> insertACL(CassandraId cassandraId, MailboxACL acl) {
+ private String convertAclToJson(MailboxACL acl) {
try {
- return executor.executeReturnApplied(
- conditionalInsertStatement.bind()
- .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
- .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(acl)))
- .thenApply(Optional::of)
- .thenApply(optional -> optional.filter(b -> b).map(any -> acl))
- .join();
+ return MailboxACLJsonConverter.toJson(acl);
} catch (JsonProcessingException exception) {
throw new RuntimeException(exception);
}
}
- private Optional<ACLWithVersion> getAclWithVersion(CassandraId cassandraId) {
- ResultSet resultSet = getStoredACLRow(cassandraId).join();
- if (resultSet.isExhausted()) {
- return Optional.empty();
- }
- Row row = resultSet.one();
- return Optional.of(
- new ACLWithVersion(
- row.getLong(CassandraACLTable.VERSION),
- deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL))));
+ private Mono<ACLWithVersion> getAclWithVersion(CassandraId cassandraId) {
+ return getStoredACLRow(cassandraId)
+ .map(acl -> new ACLWithVersion(acl.getLong(CassandraACLTable.VERSION),
+ deserializeACL(cassandraId, acl.getString(CassandraACLTable.ACL))));
}
private MailboxACL deserializeACL(CassandraId cassandraId, String serializedACL) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
index 2e5c767..fb00ec0 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAO.java
@@ -33,7 +33,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentTable.
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
@@ -49,6 +48,7 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
public class CassandraAttachmentDAO {
@@ -99,12 +99,12 @@ public class CassandraAttachmentDAO {
.from(TABLE_NAME));
}
- public CompletableFuture<Optional<Attachment>> getAttachment(AttachmentId attachmentId) {
+ public Mono<Attachment> getAttachment(AttachmentId attachmentId) {
Preconditions.checkArgument(attachmentId != null);
- return cassandraAsyncExecutor.executeSingleRow(
+ return cassandraAsyncExecutor.executeSingleRowReactor(
selectStatement.bind()
.setString(ID, attachmentId.getId()))
- .thenApply(optional -> optional.map(this::attachment));
+ .map(this::attachment);
}
public Stream<Attachment> retrieveAll() {
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
index f88723f..38b6e21 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentDAOV2.java
@@ -32,8 +32,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentV2Tabl
import static org.apache.james.mailbox.cassandra.table.CassandraAttachmentV2Table.TYPE;
import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
@@ -46,6 +44,7 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
public class CassandraAttachmentDAOV2 {
public static class DAOAttachment {
@@ -150,16 +149,16 @@ public class CassandraAttachmentDAOV2 {
.where(eq(ID_AS_UUID, bindMarker(ID_AS_UUID))));
}
- public CompletableFuture<Optional<DAOAttachment>> getAttachment(AttachmentId attachmentId) {
+ public Mono<DAOAttachment> getAttachment(AttachmentId attachmentId) {
Preconditions.checkArgument(attachmentId != null);
- return cassandraAsyncExecutor.executeSingleRow(
+ return cassandraAsyncExecutor.executeSingleRowReactor(
selectStatement.bind()
.setUUID(ID_AS_UUID, attachmentId.asUUID()))
- .thenApply(rowOptional -> rowOptional.map(row -> CassandraAttachmentDAOV2.fromRow(row, blobIdFactory)));
+ .map(row -> CassandraAttachmentDAOV2.fromRow(row, blobIdFactory));
}
- public CompletableFuture<Void> storeAttachment(DAOAttachment attachment) {
- return cassandraAsyncExecutor.executeVoid(
+ public Mono<Void> storeAttachment(DAOAttachment attachment) {
+ return cassandraAsyncExecutor.executeVoidReactor(
insertStatement.bind()
.setUUID(ID_AS_UUID, attachment.getAttachmentId().asUUID())
.setString(ID, attachment.getAttachmentId().getId())
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index ecb43de..a7d9dfc 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -21,10 +21,6 @@ package org.apache.james.mailbox.cassandra.mail;
import java.util.Collection;
import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.stream.Stream;
import javax.inject.Inject;
@@ -37,13 +33,14 @@ import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.mail.AttachmentMapper;
import org.apache.james.mailbox.store.mail.model.Username;
-import org.apache.james.util.FluentFutureStream;
+import org.apache.james.util.ReactorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraAttachmentMapper implements AttachmentMapper {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraAttachmentMapper.class);
@@ -76,65 +73,54 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
public Attachment getAttachment(AttachmentId attachmentId) throws AttachmentNotFoundException {
Preconditions.checkArgument(attachmentId != null);
return getAttachmentInternal(attachmentId)
- .join()
+ .blockOptional()
.orElseThrow(() -> new AttachmentNotFoundException(attachmentId.getId()));
}
- private CompletionStage<Optional<Attachment>> retrievePayload(Optional<DAOAttachment> daoAttachmentOptional) {
- if (!daoAttachmentOptional.isPresent()) {
- return CompletableFuture.completedFuture(Optional.empty());
- }
- DAOAttachment daoAttachment = daoAttachmentOptional.get();
- return blobStore.readBytes(daoAttachment.getBlobId())
- .thenApply(bytes -> Optional.of(daoAttachment.toAttachment(bytes)));
+ private Mono<Attachment> retrievePayload(DAOAttachment daoAttachment) {
+ return Mono.fromCompletionStage(blobStore.readBytes(daoAttachment.getBlobId())
+ .thenApply(daoAttachment::toAttachment));
}
@Override
public List<Attachment> getAttachments(Collection<AttachmentId> attachmentIds) {
- return getAttachmentsAsFuture(attachmentIds).join();
- }
-
- public CompletableFuture<ImmutableList<Attachment>> getAttachmentsAsFuture(Collection<AttachmentId> attachmentIds) {
Preconditions.checkArgument(attachmentIds != null);
+ return Flux.fromIterable(attachmentIds)
+ .flatMap(this::getAttachmentsAsMono)
+ .collectList()
+ .block();
+ }
- Stream<CompletableFuture<Optional<Attachment>>> attachments = attachmentIds
- .stream()
- .distinct()
- .map(id -> getAttachmentInternal(id)
- .thenApply(finalValue -> logNotFound(id, finalValue)));
-
- return FluentFutureStream.of(attachments, FluentFutureStream::unboxOptional)
- .collect(Guavate.toImmutableList());
+ public Mono<Attachment> getAttachmentsAsMono(AttachmentId attachmentId) {
+ return getAttachmentInternal(attachmentId)
+ .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logNotFound((attachmentId))));
}
- private CompletableFuture<Optional<Attachment>> getAttachmentInternal(AttachmentId id) {
+ private Mono<Attachment> getAttachmentInternal(AttachmentId id) {
return attachmentDAOV2.getAttachment(id)
- .thenCompose(this::retrievePayload)
- .thenCompose(v2Value -> fallbackToV1(id, v2Value));
+ .flatMap(this::retrievePayload)
+ .switchIfEmpty(fallbackToV1(id));
}
- private CompletionStage<Optional<Attachment>> fallbackToV1(AttachmentId attachmentId, Optional<Attachment> v2Value) {
- if (v2Value.isPresent()) {
- return CompletableFuture.completedFuture(v2Value);
- }
+ private Mono<Attachment> fallbackToV1(AttachmentId attachmentId) {
return attachmentDAO.getAttachment(attachmentId);
}
@Override
public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException {
ownerDAO.addOwner(attachment.getAttachmentId(), owner)
- .thenCompose(any -> blobStore.save(attachment.getBytes()))
- .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
- .thenCompose(attachmentDAOV2::storeAttachment)
- .join();
+ .then(Mono.fromFuture(blobStore.save(attachment.getBytes())))
+ .map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
+ .flatMap(attachmentDAOV2::storeAttachment)
+ .block();
}
@Override
public void storeAttachmentsForMessage(Collection<Attachment> attachments, MessageId ownerMessageId) throws MailboxException {
- FluentFutureStream.of(
- attachments.stream()
- .map(attachment -> storeAttachmentAsync(attachment, ownerMessageId)))
- .join();
+ Flux.fromIterable(attachments)
+ .flatMap(attachment -> storeAttachmentAsync(attachment, ownerMessageId))
+ .then()
+ .block();
}
@Override
@@ -148,21 +134,18 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
return ownerDAO.retrieveOwners(attachmentId).join().collect(Guavate.toImmutableList());
}
- public CompletableFuture<Void> storeAttachmentAsync(Attachment attachment, MessageId ownerMessageId) {
- return blobStore.save(attachment.getBytes())
- .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
- .thenCompose(daoAttachment -> storeAttachmentWithIndex(daoAttachment, ownerMessageId));
+ public Mono<Void> storeAttachmentAsync(Attachment attachment, MessageId ownerMessageId) {
+ return Mono.fromFuture(blobStore.save(attachment.getBytes())
+ .thenApply(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId)))
+ .flatMap(daoAttachment -> storeAttachmentWithIndex(daoAttachment, ownerMessageId));
}
- private CompletableFuture<Void> storeAttachmentWithIndex(DAOAttachment daoAttachment, MessageId ownerMessageId) {
+ private Mono<Void> storeAttachmentWithIndex(DAOAttachment daoAttachment, MessageId ownerMessageId) {
return attachmentDAOV2.storeAttachment(daoAttachment)
- .thenCompose(any -> attachmentMessageIdDAO.storeAttachmentForMessageId(daoAttachment.getAttachmentId(), ownerMessageId));
+ .then(attachmentMessageIdDAO.storeAttachmentForMessageId(daoAttachment.getAttachmentId(), ownerMessageId));
}
- private Optional<Attachment> logNotFound(AttachmentId attachmentId, Optional<Attachment> optionalAttachment) {
- if (!optionalAttachment.isPresent()) {
- LOGGER.warn("Failed retrieving attachment {}", attachmentId);
- }
- return optionalAttachment;
+ private void logNotFound(AttachmentId attachmentId) {
+ LOGGER.warn("Failed retrieving attachment {}", attachmentId);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
index 88b3d4d..3f02c1a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMessageIdDAO.java
@@ -44,6 +44,7 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
public class CassandraAttachmentMessageIdDAO {
@@ -91,8 +92,8 @@ public class CassandraAttachmentMessageIdDAO {
return messageIdFactory.fromString(row.getString(MESSAGE_ID));
}
- public CompletableFuture<Void> storeAttachmentForMessageId(AttachmentId attachmentId, MessageId ownerMessageId) {
- return cassandraAsyncExecutor.executeVoid(
+ public Mono<Void> storeAttachmentForMessageId(AttachmentId attachmentId, MessageId ownerMessageId) {
+ return cassandraAsyncExecutor.executeVoidReactor(
insertStatement.bind()
.setUUID(ATTACHMENT_ID_AS_UUID, attachmentId.asUUID())
.setString(ATTACHMENT_ID, attachmentId.getId())
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
index e92757c..fce332d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAO.java
@@ -41,6 +41,7 @@ import org.apache.james.mailbox.store.mail.model.Username;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
public class CassandraAttachmentOwnerDAO {
@@ -72,8 +73,8 @@ public class CassandraAttachmentOwnerDAO {
.where(eq(ID, bindMarker(ID))));
}
- public CompletableFuture<Void> addOwner(AttachmentId attachmentId, Username owner) {
- return executor.executeVoid(
+ public Mono<Void> addOwner(AttachmentId attachmentId, Username owner) {
+ return executor.executeVoidReactor(
addStatement.bind()
.setUUID(ID, attachmentId.asUUID())
.setString(OWNER, owner.getValue()));
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index 96075a4..7e75099 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -32,7 +32,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.NAM
import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.TABLE_NAME;
import static org.apache.james.mailbox.cassandra.table.CassandraMailboxTable.UIDVALIDITY;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
@@ -46,13 +45,14 @@ import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
-import org.apache.james.util.FluentFutureStream;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraMailboxDAO {
@@ -112,9 +112,9 @@ public class CassandraMailboxDAO {
.where(eq(ID, bindMarker(ID))));
}
- public CompletableFuture<Void> save(Mailbox mailbox) {
+ public Mono<Void> save(Mailbox mailbox) {
CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
- return executor.executeVoid(insertStatement.bind()
+ return executor.executeVoidReactor(insertStatement.bind()
.setUUID(ID, cassandraId.asUuid())
.setString(NAME, mailbox.getName())
.setLong(UIDVALIDITY, mailbox.getUidValidity())
@@ -128,21 +128,21 @@ public class CassandraMailboxDAO {
.setUDTValue(MAILBOX_BASE, mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(), mailboxPath.getUser())));
}
- public CompletableFuture<Void> delete(CassandraId mailboxId) {
- return executor.executeVoid(deleteStatement.bind()
+ public Mono<Void> delete(CassandraId mailboxId) {
+ return executor.executeVoidReactor(deleteStatement.bind()
.setUUID(ID, mailboxId.asUuid()));
}
- public CompletableFuture<Optional<SimpleMailbox>> retrieveMailbox(CassandraId mailboxId) {
- return executor.executeSingleRow(readStatement.bind()
+ public Mono<SimpleMailbox> retrieveMailbox(CassandraId mailboxId) {
+ return executor.executeSingleRowReactor(readStatement.bind()
.setUUID(ID, mailboxId.asUuid()))
- .thenApply(rowOptional -> rowOptional.map(this::mailboxFromRow))
- .thenApply(mailbox -> addMailboxId(mailboxId, mailbox));
+ .map(this::mailboxFromRow)
+ .map(mailbox -> addMailboxId(mailboxId, mailbox));
}
- private Optional<SimpleMailbox> addMailboxId(CassandraId cassandraId, Optional<SimpleMailbox> mailboxOptional) {
- mailboxOptional.ifPresent(mailbox -> mailbox.setMailboxId(cassandraId));
- return mailboxOptional;
+ private SimpleMailbox addMailboxId(CassandraId cassandraId, SimpleMailbox mailbox) {
+ mailbox.setMailboxId(cassandraId);
+ return mailbox;
}
private SimpleMailbox mailboxFromRow(Row row) {
@@ -154,9 +154,9 @@ public class CassandraMailboxDAO {
row.getLong(UIDVALIDITY));
}
- public FluentFutureStream<SimpleMailbox> retrieveAllMailboxes() {
- return FluentFutureStream.of(executor.execute(listStatement.bind())
- .thenApply(cassandraUtils::convertToStream))
+ public Flux<SimpleMailbox> retrieveAllMailboxes() {
+ return executor.executeReactor(listStatement.bind())
+ .flatMapMany(cassandraUtils::convertToFlux)
.map(this::toMailboxWithId);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index 4d3b7fd..058d6fe 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -21,18 +21,14 @@ package org.apache.james.mailbox.cassandra.mail;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.StringTokenizer;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import javax.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.mailbox.acl.ACLDiff;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.exception.MailboxException;
@@ -45,20 +41,19 @@ import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
-import org.apache.james.util.CompletableFutureUtil;
-import org.apache.james.util.FluentFutureStream;
-import org.apache.james.util.OptionalUtils;
+import org.apache.james.util.ReactorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraMailboxMapper implements MailboxMapper {
- public static final String WILDCARD = "%";
+ private static final String WILDCARD = "%";
public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMailboxMapper.class);
private final CassandraMailboxDAO mailboxDAO;
@@ -79,77 +74,70 @@ public class CassandraMailboxMapper implements MailboxMapper {
@Override
public void delete(Mailbox mailbox) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- FluentFutureStream.ofFutures(
+ Flux.merge(
mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()))
- .map(any -> mailboxDAO.delete(mailboxId), FluentFutureStream::unboxFuture)
- .join();
+ .thenEmpty(mailboxDAO.delete(mailboxId))
+ .block();
}
@Override
public Mailbox findMailboxByPath(MailboxPath path) throws MailboxException {
return mailboxPathV2DAO.retrieveId(path)
- .thenCompose(cassandraIdOptional ->
- cassandraIdOptional
- .map(CassandraIdAndPath::getCassandraId)
- .map(this::retrieveMailbox)
- .orElseGet(Throwing.supplier(() -> fromPreviousTable(path))))
- .join()
+ .map(CassandraIdAndPath::getCassandraId)
+ .flatMap(this::retrieveMailbox)
+ .switchIfEmpty(fromPreviousTable(path))
+ .blockOptional()
.orElseThrow(() -> new MailboxNotFoundException(path));
}
- private CompletableFuture<Optional<SimpleMailbox>> fromPreviousTable(MailboxPath path) {
+ private Mono<SimpleMailbox> fromPreviousTable(MailboxPath path) {
return mailboxPathDAO.retrieveId(path)
- .thenCompose(cassandraIdOptional ->
- cassandraIdOptional
- .map(CassandraIdAndPath::getCassandraId)
- .map(this::retrieveMailbox)
- .orElse(CompletableFuture.completedFuture(Optional.empty())))
- .thenCompose(maybeMailbox -> maybeMailbox.map(this::migrate)
- .orElse(CompletableFuture.completedFuture(maybeMailbox)));
+ .map(CassandraIdAndPath::getCassandraId)
+ .flatMap(this::retrieveMailbox)
+ .flatMap(this::migrate);
}
- private CompletableFuture<Optional<SimpleMailbox>> migrate(SimpleMailbox mailbox) {
+ private Mono<SimpleMailbox> migrate(SimpleMailbox mailbox) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
return mailboxPathV2DAO.save(mailbox.generateAssociatedPath(), mailboxId)
- .thenCompose(success -> deleteIfSuccess(mailbox, success))
- .thenApply(any -> Optional.of(mailbox));
+ .flatMap(success -> deleteIfSuccess(mailbox, success))
+ .thenReturn(mailbox);
}
- private CompletionStage<Void> deleteIfSuccess(SimpleMailbox mailbox, boolean success) {
+ private Mono<Void> deleteIfSuccess(SimpleMailbox mailbox, boolean success) {
if (success) {
return mailboxPathDAO.delete(mailbox.generateAssociatedPath());
}
LOGGER.info("Concurrent execution lead to data race while migrating {} to 'mailboxPathV2DAO'.",
mailbox.generateAssociatedPath());
- return CompletableFuture.completedFuture(null);
+ return Mono.empty();
}
@Override
public Mailbox findMailboxById(MailboxId id) throws MailboxException {
CassandraId mailboxId = (CassandraId) id;
return retrieveMailbox(mailboxId)
- .join()
+ .blockOptional()
.orElseThrow(() -> new MailboxNotFoundException(id));
}
- private CompletableFuture<Optional<SimpleMailbox>> retrieveMailbox(CassandraId mailboxId) {
- CompletableFuture<MailboxACL> aclCompletableFuture = cassandraACLMapper.getACL(mailboxId);
+ private Mono<SimpleMailbox> retrieveMailbox(CassandraId mailboxId) {
+ Mono<MailboxACL> aclCompletableFuture = cassandraACLMapper.getACL(mailboxId);
+ Mono<SimpleMailbox> simpleMailboxFuture = mailboxDAO.retrieveMailbox(mailboxId);
- CompletableFuture<Optional<SimpleMailbox>> simpleMailboxFuture = mailboxDAO.retrieveMailbox(mailboxId);
-
- return aclCompletableFuture.thenCombine(simpleMailboxFuture, this::addAcl);
+ return aclCompletableFuture.zipWith(simpleMailboxFuture, this::addAcl);
}
- private Optional<SimpleMailbox> addAcl(MailboxACL acl, Optional<SimpleMailbox> mailboxOptional) {
- mailboxOptional.ifPresent(mailbox -> mailbox.setACL(acl));
- return mailboxOptional;
+ private SimpleMailbox addAcl(MailboxACL acl, SimpleMailbox mailbox) {
+ mailbox.setACL(acl);
+ return mailbox;
}
@Override
public List<Mailbox> findMailboxWithPathLike(MailboxPath path) {
- List<Mailbox> mailboxesV2 = toMailboxes(path, mailboxPathV2DAO.listUserMailboxes(path.getNamespace(), path.getUser()));
- List<Mailbox> mailboxesV1 = toMailboxes(path, mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser()));
+ List<SimpleMailbox> mailboxesV2 = toMailboxes(path, mailboxPathV2DAO.listUserMailboxes(path.getNamespace(), path.getUser()));
+ List<SimpleMailbox> mailboxesV1 = toMailboxes(path, mailboxPathDAO.listUserMailboxes(path.getNamespace(), path.getUser()));
List<Mailbox> mailboxesV1NotInV2 = mailboxesV1.stream()
.filter(mailboxV1 -> mailboxesV2.stream()
@@ -163,19 +151,19 @@ public class CassandraMailboxMapper implements MailboxMapper {
.build();
}
- private List<Mailbox> toMailboxes(MailboxPath path, CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes) {
+ private List<SimpleMailbox> toMailboxes(MailboxPath path, Flux<CassandraIdAndPath> listUserMailboxes) {
Pattern regex = Pattern.compile(constructEscapedRegexForMailboxNameMatching(path));
- return FluentFutureStream.of(listUserMailboxes)
+ return listUserMailboxes
.filter(idAndPath -> regex.matcher(idAndPath.getMailboxPath().getName()).matches())
- .map(this::retrieveMailbox, FluentFutureStream::unboxFutureOptional)
- .join()
- .collect(Guavate.toImmutableList());
+ .flatMap(this::retrieveMailbox)
+ .collectList()
+ .block();
}
- private CompletableFuture<Optional<SimpleMailbox>> retrieveMailbox(CassandraIdAndPath idAndPath) {
+ private Mono<SimpleMailbox> retrieveMailbox(CassandraIdAndPath idAndPath) {
return retrieveMailbox(idAndPath.getCassandraId())
- .thenApply(optional -> OptionalUtils.executeIfEmpty(optional,
+ .switchIfEmpty(ReactorUtils.executeAndEmpty(
() -> LOGGER.warn("Could not retrieve mailbox {} with path {} in mailbox table.", idAndPath.getCassandraId(), idAndPath.getMailboxPath())));
}
@@ -186,22 +174,20 @@ public class CassandraMailboxMapper implements MailboxMapper {
CassandraId cassandraId = retrieveId(cassandraMailbox);
cassandraMailbox.setMailboxId(cassandraId);
- boolean applied = trySave(cassandraMailbox, cassandraId).join();
- if (!applied) {
+ if (!trySave(cassandraMailbox, cassandraId)) {
throw new MailboxExistsException(mailbox.generateAssociatedPath().asString());
}
return cassandraId;
}
- private CompletableFuture<Boolean> trySave(SimpleMailbox cassandraMailbox, CassandraId cassandraId) {
- return mailboxPathV2DAO.save(cassandraMailbox.generateAssociatedPath(), cassandraId)
- .thenCompose(CompletableFutureUtil.composeIfTrue(
- () -> retrieveMailbox(cassandraId)
- .thenCompose(optional -> CompletableFuture
- .allOf(optional
- .map(storedMailbox -> mailboxPathV2DAO.delete(storedMailbox.generateAssociatedPath()))
- .orElse(CompletableFuture.completedFuture(null)),
- mailboxDAO.save(cassandraMailbox)))));
+ private boolean trySave(SimpleMailbox cassandraMailbox, CassandraId cassandraId) {
+ boolean isCreated = mailboxPathV2DAO.save(cassandraMailbox.generateAssociatedPath(), cassandraId).block();
+ if (isCreated) {
+ Optional<SimpleMailbox> simpleMailbox = retrieveMailbox(cassandraId).blockOptional();
+ simpleMailbox.ifPresent(mbx -> mailboxPathV2DAO.delete(mbx.generateAssociatedPath()).block());
+ mailboxDAO.save(cassandraMailbox).block();
+ }
+ return isCreated;
}
private CassandraId retrieveId(SimpleMailbox cassandraMailbox) {
@@ -214,21 +200,21 @@ public class CassandraMailboxMapper implements MailboxMapper {
@Override
public boolean hasChildren(Mailbox mailbox, char delimiter) {
- return ImmutableList.of(
+ return Flux.merge(
mailboxPathDAO.listUserMailboxes(mailbox.getNamespace(), mailbox.getUser()),
mailboxPathV2DAO.listUserMailboxes(mailbox.getNamespace(), mailbox.getUser()))
- .stream()
- .map(CompletableFuture::join)
- .flatMap(Function.identity())
- .anyMatch(idAndPath -> idAndPath.getMailboxPath().getName().startsWith(mailbox.getName() + String.valueOf(delimiter)));
+ .filter(idAndPath -> idAndPath.getMailboxPath().getName().startsWith(mailbox.getName() + String.valueOf(delimiter)))
+ .hasElements()
+ .block();
}
@Override
public List<Mailbox> list() {
return mailboxDAO.retrieveAllMailboxes()
- .map(this::toMailboxWithAclFuture, FluentFutureStream::unboxFuture)
- .join()
- .collect(Guavate.toImmutableList());
+ .flatMap(this::toMailboxWithAcl)
+ .map(simpleMailboxes -> (Mailbox) simpleMailboxes)
+ .collectList()
+ .block();
}
@Override
@@ -269,10 +255,10 @@ public class CassandraMailboxMapper implements MailboxMapper {
}
}
- private CompletableFuture<SimpleMailbox> toMailboxWithAclFuture(SimpleMailbox mailbox) {
+ private Mono<SimpleMailbox> toMailboxWithAcl(SimpleMailbox mailbox) {
CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
return cassandraACLMapper.getACL(cassandraId)
- .thenApply(acl -> {
+ .map(acl -> {
mailbox.setACL(acl);
return mailbox;
});
@@ -280,18 +266,17 @@ public class CassandraMailboxMapper implements MailboxMapper {
@Override
public List<Mailbox> findNonPersonalMailboxes(String userName, Right right) {
- return FluentFutureStream.of(userMailboxRightsDAO.listRightsForUser(userName)
- .thenApply(map -> toAuthorizedMailboxIds(map, right)))
- .map(this::retrieveMailbox, FluentFutureStream::unboxFutureOptional)
- .join()
- .collect(Guavate.toImmutableList());
+ return userMailboxRightsDAO.listRightsForUser(userName)
+ .filter(mailboxId -> authorizedMailbox(mailboxId.getRight(), right))
+ .map(Pair::getLeft)
+ .flatMap(this::retrieveMailbox)
+ .map(simpleMailboxes -> (Mailbox) simpleMailboxes)
+ .collectList()
+ .block();
}
- private Stream<CassandraId> toAuthorizedMailboxIds(Map<CassandraId, MailboxACL.Rfc4314Rights> map, Right right) {
- return map.entrySet()
- .stream()
- .filter(Throwing.predicate(entry -> entry.getValue().contains(right)))
- .map(Map.Entry::getKey);
+ private boolean authorizedMailbox(MailboxACL.Rfc4314Rights rights, Right right) {
+ return rights.contains(right);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
index 33fac3c..1eee863 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAO.java
@@ -19,23 +19,24 @@
package org.apache.james.mailbox.cassandra.mail;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.model.MailboxPath;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
public interface CassandraMailboxPathDAO {
- CompletableFuture<Optional<CassandraIdAndPath>> retrieveId(MailboxPath mailboxPath);
+ Mono<CassandraIdAndPath> retrieveId(MailboxPath mailboxPath);
+
+ Flux<CassandraIdAndPath> listUserMailboxes(String namespace, String user);
- CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes(String namespace, String user);
+ void logGhostMailboxSuccess(CassandraIdAndPath value);
- Optional<CassandraIdAndPath> logGhostMailbox(MailboxPath mailboxPath, Optional<CassandraIdAndPath> value);
+ void logGhostMailboxFailure(MailboxPath mailboxPath);
- CompletableFuture<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId);
+ Mono<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId);
- CompletableFuture<Void> delete(MailboxPath mailboxPath);
+ Mono<Void> delete(MailboxPath mailboxPath);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
index 41f5dd0..46975a0 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathDAOImpl.java
@@ -31,7 +31,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathTable
import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathTable.NAMESPACE_AND_USER;
import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathTable.TABLE_NAME;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
@@ -45,12 +44,16 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.mail.utils.MailboxBaseTupleUtil;
import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable;
import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
@@ -80,7 +83,7 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
}
@VisibleForTesting
- public CassandraMailboxPathDAOImpl(Session session, CassandraTypesProvider typesProvider) {
+ CassandraMailboxPathDAOImpl(Session session, CassandraTypesProvider typesProvider) {
this(session, typesProvider, CassandraUtils.WITH_DEFAULT_CONFIGURATION);
}
@@ -122,24 +125,24 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
.from(TABLE_NAME));
}
- public CompletableFuture<Optional<CassandraIdAndPath>> retrieveId(MailboxPath mailboxPath) {
- return cassandraAsyncExecutor.executeSingleRow(
+ public Mono<CassandraIdAndPath> retrieveId(MailboxPath mailboxPath) {
+ return cassandraAsyncExecutor.executeSingleRowReactor(
select.bind()
.setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(), mailboxPath.getUser()))
.setString(MAILBOX_NAME, mailboxPath.getName()))
- .thenApply(rowOptional ->
- rowOptional.map(this::fromRowToCassandraIdAndPath))
- .thenApply(value -> logGhostMailbox(mailboxPath, value));
+ .map(this::fromRowToCassandraIdAndPath)
+ .map(FunctionalUtils.toFunction(this::logGhostMailboxSuccess))
+ .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logGhostMailboxFailure(mailboxPath)));
}
@Override
- public CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes(String namespace, String user) {
- return cassandraAsyncExecutor.execute(
+ public Flux<CassandraIdAndPath> listUserMailboxes(String namespace, String user) {
+ return cassandraAsyncExecutor.executeReactor(
selectAllForUser.bind()
.setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(namespace, user)))
- .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
+ .flatMapMany(resultSet -> cassandraUtils.convertToFlux(resultSet)
.map(this::fromRowToCassandraIdAndPath)
- .peek(this::logReadSuccess));
+ .map(FunctionalUtils.toFunction(this::logReadSuccess)));
}
/**
@@ -149,19 +152,19 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
* reads and write operations are also added in order to allow audit in order to know if the mailbox existed.
*/
@Override
- public Optional<CassandraIdAndPath> logGhostMailbox(MailboxPath mailboxPath, Optional<CassandraIdAndPath> value) {
- if (value.isPresent()) {
- CassandraIdAndPath cassandraIdAndPath = value.get();
- logReadSuccess(cassandraIdAndPath);
- } else {
- GhostMailbox.logger()
+ public void logGhostMailboxSuccess(CassandraIdAndPath value) {
+ logReadSuccess(value);
+ }
+
+ @Override
+ public void logGhostMailboxFailure(MailboxPath mailboxPath) {
+ GhostMailbox.logger()
.addField(GhostMailbox.MAILBOX_NAME, mailboxPath)
.addField(TYPE, "readMiss")
.log(logger -> logger.info("Read mailbox missed"));
- }
- return value;
}
+
/**
* See https://issues.apache.org/jira/browse/MAILBOX-322 to read about the Ghost mailbox bug.
*
@@ -185,7 +188,7 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
}
@Override
- public CompletableFuture<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId) {
+ public Mono<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId) {
return cassandraAsyncExecutor.executeReturnApplied(insert.bind()
.setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(), mailboxPath.getUser()))
.setString(MAILBOX_NAME, mailboxPath.getName())
@@ -193,8 +196,8 @@ public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {
}
@Override
- public CompletableFuture<Void> delete(MailboxPath mailboxPath) {
- return cassandraAsyncExecutor.executeVoid(delete.bind()
+ public Mono<Void> delete(MailboxPath mailboxPath) {
+ return cassandraAsyncExecutor.executeVoidReactor(delete.bind()
.setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(mailboxPath.getNamespace(), mailboxPath.getUser()))
.setString(MAILBOX_NAME, mailboxPath.getName()));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/e4a737e5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
index 1f58898..2247c66 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV2DAO.java
@@ -31,10 +31,6 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Tab
import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.TABLE_NAME;
import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV2Table.USER;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -42,11 +38,15 @@ import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.mailbox.cassandra.GhostMailbox;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
@@ -101,26 +101,26 @@ public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
}
@Override
- public CompletableFuture<Optional<CassandraIdAndPath>> retrieveId(MailboxPath mailboxPath) {
- return cassandraAsyncExecutor.executeSingleRow(
+ public Mono<CassandraIdAndPath> retrieveId(MailboxPath mailboxPath) {
+ return cassandraAsyncExecutor.executeSingleRowReactor(
select.bind()
.setString(NAMESPACE, mailboxPath.getNamespace())
.setString(USER, sanitizeUser(mailboxPath.getUser()))
.setString(MAILBOX_NAME, mailboxPath.getName()))
- .thenApply(rowOptional ->
- rowOptional.map(this::fromRowToCassandraIdAndPath))
- .thenApply(value -> logGhostMailbox(mailboxPath, value));
+ .map(this::fromRowToCassandraIdAndPath)
+ .map(FunctionalUtils.toFunction(this::logGhostMailboxSuccess))
+ .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logGhostMailboxFailure(mailboxPath)));
}
@Override
- public CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes(String namespace, String user) {
- return cassandraAsyncExecutor.execute(
+ public Flux<CassandraIdAndPath> listUserMailboxes(String namespace, String user) {
+ return cassandraAsyncExecutor.executeReactor(
selectAll.bind()
.setString(NAMESPACE, namespace)
.setString(USER, sanitizeUser(user)))
- .thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
+ .flatMapMany(resultSet -> cassandraUtils.convertToFlux(resultSet)
.map(this::fromRowToCassandraIdAndPath)
- .peek(this::logReadSuccess));
+ .map(FunctionalUtils.toFunction(this::logReadSuccess)));
}
/**
@@ -130,17 +130,16 @@ public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
* reads and write operations are also added in order to allow audit in order to know if the mailbox existed.
*/
@Override
- public Optional<CassandraIdAndPath> logGhostMailbox(MailboxPath mailboxPath, Optional<CassandraIdAndPath> value) {
- if (value.isPresent()) {
- CassandraIdAndPath cassandraIdAndPath = value.get();
- logReadSuccess(cassandraIdAndPath);
- } else {
- GhostMailbox.logger()
+ public void logGhostMailboxSuccess(CassandraIdAndPath value) {
+ logReadSuccess(value);
+ }
+
+ @Override
+ public void logGhostMailboxFailure(MailboxPath mailboxPath) {
+ GhostMailbox.logger()
.addField(GhostMailbox.MAILBOX_NAME, mailboxPath)
.addField(TYPE, "readMiss")
.log(logger -> logger.info("Read mailbox missed"));
- }
- return value;
}
/**
@@ -166,7 +165,7 @@ public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
}
@Override
- public CompletableFuture<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId) {
+ public Mono<Boolean> save(MailboxPath mailboxPath, CassandraId mailboxId) {
return cassandraAsyncExecutor.executeReturnApplied(insert.bind()
.setString(NAMESPACE, mailboxPath.getNamespace())
.setString(USER, sanitizeUser(mailboxPath.getUser()))
@@ -175,8 +174,8 @@ public class CassandraMailboxPathV2DAO implements CassandraMailboxPathDAO {
}
@Override
- public CompletableFuture<Void> delete(MailboxPath mailboxPath) {
- return cassandraAsyncExecutor.executeVoid(delete.bind()
+ public Mono<Void> delete(MailboxPath mailboxPath) {
+ return cassandraAsyncExecutor.executeVoidReactor(delete.bind()
.setString(NAMESPACE, mailboxPath.getNamespace())
.setString(USER, sanitizeUser(mailboxPath.getUser()))
.setString(MAILBOX_NAME, mailboxPath.getName()));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org