You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/03/04 08:03:55 UTC
[james-project] 09/09: JAMES-3065 exclude Reactor Mono/Flux from API projects
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0c49996fe01084c570b231ab68dcf660cf36c85a
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Feb 11 16:44:03 2020 +0100
JAMES-3065 exclude Reactor Mono/Flux from API projects
---
.../src/main/java/org/apache/james/util/Host.java | 0
.../src/main/java/org/apache/james/util/Port.java | 0
.../test/java/org/apache/james/util/HostTest.java | 0
.../test/java/org/apache/james/util/PortTest.java | 0
mailbox/api/pom.xml | 8 +-
.../james/mailbox/model/MailboxAnnotationKey.java | 3 +-
.../java/org/apache/james/mailbox/model/Quota.java | 6 ++
.../james/mailbox/quota/MaxQuotaManager.java | 41 ++++++---
.../cassandra/mail/CassandraAttachmentMapper.java | 6 +-
.../cassandra/mail/CassandraMessageDAO.java | 6 +-
.../mail/migration/AttachmentV2Migration.java | 2 +-
.../cassandra/mail/CassandraMailboxMapperTest.java | 5 +-
mailbox/event/event-memory/pom.xml | 4 +
.../vault/blob/BlobStoreDeletedMessageVault.java | 15 +--
.../james/mailbox/store/PreDeletionHooks.java | 5 +-
metrics/metrics-api/pom.xml | 8 +-
.../apache/james/metrics/api/MetricFactory.java | 13 +--
metrics/metrics-dropwizard/pom.xml | 4 +
.../dropwizard/DropWizardMetricFactory.java | 9 ++
metrics/metrics-logger/pom.xml | 4 +
.../james/metrics/logger/DefaultMetricFactory.java | 9 ++
.../metrics/tests/RecordingMetricFactory.java | 9 ++
pom.xml | 5 +
protocols/api/pom.xml | 8 --
server/blob/blob-api/pom.xml | 9 ++
.../org/apache/james/blob/api/BlobPartsId.java | 4 +-
.../java/org/apache/james/blob/api/BlobStore.java | 14 +--
.../blob/api/{BlobPartsId.java => BlobType.java} | 29 +++++-
.../org/apache/james/blob/api/DumbBlobStore.java | 20 ++--
.../apache/james/blob/api/MetricableBlobStore.java | 29 +++---
.../apache/james/blob/api/BlobStoreContract.java | 46 +++++-----
.../org/apache/james/blob/api/BlobTypeTest.java} | 20 ++--
.../james/blob/api/BucketBlobStoreContract.java | 46 +++++-----
.../blob/api/BucketDumbBlobStoreContract.java | 46 +++++-----
.../james/blob/api/DeleteBlobStoreContract.java | 48 +++++-----
.../blob/api/DeleteDumbBlobStoreContract.java | 52 +++++------
.../blob/api/MetricableBlobStoreContract.java | 36 ++++----
.../blob/api/ReadSaveDumbBlobStoreContract.java | 101 ++++++++++-----------
.../blob/cassandra/CassandraBlobStoreTest.java | 12 +--
.../blob/blob-common}/pom.xml | 19 ++--
.../main/java/org/apache/james/blob/api/Store.java | 32 +------
.../file/LocalFileBlobExportMechanismTest.java | 11 ++-
server/blob/blob-memory/pom.xml | 9 ++
.../ObjectStorageBlobStoreContract.java | 4 +-
.../objectstorage/ObjectStorageBlobStoreTest.java | 22 ++---
.../apache/james/blob/union/HybridBlobStore.java | 12 +--
server/blob/mail-store/pom.xml | 8 ++
.../apache/james/blob/mail/MimeMessagePartsId.java | 10 +-
.../apache/james/blob/mail/MimeMessageStore.java | 2 +-
.../james/blob/mail/MimeMessageStoreTest.java | 6 +-
server/blob/pom.xml | 1 +
server/container/jetty/pom.xml | 4 +
server/container/lifecycle-api/pom.xml | 8 --
.../container/metrics/metrics-es-reporter/pom.xml | 4 +-
server/data/data-api/pom.xml | 4 -
.../org/apache/james/jmap/draft/model/Keyword.java | 11 +--
server/protocols/webadmin/webadmin-core/pom.xml | 5 +
.../james/webadmin/vault/routes/ExportService.java | 2 +-
server/queue/queue-api/pom.xml | 5 +
server/queue/queue-jms/pom.xml | 4 +
server/task/task-api/pom.xml | 15 ++-
server/task/task-json/pom.xml | 4 +
.../linshare/LinshareBlobExportMechanismTest.java | 10 +-
63 files changed, 490 insertions(+), 404 deletions(-)
diff --git a/server/container/util/src/main/java/org/apache/james/util/Host.java b/core/src/main/java/org/apache/james/util/Host.java
similarity index 100%
rename from server/container/util/src/main/java/org/apache/james/util/Host.java
rename to core/src/main/java/org/apache/james/util/Host.java
diff --git a/server/container/util/src/main/java/org/apache/james/util/Port.java b/core/src/main/java/org/apache/james/util/Port.java
similarity index 100%
rename from server/container/util/src/main/java/org/apache/james/util/Port.java
rename to core/src/main/java/org/apache/james/util/Port.java
diff --git a/server/container/util/src/test/java/org/apache/james/util/HostTest.java b/core/src/test/java/org/apache/james/util/HostTest.java
similarity index 100%
rename from server/container/util/src/test/java/org/apache/james/util/HostTest.java
rename to core/src/test/java/org/apache/james/util/HostTest.java
diff --git a/server/container/util/src/test/java/org/apache/james/util/PortTest.java b/core/src/test/java/org/apache/james/util/PortTest.java
similarity index 100%
rename from server/container/util/src/test/java/org/apache/james/util/PortTest.java
rename to core/src/test/java/org/apache/james/util/PortTest.java
diff --git a/mailbox/api/pom.xml b/mailbox/api/pom.xml
index 118cc9a..1440ed3 100644
--- a/mailbox/api/pom.xml
+++ b/mailbox/api/pom.xml
@@ -47,10 +47,6 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>james-server-util</artifactId>
- </dependency>
- <dependency>
- <groupId>${james.groupId}</groupId>
<artifactId>apache-mime4j-dom</artifactId>
</dependency>
<dependency>
@@ -89,8 +85,8 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxAnnotationKey.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxAnnotationKey.java
index a4ac543..810308b 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxAnnotationKey.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxAnnotationKey.java
@@ -22,7 +22,6 @@ package org.apache.james.mailbox.model;
import java.util.Locale;
import org.apache.commons.lang3.StringUtils;
-import org.apache.james.util.UnicodeSetUtils;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
@@ -33,7 +32,7 @@ public class MailboxAnnotationKey {
public static final String SLASH_CHARACTER = "/";
public static final String TWO_SLASH_CHARACTER = "//";
- private static final UnicodeSet NAME_ANNOTATION_PATTERN = UnicodeSetUtils.letterOrDigitUnicodeSet()
+ private static final UnicodeSet NAME_ANNOTATION_PATTERN = new UnicodeSet("[[a-z][A-Z][0-9]]")
.add(SLASH_CHARACTER)
.freeze();
public static final int MINIMUM_COMPONENTS = 2;
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Quota.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Quota.java
index 0cbfe2e..565f12e 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/Quota.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/Quota.java
@@ -18,6 +18,8 @@
****************************************************************/
package org.apache.james.mailbox.model;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.apache.james.core.quota.QuotaLimitValue;
@@ -35,6 +37,10 @@ public class Quota<T extends QuotaLimitValue<T>, U extends QuotaUsageValue<U, T>
User
}
+ public static List<Scope> allScopes() {
+ return Arrays.asList(Quota.Scope.User, Quota.Scope.Domain, Quota.Scope.Global);
+ }
+
public static <T extends QuotaLimitValue<T>, U extends QuotaUsageValue<U, T>> Builder<T, U> builder() {
return new Builder<>();
}
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/MaxQuotaManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/MaxQuotaManager.java
index 76afb3f..1030620 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/MaxQuotaManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/MaxQuotaManager.java
@@ -20,7 +20,10 @@
package org.apache.james.mailbox.quota;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
import org.apache.james.core.Domain;
import org.apache.james.core.quota.QuotaCountLimit;
@@ -29,7 +32,6 @@ import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.Quota;
import org.apache.james.mailbox.model.Quota.Scope;
import org.apache.james.mailbox.model.QuotaRoot;
-import org.apache.james.util.OptionalUtils;
import com.github.fge.lambdas.Throwing;
@@ -121,10 +123,11 @@ public interface MaxQuotaManager {
}
default Optional<QuotaSizeLimit> getMaxStorage(Map<Quota.Scope, QuotaSizeLimit> maxStorageDetails) {
- return OptionalUtils.or(
- Optional.ofNullable(maxStorageDetails.get(Quota.Scope.User)),
- Optional.ofNullable(maxStorageDetails.get(Quota.Scope.Domain)),
- Optional.ofNullable(maxStorageDetails.get(Quota.Scope.Global)));
+ return Quota.allScopes()
+ .stream()
+ .map(maxStorageDetails::get)
+ .filter(Objects::nonNull)
+ .findFirst();
}
/**
@@ -139,10 +142,10 @@ public interface MaxQuotaManager {
}
default Optional<QuotaCountLimit> getMaxMessage(Map<Quota.Scope, QuotaCountLimit> maxMessagesDetails) {
- return OptionalUtils.or(
- Optional.ofNullable(maxMessagesDetails.get(Quota.Scope.User)),
- Optional.ofNullable(maxMessagesDetails.get(Quota.Scope.Domain)),
- Optional.ofNullable(maxMessagesDetails.get(Quota.Scope.Global)));
+ return Stream.of(Quota.Scope.User, Quota.Scope.Domain, Quota.Scope.Global)
+ .map(maxMessagesDetails::get)
+ .filter(Objects::nonNull)
+ .findFirst();
}
Map<Quota.Scope, QuotaCountLimit> listMaxMessagesDetails(QuotaRoot quotaRoot);
@@ -162,14 +165,22 @@ public interface MaxQuotaManager {
void removeDomainMaxStorage(Domain domain) throws MailboxException;
default Optional<QuotaCountLimit> getComputedMaxMessage(Domain domain) throws MailboxException {
- return OptionalUtils.orSuppliers(
- Throwing.supplier(() -> getDomainMaxMessage(domain)).sneakyThrow(),
- Throwing.supplier(this::getGlobalMaxMessage).sneakyThrow());
+ return Stream.of(
+ Throwing.supplier(() -> getDomainMaxMessage(domain)).sneakyThrow(),
+ Throwing.supplier(this::getGlobalMaxMessage).sneakyThrow())
+ .map(Supplier::get)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .findFirst();
}
default Optional<QuotaSizeLimit> getComputedMaxStorage(Domain domain) throws MailboxException {
- return OptionalUtils.orSuppliers(
- Throwing.supplier(() -> getDomainMaxStorage(domain)).sneakyThrow(),
- Throwing.supplier(this::getGlobalMaxStorage).sneakyThrow());
+ return Stream.of(
+ Throwing.supplier(() -> getDomainMaxStorage(domain)).sneakyThrow(),
+ Throwing.supplier(this::getGlobalMaxStorage).sneakyThrow())
+ .map(Supplier::get)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .findFirst();
}
}
\ No newline at end of file
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index 867a002..adf1c3f 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -81,7 +81,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
}
private Mono<Attachment> retrievePayload(DAOAttachment daoAttachment) {
- return blobStore.readBytes(blobStore.getDefaultBucketName(), daoAttachment.getBlobId())
+ return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), daoAttachment.getBlobId()))
.map(daoAttachment::toAttachment);
}
@@ -112,7 +112,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
@Override
public void storeAttachmentForOwner(Attachment attachment, Username owner) throws MailboxException {
ownerDAO.addOwner(attachment.getAttachmentId(), owner)
- .then(blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST))
+ .then(Mono.from(blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST)))
.map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
.flatMap(attachmentDAOV2::storeAttachment)
.block();
@@ -139,7 +139,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
}
public Mono<Void> storeAttachmentAsync(Attachment attachment, MessageId ownerMessageId) {
- return blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST)
+ return Mono.from(blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST))
.map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
.flatMap(daoAttachment -> storeAttachmentWithIndex(daoAttachment, ownerMessageId));
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 16f81fd..0bfce81 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -184,8 +184,8 @@ public class CassandraMessageDAO {
byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent());
byte[] bodyContent = IOUtils.toByteArray(message.getBodyContent());
- Mono<BlobId> bodyFuture = blobStore.save(blobStore.getDefaultBucketName(), bodyContent, LOW_COST);
- Mono<BlobId> headerFuture = blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED);
+ Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyContent, LOW_COST));
+ Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED));
return headerFuture.zipWith(bodyFuture);
} catch (IOException e) {
@@ -365,7 +365,7 @@ public class CassandraMessageDAO {
}
private Mono<byte[]> getFieldContent(String field, Row row) {
- return blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.from(row.getString(field)));
+ return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.from(row.getString(field))));
}
public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
index 25474f5..020a8d5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2Migration.java
@@ -57,7 +57,7 @@ public class AttachmentV2Migration implements Migration {
}
private Mono<Void> migrateAttachment(Attachment attachment) {
- return blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST)
+ return Mono.from(blobStore.save(blobStore.getDefaultBucketName(), attachment.getBytes(), LOW_COST))
.map(blobId -> CassandraAttachmentDAOV2.from(attachment, blobId))
.flatMap(attachmentDAOV2::storeAttachment)
.then(attachmentDAOV1.deleteAttachment(attachment.getAttachmentId()));
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index 986fe8b..2a044ec 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -23,6 +23,8 @@ import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
import static org.apache.james.mailbox.model.MailboxAssertingTool.softly;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import java.util.List;
@@ -57,6 +59,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import com.github.fge.lambdas.Throwing;
import com.github.fge.lambdas.runnable.ThrowingRunnable;
+import reactor.core.publisher.Mono;
class CassandraMailboxMapperTest {
private static final UidValidity UID_VALIDITY = UidValidity.ofValid(52);
@@ -446,7 +449,7 @@ class CassandraMailboxMapperTest {
.isNotEqualTo(testee.findMailboxById(inboxId).getName());
}
- @Disabled("JAMES-3057 org.apache.james.mailbox.exception.MailboxNotFoundException: INBOX can not be found")
+ @Disabled("JAMES-3056 org.apache.james.mailbox.exception.MailboxNotFoundException: 'mailboxId' can not be found")
@Test
void createAfterPreviousFailedCreateShouldCreateAMailbox(CassandraCluster cassandra) throws MailboxException {
cassandra.getConf()
diff --git a/mailbox/event/event-memory/pom.xml b/mailbox/event/event-memory/pom.xml
index 72d32d0..943a479 100644
--- a/mailbox/event/event-memory/pom.xml
+++ b/mailbox/event/event-memory/pom.xml
@@ -44,6 +44,10 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>james-server-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>metrics-api</artifactId>
</dependency>
<dependency>
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
index e545e53..a7bf391 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java
@@ -98,7 +98,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
}
private Mono<Void> appendMessage(DeletedMessage deletedMessage, InputStream mimeMessage, BucketName bucketName) {
- return blobStore.save(bucketName, mimeMessage, LOW_COST)
+ return Mono.from(blobStore.save(bucketName, mimeMessage, LOW_COST))
.map(blobId -> StorageInformation.builder()
.bucketName(bucketName)
.blobId(blobId))
@@ -156,7 +156,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
return Mono.from(messageMetadataVault.retrieveStorageInformation(username, messageId))
.flatMap(storageInformation -> Mono.from(messageMetadataVault.remove(storageInformation.getBucketName(), username, messageId))
.thenReturn(storageInformation))
- .flatMap(storageInformation -> blobStore.delete(storageInformation.getBucketName(), storageInformation.getBlobId()))
+ .flatMap(storageInformation -> Mono.from(blobStore.delete(storageInformation.getBucketName(), storageInformation.getBlobId())))
.subscribeOn(Schedulers.elastic());
}
@@ -167,10 +167,11 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
Flux<BucketName> deleteExpiredMessages(ZonedDateTime beginningOfRetentionPeriod) {
- return metricFactory.runPublishingTimerMetric(
- DELETE_EXPIRED_MESSAGES_METRIC_NAME,
- retentionQualifiedBuckets(beginningOfRetentionPeriod)
- .flatMap(bucketName -> deleteBucketData(bucketName).then(Mono.just(bucketName))));
+ return Flux.from(
+ metricFactory.runPublishingTimerMetric(
+ DELETE_EXPIRED_MESSAGES_METRIC_NAME,
+ retentionQualifiedBuckets(beginningOfRetentionPeriod)
+ .flatMap(bucketName -> deleteBucketData(bucketName).then(Mono.just(bucketName)))));
}
@@ -196,7 +197,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault {
}
private Mono<Void> deleteBucketData(BucketName bucketName) {
- return blobStore.deleteBucket(bucketName)
+ return Mono.from(blobStore.deleteBucket(bucketName))
.then(Mono.from(messageMetadataVault.removeMetadataRelatedToBucket(bucketName)));
}
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
index 46f6e7a..b53e039 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
@@ -62,8 +62,7 @@ public class PreDeletionHooks {
}
private Mono<Void> publishMetric(PreDeletionHook.DeleteOperation deleteOperation, PreDeletionHook hook, MetricFactory factory) {
- return factory.runPublishingTimerMetric(
- PRE_DELETION_HOOK_METRIC_NAME,
- Mono.from(hook.notifyDelete(deleteOperation)));
+ return Mono.from(
+ factory.runPublishingTimerMetric(PRE_DELETION_HOOK_METRIC_NAME, hook.notifyDelete(deleteOperation)));
}
}
diff --git a/metrics/metrics-api/pom.xml b/metrics/metrics-api/pom.xml
index 3212965..8e066b1 100644
--- a/metrics/metrics-api/pom.xml
+++ b/metrics/metrics-api/pom.xml
@@ -36,13 +36,13 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
index 670c637..a689b44 100644
--- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
+++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
@@ -21,8 +21,7 @@ package org.apache.james.metrics.api;
import java.util.function.Supplier;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
+import org.reactivestreams.Publisher;
public interface MetricFactory {
@@ -39,15 +38,7 @@ public interface MetricFactory {
}
}
- default <T> Mono<T> runPublishingTimerMetric(String name, Mono<T> mono) {
- TimeMetric timer = timer(name);
- return mono.doOnSuccess(success -> timer.stopAndPublish());
- }
-
- default <T> Flux<T> runPublishingTimerMetric(String name, Flux<T> flux) {
- TimeMetric timer = timer(name);
- return flux.doOnComplete(timer::stopAndPublish);
- }
+ <T> Publisher<T> runPublishingTimerMetric(String name, Publisher<T> publisher);
default void runPublishingTimerMetric(String name, Runnable runnable) {
runPublishingTimerMetric(name, () -> {
diff --git a/metrics/metrics-dropwizard/pom.xml b/metrics/metrics-dropwizard/pom.xml
index d0be914..624d2fe 100644
--- a/metrics/metrics-dropwizard/pom.xml
+++ b/metrics/metrics-dropwizard/pom.xml
@@ -62,6 +62,10 @@
<artifactId>metrics-jvm</artifactId>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
index 2d040d4..8defd20 100644
--- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
+++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
@@ -27,10 +27,13 @@ import org.apache.james.lifecycle.api.Startable;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
+import org.reactivestreams.Publisher;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
+import reactor.core.publisher.Flux;
+
public class DropWizardMetricFactory implements MetricFactory, Startable {
private final MetricRegistry metricRegistry;
@@ -53,6 +56,12 @@ public class DropWizardMetricFactory implements MetricFactory, Startable {
return new DropWizardTimeMetric(name, metricRegistry.timer(name).time());
}
+ @Override
+ public <T> Publisher<T> runPublishingTimerMetric(String name, Publisher<T> publisher) {
+ TimeMetric timer = timer(name);
+ return Flux.from(publisher).doOnComplete(timer::stopAndPublish);
+ }
+
@PostConstruct
public void start() {
jmxReporter.start();
diff --git a/metrics/metrics-logger/pom.xml b/metrics/metrics-logger/pom.xml
index c843ac1..4857038 100644
--- a/metrics/metrics-logger/pom.xml
+++ b/metrics/metrics-logger/pom.xml
@@ -50,6 +50,10 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
index db64916..c603c71 100644
--- a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
+++ b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
@@ -21,9 +21,12 @@ package org.apache.james.metrics.logger;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
+import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
public class DefaultMetricFactory implements MetricFactory {
public static final Logger LOGGER = LoggerFactory.getLogger(DefaultMetricFactory.class);
@@ -38,4 +41,10 @@ public class DefaultMetricFactory implements MetricFactory {
return new DefaultTimeMetric(name);
}
+ @Override
+ public <T> Publisher<T> runPublishingTimerMetric(String name, Publisher<T> publisher) {
+ TimeMetric timer = timer(name);
+ return Flux.from(publisher).doOnComplete(timer::stopAndPublish);
+ }
+
}
diff --git a/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java b/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java
index 24438d5..6009e54 100644
--- a/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java
+++ b/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java
@@ -28,12 +28,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
+import org.reactivestreams.Publisher;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
+import reactor.core.publisher.Flux;
+
public class RecordingMetricFactory implements MetricFactory {
private final Multimap<String, Duration> executionTimes = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
private final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
@@ -56,6 +59,12 @@ public class RecordingMetricFactory implements MetricFactory {
});
}
+ @Override
+ public <T> Publisher<T> runPublishingTimerMetric(String name, Publisher<T> publisher) {
+ TimeMetric timer = timer(name);
+ return Flux.from(publisher).doOnComplete(timer::stopAndPublish);
+ }
+
public Collection<Duration> executionTimesFor(String name) {
synchronized (executionTimes) {
return executionTimes.get(name);
diff --git a/pom.xml b/pom.xml
index 1018e28..7bee095 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1110,6 +1110,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>blob-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>blob-export-api</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/protocols/api/pom.xml b/protocols/api/pom.xml
index e1b8b49..9ac0dce 100644
--- a/protocols/api/pom.xml
+++ b/protocols/api/pom.xml
@@ -56,14 +56,6 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
diff --git a/server/blob/blob-api/pom.xml b/server/blob/blob-api/pom.xml
index f4ebecc..cbe5c0e 100644
--- a/server/blob/blob-api/pom.xml
+++ b/server/blob/blob-api/pom.xml
@@ -36,6 +36,7 @@
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-util</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
@@ -61,6 +62,10 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
@@ -69,5 +74,9 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.reactivestreams</groupId>
+ <artifactId>reactive-streams</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java
index a6a9412..96867c6 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java
@@ -23,8 +23,8 @@ import java.util.Map;
public interface BlobPartsId {
interface Factory<I extends BlobPartsId> {
- I generate(Map<Store.BlobType, BlobId> map);
+ I generate(Map<BlobType, BlobId> map);
}
- Map<Store.BlobType, BlobId> asMap();
+ Map<BlobType, BlobId> asMap();
}
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index 458b354..a887a32 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -21,7 +21,7 @@ package org.apache.james.blob.api;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
-import reactor.core.publisher.Mono;
+import org.reactivestreams.Publisher;
public interface BlobStore {
@@ -31,21 +31,21 @@ public interface BlobStore {
HIGH_PERFORMANCE
}
- Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);
+ Publisher<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);
- Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);
+ Publisher<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy);
- default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
+ default Publisher<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
}
- Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
+ Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId);
InputStream read(BucketName bucketName, BlobId blobId);
BucketName getDefaultBucketName();
- Mono<Void> deleteBucket(BucketName bucketName);
+ Publisher<Void> deleteBucket(BucketName bucketName);
- Mono<Void> delete(BucketName bucketName, BlobId blobId);
+ Publisher<Void> delete(BucketName bucketName, BlobId blobId);
}
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java
similarity index 69%
copy from server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java
copy to server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java
index a6a9412..e068c68 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobPartsId.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java
@@ -19,12 +19,31 @@
package org.apache.james.blob.api;
-import java.util.Map;
+import java.util.Objects;
-public interface BlobPartsId {
- interface Factory<I extends BlobPartsId> {
- I generate(Map<Store.BlobType, BlobId> map);
+public class BlobType {
+ private final String name;
+
+ public BlobType(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
}
- Map<Store.BlobType, BlobId> asMap();
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof BlobType) {
+ BlobType blobType = (BlobType) o;
+
+ return Objects.equals(this.name, blobType.name);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(name);
+ }
}
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
index 9f993c5..f2045fc 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
@@ -22,9 +22,9 @@ package org.apache.james.blob.api;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
-import com.google.common.io.ByteSource;
+import org.reactivestreams.Publisher;
-import reactor.core.publisher.Mono;
+import com.google.common.io.ByteSource;
public interface DumbBlobStore {
@@ -44,7 +44,7 @@ public interface DumbBlobStore {
* an ObjectNotFoundException in its error channel when the blobId or the bucket is not found
* or an IOObjectStoreException when an unexpected IO error occurs
*/
- Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId);
+ Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId);
/**
@@ -55,26 +55,26 @@ public interface DumbBlobStore {
- * @return an empty Mono when the save succeed,
+ * @return an empty Publisher when the save succeeds,
* otherwise an IOObjectStoreException in its error channel
*/
- Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data);
+ Publisher<Void> save(BucketName bucketName, BlobId blobId, byte[] data);
/**
* @see #save(BucketName, BlobId, byte[])
*
* The InputStream should be closed after the call to this method
*/
- Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream);
+ Publisher<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream);
/**
* @see #save(BucketName, BlobId, byte[])
*/
- Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content);
+ Publisher<Void> save(BucketName bucketName, BlobId blobId, ByteSource content);
/**
* @see #save(BucketName, BlobId, byte[])
*
* The String is stored as UTF-8.
*/
- default Mono<Void> save(BucketName bucketName, BlobId blobId, String data) {
+ default Publisher<Void> save(BucketName bucketName, BlobId blobId, String data) {
return save(bucketName, blobId, data.getBytes(StandardCharsets.UTF_8));
}
@@ -86,7 +86,7 @@ public interface DumbBlobStore {
* (either the blob doesn't exist in the bucket or the bucket itself doesn't exist)
* otherwise an IOObjectStoreException in its error channel
*/
- Mono<Void> delete(BucketName bucketName, BlobId blobId);
+ Publisher<Void> delete(BucketName bucketName, BlobId blobId);
/**
* Remove a bucket based on its BucketName
@@ -95,8 +95,8 @@ public interface DumbBlobStore {
- * Saving or reading blobs concurrently of bucket deletion can lead
+ * Saving or reading blobs concurrently with bucket deletion can lead
* to an inconsistent state.
*
- * @return a successful Mono if the bucket is deleted or did not exist
+ * @return a successful Publisher if the bucket is deleted or did not exist
* otherwise an IOObjectStoreException in its error channel
*/
- Mono<Void> deleteBucket(BucketName bucketName);
+ Publisher<Void> deleteBucket(BucketName bucketName);
}
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index a081cec..0e4e383 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -24,8 +24,7 @@ import javax.inject.Inject;
import javax.inject.Named;
import org.apache.james.metrics.api.MetricFactory;
-
-import reactor.core.publisher.Mono;
+import org.reactivestreams.Publisher;
public class MetricableBlobStore implements BlobStore {
@@ -50,21 +49,18 @@ public class MetricableBlobStore implements BlobStore {
}
@Override
- public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) {
- return metricFactory
- .runPublishingTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(bucketName, data, storagePolicy));
+ public Publisher<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) {
+ return metricFactory.runPublishingTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(bucketName, data, storagePolicy));
}
@Override
- public Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) {
- return metricFactory
- .runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(bucketName, data, storagePolicy));
+ public Publisher<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy) {
+ return metricFactory.runPublishingTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(bucketName, data, storagePolicy));
}
@Override
- public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
- return metricFactory
- .runPublishingTimerMetric(READ_BYTES_TIMER_NAME, blobStoreImpl.readBytes(bucketName, blobId));
+ public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+ return metricFactory.runPublishingTimerMetric(READ_BYTES_TIMER_NAME, blobStoreImpl.readBytes(bucketName, blobId));
}
@Override
@@ -74,9 +70,8 @@ public class MetricableBlobStore implements BlobStore {
}
@Override
- public Mono<Void> deleteBucket(BucketName bucketName) {
- return metricFactory
- .runPublishingTimerMetric(DELETE_BUCKET_TIMER_NAME, blobStoreImpl.deleteBucket(bucketName));
+ public Publisher<Void> deleteBucket(BucketName bucketName) {
+ return metricFactory.runPublishingTimerMetric(DELETE_BUCKET_TIMER_NAME, blobStoreImpl.deleteBucket(bucketName));
}
@Override
@@ -85,8 +80,8 @@ public class MetricableBlobStore implements BlobStore {
}
@Override
- public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
- return metricFactory
- .runPublishingTimerMetric(DELETE_TIMER_NAME, blobStoreImpl.delete(bucketName, blobId));
+ public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
+ return metricFactory.runPublishingTimerMetric(DELETE_TIMER_NAME, blobStoreImpl.delete(bucketName, blobId));
}
+
}
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
index 0fd6961..c01e480 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java
@@ -37,6 +37,8 @@ import org.junit.jupiter.params.provider.MethodSource;
import com.google.common.base.Strings;
+import reactor.core.publisher.Mono;
+
public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobStoreContract {
static Stream<Arguments> storagePolicies() {
@@ -62,7 +64,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- assertThatThrownBy(() -> store.save(defaultBucketName, (byte[]) null, storagePolicy).block())
+ assertThatThrownBy(() -> Mono.from(store.save(defaultBucketName, (byte[]) null, storagePolicy)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -72,7 +74,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- assertThatThrownBy(() -> store.save(defaultBucketName, (String) null, storagePolicy).block())
+ assertThatThrownBy(() -> Mono.from(store.save(defaultBucketName, (String) null, storagePolicy)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -82,7 +84,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- assertThatThrownBy(() -> store.save(defaultBucketName, (InputStream) null, storagePolicy).block())
+ assertThatThrownBy(() -> Mono.from(store.save(defaultBucketName, (InputStream) null, storagePolicy)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -92,9 +94,9 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, EMPTY_BYTEARRAY, storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, EMPTY_BYTEARRAY, storagePolicy)).block();
- byte[] bytes = store.readBytes(defaultBucketName, blobId).block();
+ byte[] bytes = Mono.from(store.readBytes(defaultBucketName, blobId)).block();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
}
@@ -105,9 +107,9 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, new String(), storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, new String(), storagePolicy)).block();
- byte[] bytes = store.readBytes(defaultBucketName, blobId).block();
+ byte[] bytes = Mono.from(store.readBytes(defaultBucketName, blobId)).block();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
}
@@ -118,9 +120,9 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, new ByteArrayInputStream(EMPTY_BYTEARRAY), storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, new ByteArrayInputStream(EMPTY_BYTEARRAY), storagePolicy)).block();
- byte[] bytes = store.readBytes(defaultBucketName, blobId).block();
+ byte[] bytes = Mono.from(store.readBytes(defaultBucketName, blobId)).block();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
}
@@ -131,7 +133,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, SHORT_BYTEARRAY, storagePolicy)).block();
assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
}
@@ -142,7 +144,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, SHORT_STRING, storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, SHORT_STRING, storagePolicy)).block();
assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
}
@@ -153,7 +155,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, new ByteArrayInputStream(SHORT_BYTEARRAY), storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, new ByteArrayInputStream(SHORT_BYTEARRAY), storagePolicy)).block();
assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66"));
}
@@ -163,7 +165,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- assertThatThrownBy(() -> store.readBytes(defaultBucketName, blobIdFactory().from("unknown")).block())
+ assertThatThrownBy(() -> Mono.from(store.readBytes(defaultBucketName, blobIdFactory().from("unknown"))).block())
.isExactlyInstanceOf(ObjectNotFoundException.class);
}
@@ -173,9 +175,9 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, SHORT_BYTEARRAY, storagePolicy)).block();
- byte[] bytes = store.readBytes(defaultBucketName, blobId).block();
+ byte[] bytes = Mono.from(store.readBytes(defaultBucketName, blobId)).block();
assertThat(bytes).isEqualTo(SHORT_BYTEARRAY);
}
@@ -186,9 +188,9 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, ELEVEN_KILOBYTES, storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, ELEVEN_KILOBYTES, storagePolicy)).block();
- byte[] bytes = store.readBytes(defaultBucketName, blobId).block();
+ byte[] bytes = Mono.from(store.readBytes(defaultBucketName, blobId)).block();
assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES);
}
@@ -199,9 +201,9 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, TWELVE_MEGABYTES, storagePolicy)).block();
- byte[] bytes = store.readBytes(defaultBucketName, blobId).block();
+ byte[] bytes = Mono.from(store.readBytes(defaultBucketName, blobId)).block();
assertThat(bytes).isEqualTo(TWELVE_MEGABYTES);
}
@@ -221,7 +223,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, SHORT_BYTEARRAY, storagePolicy)).block();
InputStream read = store.read(defaultBucketName, blobId);
@@ -234,7 +236,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, ELEVEN_KILOBYTES, storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, ELEVEN_KILOBYTES, storagePolicy)).block();
InputStream read = store.read(defaultBucketName, blobId);
@@ -248,7 +250,7 @@ public interface BlobStoreContract extends DeleteBlobStoreContract, BucketBlobSt
BucketName defaultBucketName = store.getDefaultBucketName();
// 12 MB of text
- BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, storagePolicy).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, TWELVE_MEGABYTES, storagePolicy)).block();
InputStream read = store.read(defaultBucketName, blobId);
diff --git a/server/container/util/src/main/java/org/apache/james/util/UnicodeSetUtils.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobTypeTest.java
similarity index 75%
rename from server/container/util/src/main/java/org/apache/james/util/UnicodeSetUtils.java
rename to server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobTypeTest.java
index 26b05a0..2af6748 100644
--- a/server/container/util/src/main/java/org/apache/james/util/UnicodeSetUtils.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobTypeTest.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
- * http://www.apache.org/licenses/LICENSE-2.0 *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
@@ -15,16 +15,18 @@
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
- ****************************************************************/
-package org.apache.james.util;
+ ***************************************************************/
-import com.ibm.icu.text.UnicodeSet;
+package org.apache.james.blob.api;
-public class UnicodeSetUtils {
+import org.junit.jupiter.api.Test;
- private static final String LETTER_OR_DIGIT_PATTERN = "[[a-z][A-Z][0-9]]";
+import nl.jqno.equalsverifier.EqualsVerifier;
- public static UnicodeSet letterOrDigitUnicodeSet() {
- return new UnicodeSet(LETTER_OR_DIGIT_PATTERN);
+class BlobTypeTest {
+
+ @Test
+ void shouldRespectBeanContract() {
+ EqualsVerifier.forClass(BlobType.class).verify();
}
-}
+}
\ No newline at end of file
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreContract.java
index 0d993bd..75b0706 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketBlobStoreContract.java
@@ -31,6 +31,8 @@ import java.time.Duration;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
public interface BucketBlobStoreContract {
String SHORT_STRING = "toto";
byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8);
@@ -44,7 +46,7 @@ public interface BucketBlobStoreContract {
default void deleteBucketShouldThrowWhenNullBucketName() {
BlobStore store = testee();
- assertThatThrownBy(() -> store.deleteBucket(null).block())
+ assertThatThrownBy(() -> Mono.from(store.deleteBucket(null)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -52,8 +54,8 @@ public interface BucketBlobStoreContract {
default void deleteBucketShouldDeleteExistingBucketWithItsData() {
BlobStore store = testee();
- BlobId blobId = store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block();
- store.deleteBucket(CUSTOM).block();
+ BlobId blobId = Mono.from(store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST)).block();
+ Mono.from(store.deleteBucket(CUSTOM)).block();
assertThatThrownBy(() -> store.read(CUSTOM, blobId).read())
.isInstanceOf(ObjectStoreException.class);
@@ -63,10 +65,10 @@ public interface BucketBlobStoreContract {
default void deleteBucketShouldBeIdempotent() {
BlobStore store = testee();
- store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block();
- store.deleteBucket(CUSTOM).block();
+ Mono.from(store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST)).block();
+ Mono.from(store.deleteBucket(CUSTOM)).block();
- assertThatCode(() -> store.deleteBucket(CUSTOM).block())
+ assertThatCode(() -> Mono.from(store.deleteBucket(CUSTOM)).block())
.doesNotThrowAnyException();
}
@@ -74,7 +76,7 @@ public interface BucketBlobStoreContract {
default void saveBytesShouldThrowWhenNullBucketName() {
BlobStore store = testee();
- assertThatThrownBy(() -> store.save(null, SHORT_BYTEARRAY, LOW_COST).block())
+ assertThatThrownBy(() -> Mono.from(store.save(null, SHORT_BYTEARRAY, LOW_COST)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -82,7 +84,7 @@ public interface BucketBlobStoreContract {
default void saveStringShouldThrowWhenNullBucketName() {
BlobStore store = testee();
- assertThatThrownBy(() -> store.save(null, SHORT_STRING, LOW_COST).block())
+ assertThatThrownBy(() -> Mono.from(store.save(null, SHORT_STRING, LOW_COST)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -90,7 +92,7 @@ public interface BucketBlobStoreContract {
default void saveInputStreamShouldThrowWhenNullBucketName() {
BlobStore store = testee();
- assertThatThrownBy(() -> store.save(null, new ByteArrayInputStream(SHORT_BYTEARRAY), LOW_COST).block())
+ assertThatThrownBy(() -> Mono.from(store.save(null, new ByteArrayInputStream(SHORT_BYTEARRAY), LOW_COST)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -98,7 +100,7 @@ public interface BucketBlobStoreContract {
default void readShouldThrowWhenNullBucketName() {
BlobStore store = testee();
- BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block();
+ BlobId blobId = Mono.from(store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST)).block();
assertThatThrownBy(() -> store.read(null, blobId))
.isInstanceOf(NullPointerException.class);
}
@@ -107,8 +109,8 @@ public interface BucketBlobStoreContract {
default void readBytesStreamShouldThrowWhenNullBucketName() {
BlobStore store = testee();
- BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block();
- assertThatThrownBy(() -> store.readBytes(null, blobId).block())
+ BlobId blobId = Mono.from(store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST)).block();
+ assertThatThrownBy(() -> Mono.from(store.readBytes(null, blobId)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -116,7 +118,7 @@ public interface BucketBlobStoreContract {
default void readStringShouldThrowWhenBucketDoesNotExist() {
BlobStore store = testee();
- BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block();
+ BlobId blobId = Mono.from(store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST)).block();
assertThatThrownBy(() -> store.read(CUSTOM, blobId).read())
.isInstanceOf(ObjectStoreException.class);
}
@@ -125,8 +127,8 @@ public interface BucketBlobStoreContract {
default void readBytesStreamShouldThrowWhenBucketDoesNotExist() {
BlobStore store = testee();
- BlobId blobId = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block();
- assertThatThrownBy(() -> store.readBytes(CUSTOM, blobId).block())
+ BlobId blobId = Mono.from(store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST)).block();
+ assertThatThrownBy(() -> Mono.from(store.readBytes(CUSTOM, blobId)).block())
.isInstanceOf(ObjectStoreException.class);
}
@@ -134,11 +136,11 @@ public interface BucketBlobStoreContract {
default void shouldBeAbleToSaveDataInMultipleBuckets() {
BlobStore store = testee();
- BlobId blobIdDefault = store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST).block();
- BlobId blobIdCustom = store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block();
+ BlobId blobIdDefault = Mono.from(store.save(BucketName.DEFAULT, SHORT_BYTEARRAY, LOW_COST)).block();
+ BlobId blobIdCustom = Mono.from(store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST)).block();
- byte[] bytesDefault = store.readBytes(BucketName.DEFAULT, blobIdDefault).block();
- byte[] bytesCustom = store.readBytes(CUSTOM, blobIdCustom).block();
+ byte[] bytesDefault = Mono.from(store.readBytes(BucketName.DEFAULT, blobIdDefault)).block();
+ byte[] bytesCustom = Mono.from(store.readBytes(CUSTOM, blobIdCustom)).block();
assertThat(bytesDefault).isEqualTo(bytesCustom);
}
@@ -148,7 +150,7 @@ public interface BucketBlobStoreContract {
BlobStore store = testee();
ConcurrentTestRunner.builder()
- .operation(((threadNumber, step) -> store.save(CUSTOM, SHORT_STRING + threadNumber + step, LOW_COST).block()))
+ .operation(((threadNumber, step) -> Mono.from(store.save(CUSTOM, SHORT_STRING + threadNumber + step, LOW_COST)).block()))
.threadCount(10)
.operationCount(10)
.runSuccessfullyWithin(Duration.ofMinutes(1));
@@ -158,10 +160,10 @@ public interface BucketBlobStoreContract {
default void deleteBucketConcurrentlyShouldNotFail() throws Exception {
BlobStore store = testee();
- store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block();
+ Mono.from(store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST)).block();
ConcurrentTestRunner.builder()
- .operation(((threadNumber, step) -> store.deleteBucket(CUSTOM).block()))
+ .operation(((threadNumber, step) -> Mono.from(store.deleteBucket(CUSTOM)).block()))
.threadCount(10)
.operationCount(10)
.runSuccessfullyWithin(Duration.ofMinutes(1));
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
index 49e2a7e..d560587 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
@@ -35,6 +35,8 @@ import java.time.Duration;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
public interface BucketDumbBlobStoreContract {
DumbBlobStore testee();
@@ -43,7 +45,7 @@ public interface BucketDumbBlobStoreContract {
default void deleteBucketShouldThrowWhenNullBucketName() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.deleteBucket(null).block())
+ assertThatThrownBy(() -> Mono.from(store.deleteBucket(null)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -51,8 +53,8 @@ public interface BucketDumbBlobStoreContract {
default void deleteBucketShouldDeleteExistingBucketWithItsData() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
- store.deleteBucket(TEST_BUCKET_NAME).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+ Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block();
assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read())
.isInstanceOf(ObjectNotFoundException.class);
@@ -62,10 +64,10 @@ public interface BucketDumbBlobStoreContract {
default void deleteBucketShouldBeIdempotent() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
- store.deleteBucket(TEST_BUCKET_NAME).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+ Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block();
- assertThatCode(() -> store.deleteBucket(TEST_BUCKET_NAME).block())
+ assertThatCode(() -> Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block())
.doesNotThrowAnyException();
}
@@ -73,7 +75,7 @@ public interface BucketDumbBlobStoreContract {
default void saveBytesShouldThrowWhenNullBucketName() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.save(null, TEST_BLOB_ID, SHORT_BYTEARRAY).block())
+ assertThatThrownBy(() -> Mono.from(store.save(null, TEST_BLOB_ID, SHORT_BYTEARRAY)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -81,7 +83,7 @@ public interface BucketDumbBlobStoreContract {
default void saveStringShouldThrowWhenNullBucketName() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.save(null, TEST_BLOB_ID, SHORT_STRING).block())
+ assertThatThrownBy(() -> Mono.from(store.save(null, TEST_BLOB_ID, SHORT_STRING)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -89,7 +91,7 @@ public interface BucketDumbBlobStoreContract {
default void saveInputStreamShouldThrowWhenNullBucketName() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.save(null, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)).block())
+ assertThatThrownBy(() -> Mono.from(store.save(null, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY))).block())
.isInstanceOf(NullPointerException.class);
}
@@ -97,7 +99,7 @@ public interface BucketDumbBlobStoreContract {
default void readShouldThrowWhenNullBucketName() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
assertThatThrownBy(() -> store.read(null, TEST_BLOB_ID))
.isInstanceOf(NullPointerException.class);
}
@@ -106,8 +108,8 @@ public interface BucketDumbBlobStoreContract {
default void readBytesShouldThrowWhenNullBucketName() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
- assertThatThrownBy(() -> store.readBytes(null, TEST_BLOB_ID).block())
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+ assertThatThrownBy(() -> Mono.from(store.readBytes(null, TEST_BLOB_ID)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -115,7 +117,7 @@ public interface BucketDumbBlobStoreContract {
default void readStreamShouldThrowWhenBucketDoesNotExist() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).read())
.isInstanceOf(ObjectNotFoundException.class);
}
@@ -124,9 +126,9 @@ public interface BucketDumbBlobStoreContract {
default void readBytesShouldThrowWhenBucketDoesNotExist() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
- assertThatThrownBy(() -> store.readBytes(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).block())
+ assertThatThrownBy(() -> Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, TEST_BLOB_ID)).block())
.isInstanceOf(ObjectNotFoundException.class);
}
@@ -134,11 +136,11 @@ public interface BucketDumbBlobStoreContract {
default void shouldBeAbleToSaveDataInMultipleBuckets() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
- store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+ Mono.from(store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
- byte[] bytesDefault = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
- byte[] bytesCustom = store.readBytes(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID).block();
+ byte[] bytesDefault = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+ byte[] bytesCustom = Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID)).block();
assertThat(bytesDefault).isEqualTo(bytesCustom);
}
@@ -149,10 +151,10 @@ public interface BucketDumbBlobStoreContract {
ConcurrentTestRunner.builder()
.operation(((threadNumber, step) ->
- store.save(
+ Mono.from(store.save(
TEST_BUCKET_NAME,
new TestBlobId("id-" + threadNumber + step),
- SHORT_STRING + threadNumber + step).block()))
+ SHORT_STRING + threadNumber + step)).block()))
.threadCount(10)
.operationCount(10)
.runSuccessfullyWithin(Duration.ofMinutes(1));
@@ -162,7 +164,7 @@ public interface BucketDumbBlobStoreContract {
default void deleteBucketConcurrentlyShouldNotFail() throws Exception {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
ConcurrentTestRunner.builder()
.reactorOperation(((threadNumber, step) -> store.deleteBucket(TEST_BUCKET_NAME)))
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java
index b0cbeb9..a65362a 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteBlobStoreContract.java
@@ -35,6 +35,8 @@ import org.junit.jupiter.api.Test;
import com.google.common.base.Strings;
+import reactor.core.publisher.Mono;
+
public interface DeleteBlobStoreContract {
String SHORT_STRING = "toto";
@@ -53,7 +55,7 @@ public interface DeleteBlobStoreContract {
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- assertThatCode(() -> store.delete(defaultBucketName, blobIdFactory().randomId()).block())
+ assertThatCode(() -> Mono.from(store.delete(defaultBucketName, blobIdFactory().randomId())).block())
.doesNotThrowAnyException();
}
@@ -62,8 +64,8 @@ public interface DeleteBlobStoreContract {
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block();
- store.delete(defaultBucketName, blobId).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST)).block();
+ Mono.from(store.delete(defaultBucketName, blobId)).block();
assertThatThrownBy(() -> store.read(defaultBucketName, blobId).read())
.isInstanceOf(ObjectStoreException.class);
@@ -74,10 +76,10 @@ public interface DeleteBlobStoreContract {
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block();
- store.delete(defaultBucketName, blobId).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST)).block();
+ Mono.from(store.delete(defaultBucketName, blobId)).block();
- assertThatCode(() -> store.delete(defaultBucketName, blobId).block())
+ assertThatCode(() -> Mono.from(store.delete(defaultBucketName, blobId)).block())
.doesNotThrowAnyException();
}
@@ -86,10 +88,10 @@ public interface DeleteBlobStoreContract {
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobIdToDelete = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block();
- BlobId otherBlobId = store.save(defaultBucketName, ELEVEN_KILOBYTES, LOW_COST).block();
+ BlobId blobIdToDelete = Mono.from(store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST)).block();
+ BlobId otherBlobId = Mono.from(store.save(defaultBucketName, ELEVEN_KILOBYTES, LOW_COST)).block();
- store.delete(defaultBucketName, blobIdToDelete).block();
+ Mono.from(store.delete(defaultBucketName, blobIdToDelete)).block();
InputStream read = store.read(defaultBucketName, otherBlobId);
@@ -101,10 +103,10 @@ public interface DeleteBlobStoreContract {
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST)).block();
ConcurrentTestRunner.builder()
- .operation(((threadNumber, step) -> store.delete(defaultBucketName, blobId).block()))
+ .operation(((threadNumber, step) -> Mono.from(store.delete(defaultBucketName, blobId)).block()))
.threadCount(10)
.operationCount(10)
.runSuccessfullyWithin(Duration.ofMinutes(1));
@@ -113,7 +115,7 @@ public interface DeleteBlobStoreContract {
@Test
default void deleteShouldThrowWhenNullBucketName() {
BlobStore store = testee();
- assertThatThrownBy(() -> store.delete(null, blobIdFactory().randomId()).block())
+ assertThatThrownBy(() -> Mono.from(store.delete(null, blobIdFactory().randomId())).block())
.isInstanceOf(NullPointerException.class);
}
@@ -122,10 +124,10 @@ public interface DeleteBlobStoreContract {
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId customBlobId = store.save(CUSTOM, "custom_string", LOW_COST).block();
- BlobId defaultBlobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block();
+ BlobId customBlobId = Mono.from(store.save(CUSTOM, "custom_string", LOW_COST)).block();
+ BlobId defaultBlobId = Mono.from(store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST)).block();
- store.delete(CUSTOM, customBlobId).block();
+ Mono.from(store.delete(CUSTOM, customBlobId)).block();
InputStream read = store.read(defaultBucketName, defaultBlobId);
@@ -137,10 +139,10 @@ public interface DeleteBlobStoreContract {
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST).block();
- BlobId blobId = store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST).block();
+ Mono.from(store.save(CUSTOM, SHORT_BYTEARRAY, LOW_COST)).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, SHORT_BYTEARRAY, LOW_COST)).block();
- store.delete(defaultBucketName, blobId).block();
+ Mono.from(store.delete(defaultBucketName, blobId)).block();
InputStream read = store.read(CUSTOM, blobId);
@@ -152,7 +154,7 @@ public interface DeleteBlobStoreContract {
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST)).block();
ConcurrentTestRunner.builder()
.operation(((threadNumber, step) -> {
@@ -167,7 +169,7 @@ public interface DeleteBlobStoreContract {
// normal behavior here
}
- store.delete(defaultBucketName, blobId).block();
+ Mono.from(store.delete(defaultBucketName, blobId)).block();
}))
.threadCount(10)
.operationCount(10)
@@ -179,12 +181,12 @@ public interface DeleteBlobStoreContract {
BlobStore store = testee();
BucketName defaultBucketName = store.getDefaultBucketName();
- BlobId blobId = store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST).block();
+ BlobId blobId = Mono.from(store.save(defaultBucketName, TWELVE_MEGABYTES, LOW_COST)).block();
ConcurrentTestRunner.builder()
.operation(((threadNumber, step) -> {
try {
- byte[] read = store.readBytes(defaultBucketName, blobId).block();
+ byte[] read = Mono.from(store.readBytes(defaultBucketName, blobId)).block();
String string = IOUtils.toString(read, StandardCharsets.UTF_8.displayName());
if (!string.equals(TWELVE_MEGABYTES_STRING)) {
throw new RuntimeException("Should not read partial blob when an other thread is deleting it. Size : " + string.length());
@@ -193,7 +195,7 @@ public interface DeleteBlobStoreContract {
// normal behavior here
}
- store.delete(defaultBucketName, blobId).block();
+ Mono.from(store.delete(defaultBucketName, blobId)).block();
}))
.threadCount(10)
.operationCount(10)
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
index e537774..b501e51 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
@@ -52,7 +52,7 @@ public interface DeleteDumbBlobStoreContract {
default void deleteShouldNotThrowWhenBlobDoesNotExist() {
DumbBlobStore store = testee();
- assertThatCode(() -> store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block())
+ assertThatCode(() -> Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
.doesNotThrowAnyException();
}
@@ -60,7 +60,7 @@ public interface DeleteDumbBlobStoreContract {
default void deleteShouldNotThrowWhenBucketDoesNotExist() {
DumbBlobStore store = testee();
- assertThatCode(() -> store.delete(BucketName.of("not_existing_bucket_name"), TEST_BLOB_ID).block())
+ assertThatCode(() -> Mono.from(store.delete(BucketName.of("not_existing_bucket_name"), TEST_BLOB_ID)).block())
.doesNotThrowAnyException();
}
@@ -68,8 +68,8 @@ public interface DeleteDumbBlobStoreContract {
default void deleteShouldDeleteExistingBlobData() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
- store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+ Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read())
.isInstanceOf(ObjectStoreException.class);
@@ -79,10 +79,10 @@ public interface DeleteDumbBlobStoreContract {
default void deleteShouldBeIdempotent() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
- store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+ Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
- assertThatCode(() -> store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block())
+ assertThatCode(() -> Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
.doesNotThrowAnyException();
}
@@ -90,10 +90,10 @@ public interface DeleteDumbBlobStoreContract {
default void deleteShouldNotDeleteOtherBlobs() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
- store.save(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
- store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
InputStream read = store.read(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID);
@@ -104,10 +104,10 @@ public interface DeleteDumbBlobStoreContract {
default void deleteConcurrentlyShouldNotFail() throws Exception {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES)).block();
ConcurrentTestRunner.builder()
- .operation(((threadNumber, step) -> store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block()))
+ .operation(((threadNumber, step) -> Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block()))
.threadCount(10)
.operationCount(10)
.runSuccessfullyWithin(Duration.ofMinutes(1));
@@ -116,7 +116,7 @@ public interface DeleteDumbBlobStoreContract {
@Test
default void deleteShouldThrowWhenNullBucketName() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.delete(null, TEST_BLOB_ID).block())
+ assertThatThrownBy(() -> Mono.from(store.delete(null, TEST_BLOB_ID)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -124,10 +124,10 @@ public interface DeleteDumbBlobStoreContract {
default void deleteShouldNotDeleteFromOtherBucket() {
DumbBlobStore store = testee();
- store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, "custom").block();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, "custom")).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
- store.delete(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID).block();
+ Mono.from(store.delete(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID)).block();
InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
@@ -138,10 +138,10 @@ public interface DeleteDumbBlobStoreContract {
default void deleteShouldNotDeleteFromOtherBucketWhenSameBlobId() {
DumbBlobStore store = testee();
- store.save(CUSTOM_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(CUSTOM_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
- store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
InputStream read = store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID);
@@ -152,7 +152,7 @@ public interface DeleteDumbBlobStoreContract {
default void readShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() throws Exception {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES)).block();
ConcurrentTestRunner.builder()
.operation(((threadNumber, step) -> {
@@ -167,7 +167,7 @@ public interface DeleteDumbBlobStoreContract {
// normal behavior here
}
- store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
}))
.threadCount(10)
.operationCount(10)
@@ -178,12 +178,12 @@ public interface DeleteDumbBlobStoreContract {
default void readBytesShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() throws Exception {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES)).block();
ConcurrentTestRunner.builder()
.operation(((threadNumber, step) -> {
try {
- byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
String string = IOUtils.toString(read, StandardCharsets.UTF_8.displayName());
if (!string.equals(TWELVE_MEGABYTES_STRING)) {
throw new RuntimeException("Should not read partial blob when an other thread is deleting it. Size : " + string.length());
@@ -192,7 +192,7 @@ public interface DeleteDumbBlobStoreContract {
// normal behavior here
}
- store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ Mono.from(store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
}))
.threadCount(10)
.operationCount(10)
@@ -202,7 +202,7 @@ public interface DeleteDumbBlobStoreContract {
@Test
default void mixingSaveReadAndDeleteShouldReturnConsistentState() throws ExecutionException, InterruptedException {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES)).block();
ConcurrentTestRunner.builder()
.randomlyDistributedReactorOperations(
(thread, iteration) -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES),
@@ -216,7 +216,7 @@ public interface DeleteDumbBlobStoreContract {
default Mono<Void> checkConcurrentMixedOperation() {
return
- testee().readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)
+ Mono.from(testee().readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID))
//assertj is very cpu-intensive, let's compute the assertion only when arrays are different
.filter(bytes -> !Arrays.equals(bytes, TWELVE_MEGABYTES))
.doOnNext(bytes -> assertThat(bytes).isEqualTo(TWELVE_MEGABYTES))
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
index a6aed4a..c11b117 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/MetricableBlobStoreContract.java
@@ -37,6 +37,8 @@ import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.publisher.Mono;
+
public interface MetricableBlobStoreContract extends BlobStoreContract {
@@ -62,8 +64,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
default void saveBytesShouldPublishSaveBytesTimerMetrics() {
BlobStore store = testee();
- store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block();
- store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block();
+ Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
+ Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
.hasSize(2);
@@ -73,8 +75,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
default void saveStringShouldPublishSaveBytesTimerMetrics() {
BlobStore store = testee();
- store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST).block();
- store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST).block();
+ Mono.from(store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST)).block();
+ Mono.from(store.save(store.getDefaultBucketName(), STRING_CONTENT, LOW_COST)).block();
assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_BYTES_TIMER_NAME))
.hasSize(2);
@@ -84,8 +86,8 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
default void saveInputStreamShouldPublishSaveInputStreamTimerMetrics() {
BlobStore store = testee();
- store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST).block();
- store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST).block();
+ Mono.from(store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST)).block();
+ Mono.from(store.save(store.getDefaultBucketName(), new ByteArrayInputStream(BYTES_CONTENT), LOW_COST)).block();
assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(SAVE_INPUT_STREAM_TIMER_NAME))
.hasSize(2);
@@ -95,9 +97,9 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
default void readBytesShouldPublishReadBytesTimerMetrics() {
BlobStore store = testee();
- BlobId blobId = store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block();
- store.readBytes(store.getDefaultBucketName(), blobId).block();
- store.readBytes(store.getDefaultBucketName(), blobId).block();
+ BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
+ Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block();
+ Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block();
assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(READ_BYTES_TIMER_NAME))
.hasSize(2);
@@ -107,7 +109,7 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
default void readShouldPublishReadTimerMetrics() {
BlobStore store = testee();
- BlobId blobId = store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
store.read(store.getDefaultBucketName(), blobId);
store.read(store.getDefaultBucketName(), blobId);
@@ -120,10 +122,10 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
BlobStore store = testee();
BucketName bucketName = BucketName.of("custom");
- store.save(BucketName.DEFAULT, BYTES_CONTENT, LOW_COST).block();
- store.save(bucketName, BYTES_CONTENT, LOW_COST).block();
+ Mono.from(store.save(BucketName.DEFAULT, BYTES_CONTENT, LOW_COST)).block();
+ Mono.from(store.save(bucketName, BYTES_CONTENT, LOW_COST)).block();
- store.deleteBucket(bucketName).block();
+ Mono.from(store.deleteBucket(bucketName)).block();
assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_BUCKET_TIMER_NAME))
.hasSize(1);
@@ -133,11 +135,11 @@ public interface MetricableBlobStoreContract extends BlobStoreContract {
default void deleteShouldPublishDeleteTimerMetrics() {
BlobStore store = testee();
- BlobId blobId1 = store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block();
- BlobId blobId2 = store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST).block();
+ BlobId blobId1 = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
+ BlobId blobId2 = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block();
- store.delete(BucketName.DEFAULT, blobId1).block();
- store.delete(BucketName.DEFAULT, blobId2).block();
+ Mono.from(store.delete(BucketName.DEFAULT, blobId1)).block();
+ Mono.from(store.delete(BucketName.DEFAULT, blobId2)).block();
assertThat(metricsTestExtension.getMetricFactory().executionTimesFor(DELETE_TIMER_NAME))
.hasSize(2);
diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
index 833dafd..dae10f6 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
@@ -46,6 +46,7 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import com.google.common.io.ByteSource;
+
import reactor.core.publisher.Mono;
public interface ReadSaveDumbBlobStoreContract {
@@ -56,7 +57,7 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveShouldThrowWhenNullData() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (byte[]) null).block())
+ assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (byte[]) null)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -64,7 +65,7 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveShouldThrowWhenNullString() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (String) null).block())
+ assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (String) null)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -72,7 +73,7 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveShouldThrowWhenNullInputStream() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (InputStream) null).block())
+ assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (InputStream) null)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -80,7 +81,7 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveShouldThrowWhenNullByteSource() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (ByteSource) null).block())
+ assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (ByteSource) null)).block())
.isInstanceOf(NullPointerException.class);
}
@@ -88,8 +89,8 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveShouldSaveEmptyData() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, EMPTY_BYTEARRAY).block();
- byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, EMPTY_BYTEARRAY)).block();
+ byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(bytes).isEmpty();
}
@@ -98,9 +99,9 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveShouldSaveEmptyString() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, "").block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, "")).block();
- byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
}
@@ -109,9 +110,9 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveShouldSaveEmptyInputStream() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(EMPTY_BYTEARRAY)).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(EMPTY_BYTEARRAY))).block();
- byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(bytes).isEmpty();
}
@@ -120,9 +121,9 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveShouldSaveEmptyByteSource() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.empty()).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.empty())).block();
- byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(bytes).isEmpty();
}
@@ -131,7 +132,7 @@ public interface ReadSaveDumbBlobStoreContract {
default void readBytesShouldThrowWhenNotExisting() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.readBytes(TEST_BUCKET_NAME, new TestBlobId("unknown")).block())
+ assertThatThrownBy(() -> Mono.from(store.readBytes(TEST_BUCKET_NAME, new TestBlobId("unknown"))).block())
.isExactlyInstanceOf(ObjectNotFoundException.class);
}
@@ -139,9 +140,9 @@ public interface ReadSaveDumbBlobStoreContract {
default void readBytesShouldReturnSavedData() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
- byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(bytes).isEqualTo(SHORT_BYTEARRAY);
}
@@ -150,9 +151,9 @@ public interface ReadSaveDumbBlobStoreContract {
default void readBytesShouldReturnLongSavedData() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
- byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES);
}
@@ -161,9 +162,9 @@ public interface ReadSaveDumbBlobStoreContract {
default void readBytesShouldReturnBigSavedData() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES)).block();
- byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] bytes = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(bytes).isEqualTo(TWELVE_MEGABYTES);
}
@@ -180,7 +181,7 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveShouldCreateBucket() {
DumbBlobStore store = testee();
BucketName nonExisting = BucketName.of("non-existing-bucket");
- store.save(nonExisting, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(nonExisting, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
//read for a non-existing bucket would throw
assertThatCode(() -> store.read(nonExisting, TEST_BLOB_ID))
@@ -190,7 +191,7 @@ public interface ReadSaveDumbBlobStoreContract {
@Test
default void readShouldReturnSavedData() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
@@ -200,7 +201,7 @@ public interface ReadSaveDumbBlobStoreContract {
@Test
default void readShouldReturnLongSavedData() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
@@ -210,7 +211,7 @@ public interface ReadSaveDumbBlobStoreContract {
@Test
default void readShouldReturnBigSavedData() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES)).block();
InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
@@ -221,10 +222,10 @@ public interface ReadSaveDumbBlobStoreContract {
@MethodSource("blobs")
default void saveBytesShouldBeIdempotent(String description, byte[] bytes) {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block();
- byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(read).isEqualTo(bytes);
}
@@ -233,10 +234,10 @@ public interface ReadSaveDumbBlobStoreContract {
@MethodSource("blobs")
default void saveByteSourceShouldBeIdempotent(String description, byte[] bytes) {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)).block();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes))).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes))).block();
- byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(read).isEqualTo(bytes);
}
@@ -245,10 +246,10 @@ public interface ReadSaveDumbBlobStoreContract {
@MethodSource("blobs")
default void saveInputStreamShouldBeIdempotent(String description, byte[] bytes) {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)).block();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes)).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes))).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes))).block();
- byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(read).isEqualTo(bytes);
}
@@ -257,12 +258,11 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveInputStreamShouldNotOverwritePreviousDataOnFailingInputStream() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())
- .onErrorResume(throwable -> Mono.empty())
- .block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES))).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream()))
+ .onErrorResume(throwable -> Mono.empty()).block();
- byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(read).isEqualTo(ELEVEN_KILOBYTES);
}
@@ -271,17 +271,16 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveByteSourceShouldNotOverwritePreviousDataOnFailingInputStream() {
DumbBlobStore store = testee();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block();
- store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES))).block();
+ Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
@Override
public InputStream openStream() throws IOException {
return getThrowingInputStream();
}
- })
- .onErrorResume(throwable -> Mono.empty())
- .block();
+ }))
+ .onErrorResume(throwable -> Mono.empty()).block();
- byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+ byte[] read = Mono.from(store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
assertThat(read).isEqualTo(ELEVEN_KILOBYTES);
}
@@ -290,13 +289,12 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveByteSourceShouldThrowOnIOException() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
+ assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
@Override
public InputStream openStream() throws IOException {
return getThrowingInputStream();
}
- })
- .block())
+ })).block())
.isInstanceOf(ObjectStoreIOException.class);
}
@@ -304,8 +302,7 @@ public interface ReadSaveDumbBlobStoreContract {
default void saveInputStreamShouldThrowOnIOException() {
DumbBlobStore store = testee();
- assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())
- .block())
+ assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())).block())
.isInstanceOf(ObjectStoreIOException.class);
}
@@ -317,7 +314,7 @@ public interface ReadSaveDumbBlobStoreContract {
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource(value = "blobs")
default void concurrentSaveBytesShouldReturnConsistentValues(String description, byte[] bytes) throws ExecutionException, InterruptedException {
- testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
+ Mono.from(testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block();
ConcurrentTestRunner.builder()
.randomlyDistributedReactorOperations(
(threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes),
@@ -331,7 +328,7 @@ public interface ReadSaveDumbBlobStoreContract {
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource("blobs")
default void concurrentSaveInputStreamShouldReturnConsistentValues(String description, byte[] bytes) throws ExecutionException, InterruptedException {
- testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
+ Mono.from(testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block();
ConcurrentTestRunner.builder()
.randomlyDistributedReactorOperations(
(threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes)),
@@ -345,7 +342,7 @@ public interface ReadSaveDumbBlobStoreContract {
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource("blobs")
default void concurrentSaveByteSourceShouldReturnConsistentValues(String description, byte[] bytes) throws ExecutionException, InterruptedException {
- testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
+ Mono.from(testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes)).block();
ConcurrentTestRunner.builder()
.randomlyDistributedReactorOperations(
(threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)),
@@ -357,7 +354,7 @@ public interface ReadSaveDumbBlobStoreContract {
}
default Mono<Void> checkConcurrentSaveOperation(byte[] expected) {
- return testee().readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)
+ return Mono.from(testee().readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID))
//assertj is very cpu-intensive, let's compute the assertion only when arrays are different
.filter(bytes -> !Arrays.equals(bytes, expected))
.doOnNext(bytes -> assertThat(bytes).isEqualTo(expected))
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
index 51f6da8..e073231 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreTest.java
@@ -89,9 +89,9 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
@Test
void readBytesShouldReturnSplitSavedDataByChunk() {
String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE);
- BlobId blobId = testee.save(testee.getDefaultBucketName(), longString, LOW_COST).block();
+ BlobId blobId = Mono.from(testee.save(testee.getDefaultBucketName(), longString, LOW_COST)).block();
- byte[] bytes = testee.readBytes(testee.getDefaultBucketName(), blobId).block();
+ byte[] bytes = Mono.from(testee.readBytes(testee.getDefaultBucketName(), blobId)).block();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
}
@@ -100,11 +100,11 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
void readBytesShouldNotReturnInvalidResultsWhenPartialDataPresent() {
int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
String longString = Strings.repeat("0123456789\n", repeatCount);
- BlobId blobId = testee.save(testee.getDefaultBucketName(), longString, LOW_COST).block();
+ BlobId blobId = Mono.from(testee.save(testee.getDefaultBucketName(), longString, LOW_COST)).block();
when(defaultBucketDAO.readPart(blobId, 1)).thenReturn(Mono.empty());
- assertThatThrownBy(() -> testee.readBytes(testee.getDefaultBucketName(), blobId).block())
+ assertThatThrownBy(() -> Mono.from(testee.readBytes(testee.getDefaultBucketName(), blobId)).block())
.isInstanceOf(ObjectStoreException.class)
.hasMessageContaining("Missing blob part for blobId");
}
@@ -113,7 +113,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
void readShouldNotReturnInvalidResultsWhenPartialDataPresent() {
int repeatCount = MULTIPLE_CHUNK_SIZE * CHUNK_SIZE;
String longString = Strings.repeat("0123456789\n", repeatCount);
- BlobId blobId = testee.save(testee.getDefaultBucketName(), longString, LOW_COST).block();
+ BlobId blobId = Mono.from(testee.save(testee.getDefaultBucketName(), longString, LOW_COST)).block();
when(defaultBucketDAO.readPart(blobId, 1)).thenReturn(Mono.empty());
@@ -133,7 +133,7 @@ public class CassandraBlobStoreTest implements MetricableBlobStoreContract {
void blobStoreShouldSupport100MBBlob() throws IOException {
ZeroedInputStream data = new ZeroedInputStream(100_000_000);
HashingInputStream writeHash = new HashingInputStream(Hashing.sha256(), data);
- BlobId blobId = testee.save(testee.getDefaultBucketName(), writeHash, LOW_COST).block();
+ BlobId blobId = Mono.from(testee.save(testee.getDefaultBucketName(), writeHash, LOW_COST)).block();
InputStream bytes = testee.read(testee.getDefaultBucketName(), blobId);
HashingInputStream readHash = new HashingInputStream(Hashing.sha256(), bytes);
diff --git a/metrics/metrics-api/pom.xml b/server/blob/blob-common/pom.xml
similarity index 76%
copy from metrics/metrics-api/pom.xml
copy to server/blob/blob-common/pom.xml
index 3212965..57cf59d 100644
--- a/metrics/metrics-api/pom.xml
+++ b/server/blob/blob-common/pom.xml
@@ -17,32 +17,29 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
+
<parent>
+ <artifactId>james-server-blob</artifactId>
<groupId>org.apache.james</groupId>
- <artifactId>metrics</artifactId>
<version>3.5.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>metrics-api</artifactId>
+ <artifactId>blob-common</artifactId>
+ <packaging>jar</packaging>
- <name>Apache James :: Metrics :: API</name>
+ <name>Apache James :: Server :: Blob :: Common</name>
<dependencies>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>testing-base</artifactId>
- <scope>test</scope>
+ <artifactId>blob-api</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
</dependencies>
-
</project>
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
similarity index 83%
rename from server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
rename to server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
index f37605c..0c51da6 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
@@ -20,7 +20,6 @@
package org.apache.james.blob.api;
import java.util.Collection;
-import java.util.Objects;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
@@ -36,33 +35,6 @@ public interface Store<T, I> {
Mono<T> read(I blobIds);
- class BlobType {
- private final String name;
-
- public BlobType(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- @Override
- public final boolean equals(Object o) {
- if (o instanceof BlobType) {
- BlobType blobType = (BlobType) o;
-
- return Objects.equals(this.name, blobType.name);
- }
- return false;
- }
-
- @Override
- public final int hashCode() {
- return Objects.hash(name);
- }
- }
-
class Impl<T, I extends BlobPartsId> implements Store<T, I> {
public interface ValueToSave {
@@ -80,7 +52,7 @@ public interface Store<T, I> {
@Override
public Mono<BlobId> saveIn(BucketName bucketName, BlobStore blobStore) {
- return blobStore.save(bucketName, bytes, storagePolicy);
+ return Mono.from(blobStore.save(bucketName, bytes, storagePolicy));
}
}
@@ -122,7 +94,7 @@ public interface Store<T, I> {
return Flux.fromIterable(blobIds.asMap().entrySet())
.publishOn(Schedulers.elastic())
.flatMapSequential(
- entry -> blobStore.readBytes(blobStore.getDefaultBucketName(), entry.getValue())
+ entry -> Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), entry.getValue()))
.zipWith(Mono.just(entry.getKey())))
.map(entry -> Pair.of(entry.getT2(), entry.getT1()))
.collectList()
diff --git a/server/blob/blob-export-file/src/test/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanismTest.java b/server/blob/blob-export-file/src/test/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanismTest.java
index 91a85a0..51c86bc 100644
--- a/server/blob/blob-export-file/src/test/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanismTest.java
+++ b/server/blob/blob-export-file/src/test/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanismTest.java
@@ -55,6 +55,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import nl.jqno.equalsverifier.EqualsVerifier;
+import reactor.core.publisher.Mono;
@ExtendWith(FileSystemExtension.class)
class LocalFileBlobExportMechanismTest {
@@ -81,7 +82,7 @@ class LocalFileBlobExportMechanismTest {
@Test
void exportingBlobShouldSendAMail() {
- BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST)).block();
String explanation = "The content of a deleted message vault had been shared with you.";
testee.blobId(blobId)
@@ -114,7 +115,7 @@ class LocalFileBlobExportMechanismTest {
@Test
void exportingBlobShouldCreateAFileWithTheCorrespondingContent(FileSystem fileSystem) {
- BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST)).block();
testee.blobId(blobId)
.with(MailAddressFixture.RECIPIENT1)
@@ -152,7 +153,7 @@ class LocalFileBlobExportMechanismTest {
@Test
void exportingBlobShouldCreateAFileWithoutExtensionWhenNotDeclaringExtension() {
- BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST)).block();
testee.blobId(blobId)
.with(MailAddressFixture.RECIPIENT1)
@@ -176,7 +177,7 @@ class LocalFileBlobExportMechanismTest {
@Test
void exportingBlobShouldCreateAFileWithExtensionWhenDeclaringExtension() {
- BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST)).block();
testee.blobId(blobId)
.with(MailAddressFixture.RECIPIENT1)
@@ -201,7 +202,7 @@ class LocalFileBlobExportMechanismTest {
@Test
void exportingBlobShouldCreateAFileWithPrefixWhenDeclaringPrefix() {
- BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), BLOB_CONTENT, LOW_COST)).block();
String filePrefix = "deleted-message-of-bob@james.org";
testee.blobId(blobId)
diff --git a/server/blob/blob-memory/pom.xml b/server/blob/blob-memory/pom.xml
index c28af44..6a6888c 100644
--- a/server/blob/blob-memory/pom.xml
+++ b/server/blob/blob-memory/pom.xml
@@ -44,6 +44,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-util</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>metrics-tests</artifactId>
<scope>test</scope>
</dependency>
@@ -57,6 +62,10 @@
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreContract.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreContract.java
index a5b1fec..feb8046 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreContract.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreContract.java
@@ -29,6 +29,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BucketName;
+import reactor.core.publisher.Mono;
+
public interface ObjectStorageBlobStoreContract {
@@ -39,7 +41,7 @@ public interface ObjectStorageBlobStoreContract {
default void assertBlobStoreCanStoreAndRetrieve(ObjectStorageBlobStoreBuilder.ReadyToBuild builder) {
ObjectStorageBlobStore blobStore = builder.build();
- BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), CONTENT, LOW_COST)).block();
InputStream inputStream = blobStore.read(blobStore.getDefaultBucketName(), blobId);
assertThat(inputStream).hasSameContentAs(IOUtils.toInputStream(CONTENT, StandardCharsets.UTF_8));
diff --git a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java
index 263e105..10fb3c9 100644
--- a/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java
+++ b/server/blob/blob-objectstorage/src/test/java/org/apache/james/blob/objectstorage/ObjectStorageBlobStoreTest.java
@@ -120,7 +120,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract {
.namespace(defaultBucketName)
.build();
String content = "James is the best!";
- BlobId blobId = encryptedBlobStore.save(encryptedBlobStore.getDefaultBucketName(), content, LOW_COST).block();
+ BlobId blobId = Mono.from(encryptedBlobStore.save(encryptedBlobStore.getDefaultBucketName(), content, LOW_COST)).block();
InputStream read = encryptedBlobStore.read(encryptedBlobStore.getDefaultBucketName(), blobId);
String expectedContent = IOUtils.toString(read, Charsets.UTF_8);
@@ -136,7 +136,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract {
.namespace(defaultBucketName)
.build();
String content = "James is the best!";
- BlobId blobId = encryptedBlobStore.save(encryptedBlobStore.getDefaultBucketName(), content, LOW_COST).block();
+ BlobId blobId = Mono.from(encryptedBlobStore.save(encryptedBlobStore.getDefaultBucketName(), content, LOW_COST)).block();
InputStream encryptedIs = testee.read(encryptedBlobStore.getDefaultBucketName(), blobId);
assertThat(encryptedIs).isNotNull();
@@ -151,7 +151,7 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract {
@Test
void deleteBucketShouldDeleteSwiftContainer() {
BucketName bucketName = BucketName.of("azerty");
- objectStorageBlobStore.save(bucketName, "data", LOW_COST).block();
+ Mono.from(objectStorageBlobStore.save(bucketName, "data", LOW_COST)).block();
objectStorageBlobStore.deleteBucket(bucketName).block();
@@ -177,32 +177,32 @@ public class ObjectStorageBlobStoreTest implements MetricableBlobStoreContract {
@Test
void saveBytesShouldNotCompleteWhenDoesNotAwait() {
// String need to be big enough to get async thread busy hence could not return result instantly
- Mono<BlobId> blobIdFuture = testee
- .save(testee.getDefaultBucketName(), BIG_STRING.getBytes(StandardCharsets.UTF_8), LOW_COST)
+ Mono<BlobId> blobIdFuture = Mono.from(testee
+ .save(testee.getDefaultBucketName(), BIG_STRING.getBytes(StandardCharsets.UTF_8), LOW_COST))
.subscribeOn(Schedulers.elastic());
assertThat(blobIdFuture.toFuture()).isNotCompleted();
}
@Test
void saveStringShouldNotCompleteWhenDoesNotAwait() {
- Mono<BlobId> blobIdFuture = testee
- .save(testee.getDefaultBucketName(), BIG_STRING, LOW_COST)
+ Mono<BlobId> blobIdFuture = Mono.from(testee
+ .save(testee.getDefaultBucketName(), BIG_STRING, LOW_COST))
.subscribeOn(Schedulers.elastic());
assertThat(blobIdFuture.toFuture()).isNotCompleted();
}
@Test
void saveInputStreamShouldNotCompleteWhenDoesNotAwait() {
- Mono<BlobId> blobIdFuture = testee
- .save(testee.getDefaultBucketName(), new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)), LOW_COST)
+ Mono<BlobId> blobIdFuture = Mono.from(testee
+ .save(testee.getDefaultBucketName(), new ByteArrayInputStream(BIG_STRING.getBytes(StandardCharsets.UTF_8)), LOW_COST))
.subscribeOn(Schedulers.elastic());
assertThat(blobIdFuture.toFuture()).isNotCompleted();
}
@Test
void readBytesShouldNotCompleteWhenDoesNotAwait() {
- BlobId blobId = testee().save(testee.getDefaultBucketName(), BIG_STRING, LOW_COST).block();
- Mono<byte[]> resultFuture = testee.readBytes(testee.getDefaultBucketName(), blobId).subscribeOn(Schedulers.elastic());
+ BlobId blobId = Mono.from(testee().save(testee.getDefaultBucketName(), BIG_STRING, LOW_COST)).block();
+ Mono<byte[]> resultFuture = Mono.from(testee.readBytes(testee.getDefaultBucketName(), blobId)).subscribeOn(Schedulers.elastic());
assertThat(resultFuture.toFuture()).isNotCompleted();
}
}
diff --git a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/HybridBlobStore.java b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/HybridBlobStore.java
index 93e1532..0d700d8 100644
--- a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/HybridBlobStore.java
+++ b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/HybridBlobStore.java
@@ -129,7 +129,7 @@ public class HybridBlobStore implements BlobStore {
@Override
public Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy) {
return selectBlobStore(storagePolicy, Mono.just(data.length > configuration.getSizeThreshold()))
- .flatMap(blobStore -> blobStore.save(bucketName, data, storagePolicy));
+ .flatMap(blobStore -> Mono.from(blobStore.save(bucketName, data, storagePolicy)));
}
@Override
@@ -138,7 +138,7 @@ public class HybridBlobStore implements BlobStore {
BufferedInputStream bufferedInputStream = new BufferedInputStream(data, configuration.getSizeThreshold() + 1);
return selectBlobStore(storagePolicy, Mono.fromCallable(() -> isItABigStream(bufferedInputStream)))
- .flatMap(blobStore -> blobStore.save(bucketName, bufferedInputStream, storagePolicy));
+ .flatMap(blobStore -> Mono.from(blobStore.save(bucketName, bufferedInputStream, storagePolicy)));
}
private Mono<BlobStore> selectBlobStore(StoragePolicy storagePolicy, Mono<Boolean> largeData) {
@@ -180,9 +180,9 @@ public class HybridBlobStore implements BlobStore {
@Override
public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
- return Mono.defer(() -> highPerformanceBlobStore.readBytes(bucketName, blobId))
+ return Mono.defer(() -> Mono.from(highPerformanceBlobStore.readBytes(bucketName, blobId)))
.onErrorResume(this::logAndReturnEmpty)
- .switchIfEmpty(Mono.defer(() -> lowCostBlobStore.readBytes(bucketName, blobId)));
+ .switchIfEmpty(Mono.defer(() -> Mono.from(lowCostBlobStore.readBytes(bucketName, blobId))));
}
@Override
@@ -199,14 +199,14 @@ public class HybridBlobStore implements BlobStore {
@Override
public Mono<Void> deleteBucket(BucketName bucketName) {
- return Mono.defer(() -> lowCostBlobStore.deleteBucket(bucketName))
+ return Mono.defer(() -> Mono.from(lowCostBlobStore.deleteBucket(bucketName)))
.and(highPerformanceBlobStore.deleteBucket(bucketName))
.onErrorResume(this::logDeleteFailureAndReturnEmpty);
}
@Override
public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
- return Mono.defer(() -> lowCostBlobStore.delete(bucketName, blobId))
+ return Mono.defer(() -> Mono.from(lowCostBlobStore.delete(bucketName, blobId)))
.and(highPerformanceBlobStore.delete(bucketName, blobId))
.onErrorResume(this::logDeleteFailureAndReturnEmpty);
}
diff --git a/server/blob/mail-store/pom.xml b/server/blob/mail-store/pom.xml
index 2ff830b..198faa5 100644
--- a/server/blob/mail-store/pom.xml
+++ b/server/blob/mail-store/pom.xml
@@ -38,6 +38,10 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>blob-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>blob-memory</artifactId>
<scope>test</scope>
</dependency>
@@ -48,6 +52,10 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>testing-base</artifactId>
<scope>test</scope>
</dependency>
diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java
index 7dc681a..5981be8 100644
--- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java
+++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java
@@ -24,7 +24,7 @@ import java.util.Objects;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobPartsId;
-import org.apache.james.blob.api.Store;
+import org.apache.james.blob.api.BlobType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@@ -63,7 +63,7 @@ public class MimeMessagePartsId implements BlobPartsId {
public static class Factory implements BlobPartsId.Factory<MimeMessagePartsId> {
@Override
- public MimeMessagePartsId generate(Map<Store.BlobType, BlobId> map) {
+ public MimeMessagePartsId generate(Map<BlobType, BlobId> map) {
Preconditions.checkArgument(map.keySet().contains(HEADER_BLOB_TYPE), "Expecting 'mailHeader' blobId to be specified");
Preconditions.checkArgument(map.keySet().contains(BODY_BLOB_TYPE), "Expecting 'mailBody' blobId to be specified");
Preconditions.checkArgument(map.size() == 2, "blobId other than 'mailHeader' or 'mailBody' are not supported");
@@ -75,8 +75,8 @@ public class MimeMessagePartsId implements BlobPartsId {
}
}
- static final Store.BlobType HEADER_BLOB_TYPE = new Store.BlobType("mailHeader");
- static final Store.BlobType BODY_BLOB_TYPE = new Store.BlobType("mailBody");
+ static final BlobType HEADER_BLOB_TYPE = new BlobType("mailHeader");
+ static final BlobType BODY_BLOB_TYPE = new BlobType("mailBody");
private final BlobId headerBlobId;
private final BlobId bodyBlobId;
@@ -87,7 +87,7 @@ public class MimeMessagePartsId implements BlobPartsId {
}
@Override
- public Map<Store.BlobType, BlobId> asMap() {
+ public Map<BlobType, BlobId> asMap() {
return ImmutableMap.of(
HEADER_BLOB_TYPE, headerBlobId,
BODY_BLOB_TYPE, bodyBlobId);
diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
index ad11196..47194a7 100644
--- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
+++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java
@@ -43,8 +43,8 @@ import javax.mail.internet.MimeMessage;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobType;
import org.apache.james.blob.api.Store;
-import org.apache.james.blob.api.Store.BlobType;
import org.apache.james.util.BodyOffsetInputStream;
import com.google.common.base.Preconditions;
diff --git a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
index ca3e64d..a7c0649 100644
--- a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
+++ b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java
@@ -38,6 +38,8 @@ import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
class MimeMessageStoreTest {
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -113,7 +115,7 @@ class MimeMessageStoreTest {
BlobId headerBlobId = parts.getHeaderBlobId();
BlobId bodyBlobId = parts.getBodyBlobId();
- softly.assertThat(new String(blobStore.readBytes(blobStore.getDefaultBucketName(), headerBlobId).block(), StandardCharsets.UTF_8))
+ softly.assertThat(new String(Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), headerBlobId)).block(), StandardCharsets.UTF_8))
.isEqualTo("Date: Thu, 6 Sep 2018 13:29:13 +0700 (ICT)\r\n" +
"From: any@any.com\r\n" +
"To: toddy@any.com\r\n" +
@@ -122,7 +124,7 @@ class MimeMessageStoreTest {
"MIME-Version: 1.0\r\n" +
"Content-Type: text/plain; charset=UTF-8\r\n" +
"Content-Transfer-Encoding: 7bit\r\n\r\n");
- softly.assertThat(new String(blobStore.readBytes(blobStore.getDefaultBucketName(), bodyBlobId).block(), StandardCharsets.UTF_8))
+ softly.assertThat(new String(Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), bodyBlobId)).block(), StandardCharsets.UTF_8))
.isEqualTo("Important mail content");
});
}
diff --git a/server/blob/pom.xml b/server/blob/pom.xml
index 43d91bd..7385bfb 100644
--- a/server/blob/pom.xml
+++ b/server/blob/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>blob-api</module>
<module>blob-cassandra</module>
+ <module>blob-common</module>
<module>blob-export-api</module>
<module>blob-export-file</module>
<module>blob-memory</module>
diff --git a/server/container/jetty/pom.xml b/server/container/jetty/pom.xml
index 5c80c03..32c5963 100644
--- a/server/container/jetty/pom.xml
+++ b/server/container/jetty/pom.xml
@@ -35,6 +35,10 @@
<dependencies>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>james-server-core</artifactId>
<scope>test</scope>
</dependency>
diff --git a/server/container/lifecycle-api/pom.xml b/server/container/lifecycle-api/pom.xml
index 9f020e4..dabda94 100644
--- a/server/container/lifecycle-api/pom.xml
+++ b/server/container/lifecycle-api/pom.xml
@@ -48,14 +48,6 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
diff --git a/server/container/metrics/metrics-es-reporter/pom.xml b/server/container/metrics/metrics-es-reporter/pom.xml
index 4d3ee84..3414cd3 100644
--- a/server/container/metrics/metrics-es-reporter/pom.xml
+++ b/server/container/metrics/metrics-es-reporter/pom.xml
@@ -43,11 +43,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>james-server-lifecycle-api</artifactId>
+ <artifactId>james-core</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>james-server-util</artifactId>
+ <artifactId>james-server-lifecycle-api</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
diff --git a/server/data/data-api/pom.xml b/server/data/data-api/pom.xml
index f98422d..3818153 100644
--- a/server/data/data-api/pom.xml
+++ b/server/data/data-api/pom.xml
@@ -43,10 +43,6 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>james-server-util</artifactId>
- </dependency>
- <dependency>
- <groupId>${james.groupId}</groupId>
<artifactId>testing-base</artifactId>
<scope>test</scope>
</dependency>
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Keyword.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Keyword.java
index 7b75299..d3296c2 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Keyword.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Keyword.java
@@ -24,7 +24,6 @@ import java.util.Optional;
import javax.mail.Flags;
import org.apache.commons.lang3.StringUtils;
-import org.apache.james.util.UnicodeSetUtils;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
@@ -36,11 +35,11 @@ public class Keyword {
private static final int FLAG_NAME_MIN_LENGTH = 1;
private static final int FLAG_NAME_MAX_LENGTH = 255;
private static final UnicodeSet FLAG_NAME_PATTERN =
- UnicodeSetUtils.letterOrDigitUnicodeSet()
- .add('$')
- .add('_')
- .add('-')
- .freeze();
+ new UnicodeSet("[[a-z][A-Z][0-9]]")
+ .add('$')
+ .add('_')
+ .add('-')
+ .freeze();
public static final Keyword DRAFT = Keyword.of("$Draft");
public static final Keyword SEEN = Keyword.of("$Seen");
diff --git a/server/protocols/webadmin/webadmin-core/pom.xml b/server/protocols/webadmin/webadmin-core/pom.xml
index d77687a..2b8680b 100644
--- a/server/protocols/webadmin/webadmin-core/pom.xml
+++ b/server/protocols/webadmin/webadmin-core/pom.xml
@@ -136,6 +136,11 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>net.javacrumbs.json-unit</groupId>
<artifactId>json-unit-assertj</artifactId>
<scope>test</scope>
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
index f3ca133..1ba4ed2 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
@@ -87,7 +87,7 @@ class ExportService {
try (FileBackedOutputStream fileOutputStream = new FileBackedOutputStream(FileUtils.ONE_MB_BI.intValue())) {
zipper.zip(contentLoader(username), messages.toStream(), fileOutputStream);
ByteSource byteSource = fileOutputStream.asByteSource();
- return blobStore.save(blobStore.getDefaultBucketName(), byteSource.openStream(), LOW_COST).block();
+ return Mono.from(blobStore.save(blobStore.getDefaultBucketName(), byteSource.openStream(), LOW_COST)).block();
}
}
diff --git a/server/queue/queue-api/pom.xml b/server/queue/queue-api/pom.xml
index 81c1abc..f0bab70 100644
--- a/server/queue/queue-api/pom.xml
+++ b/server/queue/queue-api/pom.xml
@@ -66,6 +66,11 @@
<artifactId>javax.mail</artifactId>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
diff --git a/server/queue/queue-jms/pom.xml b/server/queue/queue-jms/pom.xml
index 80f638d..44404db 100644
--- a/server/queue/queue-jms/pom.xml
+++ b/server/queue/queue-jms/pom.xml
@@ -89,6 +89,10 @@
<artifactId>javax.mail</artifactId>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<scope>test</scope>
diff --git a/server/task/task-api/pom.xml b/server/task/task-api/pom.xml
index 5c6fea7..61daaec 100644
--- a/server/task/task-api/pom.xml
+++ b/server/task/task-api/pom.xml
@@ -35,14 +35,19 @@
<dependencies>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>james-server-util</artifactId>
+ <artifactId>testing-base</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>${james.groupId}</groupId>
- <artifactId>testing-base</artifactId>
+ <groupId>com.github.fge</groupId>
+ <artifactId>throwing-lambdas</artifactId>
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
@@ -64,6 +69,10 @@
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_${scala.base}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/server/task/task-json/pom.xml b/server/task/task-json/pom.xml
index e6a3dd0..4782d4d 100644
--- a/server/task/task-json/pom.xml
+++ b/server/task/task-json/pom.xml
@@ -63,6 +63,10 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ </dependency>
+ <dependency>
<groupId>net.javacrumbs.json-unit</groupId>
<artifactId>json-unit-assertj</artifactId>
<scope>test</scope>
diff --git a/third-party/linshare/src/test/java/org/apache/james/linshare/LinshareBlobExportMechanismTest.java b/third-party/linshare/src/test/java/org/apache/james/linshare/LinshareBlobExportMechanismTest.java
index 736707c..3148e50 100644
--- a/third-party/linshare/src/test/java/org/apache/james/linshare/LinshareBlobExportMechanismTest.java
+++ b/third-party/linshare/src/test/java/org/apache/james/linshare/LinshareBlobExportMechanismTest.java
@@ -41,6 +41,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.publisher.Mono;
+
class LinshareBlobExportMechanismTest {
private static final String FILE_CONTENT = "content";
private static final String EXPLANATION = "Explanation about the file being shared";
@@ -68,7 +70,7 @@ class LinshareBlobExportMechanismTest {
@Test
void exportShouldUploadTheDocumentToTargetUserViaLinshare() throws Exception {
- BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST)).block();
String filePrefix = "deleted-message-of-bob@james.org-";
testee.blobId(blobId)
@@ -86,7 +88,7 @@ class LinshareBlobExportMechanismTest {
@Test
void exportShouldUploadTheDocumentAndAllowDownloadViaLinshare(LinshareAPIForTechnicalAccountTesting delegationAPIForTesting) throws Exception {
- BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST)).block();
testee.blobId(blobId)
.with(new MailAddress(USER_2.getUsername()))
@@ -116,7 +118,7 @@ class LinshareBlobExportMechanismTest {
@Test
void exportWithFilePrefixShouldCreateFileWithCustomPrefix() throws Exception {
- BlobId blobId = blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST).block();
+ BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), FILE_CONTENT, LOW_COST)).block();
String filePrefix = "deleted-message-of-bob@james.org";
testee.blobId(blobId)
@@ -130,4 +132,4 @@ class LinshareBlobExportMechanismTest {
assertThat(sharedDoc.getName())
.startsWith(filePrefix);
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org