You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/08/26 07:50:13 UTC
[james-project] 02/02: JAMES-3150 BlobGCService &
BloomFilterGCAlgorithm
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 59b035febe08d990e4120bb16369a2be41719bf9
Author: Tung Van TRAN <vt...@linagora.com>
AuthorDate: Tue Aug 24 18:04:28 2021 +0700
JAMES-3150 BlobGCService & BloomFilterGCAlgorithm
---
pom.xml | 6 +
server/blob/blob-memory/pom.xml | 11 +
.../memory/MemoryBlobStoreGCAlgorithmTest.java | 39 +++
server/blob/blob-storage-strategy/pom.xml | 17 ++
.../server/blob/deduplication/BlobGCTask.java | 165 +++++++++++
.../blob/deduplication/BloomFilterGCAlgorithm.java | 304 +++++++++++++++++++++
.../BloomFilterGCAlgorithmContract.java | 258 +++++++++++++++++
7 files changed, 800 insertions(+)
diff --git a/pom.xml b/pom.xml
index cce10fe..aab9136 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1143,6 +1143,12 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>blob-storage-strategy</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>dead-letter-cassandra</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/server/blob/blob-memory/pom.xml b/server/blob/blob-memory/pom.xml
index 1938dae..a11713c 100644
--- a/server/blob/blob-memory/pom.xml
+++ b/server/blob/blob-memory/pom.xml
@@ -48,6 +48,17 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>blob-storage-strategy</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>james-server-util</artifactId>
<scope>test</scope>
</dependency>
diff --git a/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreGCAlgorithmTest.java b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreGCAlgorithmTest.java
new file mode 100644
index 0000000..ebe1928
--- /dev/null
+++ b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreGCAlgorithmTest.java
@@ -0,0 +1,39 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.memory;
+
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithmContract;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Binds {@link BloomFilterGCAlgorithmContract} to the in-memory {@link BlobStoreDAO} implementation.
+ */
+public class MemoryBlobStoreGCAlgorithmTest implements BloomFilterGCAlgorithmContract {
+
+    // A fresh DAO is created before every test, so no blob survives from one test to the next.
+    private BlobStoreDAO memoryDao;
+
+    @BeforeEach
+    public void beforeEach() {
+        this.memoryDao = new MemoryBlobStoreDAO();
+    }
+
+    @Override
+    public BlobStoreDAO blobStoreDAO() {
+        return this.memoryDao;
+    }
+}
diff --git a/server/blob/blob-storage-strategy/pom.xml b/server/blob/blob-storage-strategy/pom.xml
index ad1bb7a..af1daa1 100644
--- a/server/blob/blob-storage-strategy/pom.xml
+++ b/server/blob/blob-storage-strategy/pom.xml
@@ -45,6 +45,10 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-task-api</artifactId>
+ </dependency>
+ <dependency>
<!-- Added because of https://issues.apache.org/jira/browse/SUREFIRE-1266 -->
<groupId>${james.groupId}</groupId>
<artifactId>james-server-testing</artifactId>
@@ -63,6 +67,11 @@
<artifactId>commons-configuration2</artifactId>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
@@ -78,6 +87,14 @@
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <reuseForks>true</reuseForks>
+ <forkCount>1C</forkCount>
+ </configuration>
+ </plugin>
</plugins>
</build>
diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
new file mode 100644
index 0000000..660422b
--- /dev/null
+++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
@@ -0,0 +1,165 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Task running one {@link BloomFilterGCAlgorithm} pass over a bucket, deleting blobs that are no longer
+ * referenced by any of the supplied {@link BlobReferenceSource}s.
+ */
+public class BlobGCTask implements Task {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobGCTask.class);
+    public static final TaskType TASK_TYPE = TaskType.of("BlobGCTask");
+
+    /**
+     * Progress report of a {@link BlobGCTask}, built from a snapshot of its {@link Context} counters.
+     */
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
+        private static AdditionalInformation from(Context context, Clock clock) {
+            Context.Snapshot snapshot = context.snapshot();
+            return new AdditionalInformation(
+                snapshot.getReferenceSourceCount(),
+                snapshot.getBlobCount(),
+                snapshot.getGcedBlobCount(),
+                snapshot.getErrorCount(),
+                snapshot.getBloomFilterExpectedBlobCount(),
+                snapshot.getBloomFilterAssociatedProbability(),
+                clock.instant());
+        }
+
+        private final Instant timestamp;
+        private final long referenceSourceCount;
+        private final long blobCount;
+        private final long gcedBlobCount;
+        private final long errorCount;
+        private final long bloomFilterExpectedBlobCount;
+        private final double bloomFilterAssociatedProbability;
+
+        /**
+         * Builds the report timestamped with the current UTC system time.
+         */
+        AdditionalInformation(long referenceSourceCount,
+                              long blobCount,
+                              long gcedBlobCount,
+                              long errorCount,
+                              long bloomFilterExpectedBlobCount,
+                              double bloomFilterAssociatedProbability) {
+            this(referenceSourceCount,
+                blobCount,
+                gcedBlobCount,
+                errorCount,
+                bloomFilterExpectedBlobCount,
+                bloomFilterAssociatedProbability,
+                Clock.systemUTC().instant());
+        }
+
+        /**
+         * Builds the report with an explicit timestamp, so that the task's injected clock can be honoured.
+         */
+        AdditionalInformation(long referenceSourceCount,
+                              long blobCount,
+                              long gcedBlobCount,
+                              long errorCount,
+                              long bloomFilterExpectedBlobCount,
+                              double bloomFilterAssociatedProbability,
+                              Instant timestamp) {
+            this.referenceSourceCount = referenceSourceCount;
+            this.blobCount = blobCount;
+            this.gcedBlobCount = gcedBlobCount;
+            this.errorCount = errorCount;
+            this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+            this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public Instant timestamp() {
+            return timestamp;
+        }
+
+        public Instant getTimestamp() {
+            return timestamp;
+        }
+
+        public long getReferenceSourceCount() {
+            return referenceSourceCount;
+        }
+
+        public long getBlobCount() {
+            return blobCount;
+        }
+
+        public long getGcedBlobCount() {
+            return gcedBlobCount;
+        }
+
+        public long getErrorCount() {
+            return errorCount;
+        }
+
+        public long getBloomFilterExpectedBlobCount() {
+            return bloomFilterExpectedBlobCount;
+        }
+
+        public double getBloomFilterAssociatedProbability() {
+            return bloomFilterAssociatedProbability;
+        }
+    }
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+    private final Set<BlobReferenceSource> blobReferenceSources;
+    private final Clock clock;
+    private final BucketName bucketName;
+    private final int expectedBlobCount;
+    private final double associatedProbability;
+    // Shared between run() and details(): counters are updated by the GC while details() reads them.
+    private final Context context;
+
+    public BlobGCTask(BlobStoreDAO blobStoreDAO,
+                      GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                      GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                      Set<BlobReferenceSource> blobReferenceSources,
+                      BucketName bucketName,
+                      Clock clock,
+                      int expectedBlobCount,
+                      double associatedProbability) {
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.blobReferenceSources = blobReferenceSources;
+        this.clock = clock;
+        this.bucketName = bucketName;
+        this.expectedBlobCount = expectedBlobCount;
+        this.associatedProbability = associatedProbability;
+        this.context = new Context(expectedBlobCount, associatedProbability);
+    }
+
+    /**
+     * Runs a single GC pass and blocks until it finishes.
+     *
+     * @return the combined result of the pass; {@link Result#COMPLETED} when the pass emitted no result
+     *     at all (e.g. an empty bucket), so that the task never reports a {@code null} result
+     */
+    @Override
+    public Result run() throws InterruptedException {
+        BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+            BlobReferenceAggregate.aggregate(blobReferenceSources),
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            generationAwareBlobIdConfiguration,
+            clock);
+
+        return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context)
+            // The GC performs blocking-prone I/O; boundedElastic replaces the deprecated elastic scheduler.
+            .subscribeOn(Schedulers.boundedElastic())
+            .blockOptional()
+            .orElse(Result.COMPLETED);
+    }
+
+    @Override
+    public TaskType type() {
+        return TASK_TYPE;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(AdditionalInformation.from(context, clock));
+    }
+}
diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
new file mode 100644
index 0000000..d9fc0aa
--- /dev/null
+++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
@@ -0,0 +1,304 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Funnels;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Garbage collects unreferenced blobs of a bucket.
+ *
+ * <p>Every blob id returned by the {@link BlobReferenceSource} is put into a salted {@link BloomFilter}.
+ * Then every blob of the bucket that is outside the active generation, and that the bloom filter reports
+ * as not contained, is deleted. A bloom filter false positive only causes an orphan blob to be kept.</p>
+ */
+public class BloomFilterGCAlgorithm {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterGCAlgorithm.class);
+    // NOTE(review): assumes blob ids and the UUID salt are ASCII strings — confirm for every BlobId flavour.
+    private static final Funnel<CharSequence> BLOOM_FILTER_FUNNEL = Funnels.stringFunnel(StandardCharsets.US_ASCII);
+
+    /**
+     * Progress counters of one GC run, together with the bloom filter parameters it was configured with.
+     * Counters are {@link AtomicLong}s so they can be incremented from reactive callbacks and read through
+     * {@link #snapshot()} at any time.
+     */
+    public static class Context {
+
+        /**
+         * Immutable, value-comparable copy of the {@link Context} counters.
+         */
+        public static class Snapshot {
+
+            public static Builder builder() {
+                return new Builder();
+            }
+
+            /**
+             * Builder for {@link Snapshot}; every value left unset defaults to zero.
+             */
+            public static class Builder {
+                private long referenceSourceCount;
+                private long blobCount;
+                private long gcedBlobCount;
+                private long errorCount;
+                private long bloomFilterExpectedBlobCount;
+                private double bloomFilterAssociatedProbability;
+
+                Builder() {
+                    referenceSourceCount = 0L;
+                    blobCount = 0L;
+                    gcedBlobCount = 0L;
+                    errorCount = 0L;
+                    bloomFilterExpectedBlobCount = 0L;
+                    bloomFilterAssociatedProbability = 0.0;
+                }
+
+                public Snapshot build() {
+                    return new Snapshot(
+                        referenceSourceCount,
+                        blobCount,
+                        gcedBlobCount,
+                        errorCount,
+                        bloomFilterExpectedBlobCount,
+                        bloomFilterAssociatedProbability);
+                }
+
+                public Builder referenceSourceCount(long referenceSourceCount) {
+                    this.referenceSourceCount = referenceSourceCount;
+                    return this;
+                }
+
+                public Builder blobCount(long blobCount) {
+                    this.blobCount = blobCount;
+                    return this;
+                }
+
+                public Builder gcedBlobCount(long gcedBlobCount) {
+                    this.gcedBlobCount = gcedBlobCount;
+                    return this;
+                }
+
+                public Builder errorCount(long errorCount) {
+                    this.errorCount = errorCount;
+                    return this;
+                }
+
+                public Builder bloomFilterExpectedBlobCount(long bloomFilterExpectedBlobCount) {
+                    this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+                    return this;
+                }
+
+                public Builder bloomFilterAssociatedProbability(double bloomFilterAssociatedProbability) {
+                    this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+                    return this;
+                }
+            }
+
+            private final long referenceSourceCount;
+            private final long blobCount;
+            private final long gcedBlobCount;
+            private final long errorCount;
+            private final long bloomFilterExpectedBlobCount;
+            private final double bloomFilterAssociatedProbability;
+
+            Snapshot(long referenceSourceCount,
+                     long blobCount,
+                     long gcedBlobCount,
+                     long errorCount,
+                     long bloomFilterExpectedBlobCount,
+                     double bloomFilterAssociatedProbability) {
+                this.referenceSourceCount = referenceSourceCount;
+                this.blobCount = blobCount;
+                this.gcedBlobCount = gcedBlobCount;
+                this.errorCount = errorCount;
+                this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+                this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+            }
+
+            public long getReferenceSourceCount() {
+                return referenceSourceCount;
+            }
+
+            public long getBlobCount() {
+                return blobCount;
+            }
+
+            public long getGcedBlobCount() {
+                return gcedBlobCount;
+            }
+
+            public long getErrorCount() {
+                return errorCount;
+            }
+
+            public long getBloomFilterExpectedBlobCount() {
+                return bloomFilterExpectedBlobCount;
+            }
+
+            public double getBloomFilterAssociatedProbability() {
+                return bloomFilterAssociatedProbability;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot that = (Snapshot) o;
+
+                    // Double.compare keeps the same semantics as Double.equals (NaN == NaN, 0.0 != -0.0).
+                    return this.referenceSourceCount == that.referenceSourceCount
+                        && this.blobCount == that.blobCount
+                        && this.gcedBlobCount == that.gcedBlobCount
+                        && this.errorCount == that.errorCount
+                        && this.bloomFilterExpectedBlobCount == that.bloomFilterExpectedBlobCount
+                        && Double.compare(this.bloomFilterAssociatedProbability, that.bloomFilterAssociatedProbability) == 0;
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(referenceSourceCount, blobCount, gcedBlobCount, errorCount, bloomFilterExpectedBlobCount, bloomFilterAssociatedProbability);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("referenceSourceCount", referenceSourceCount)
+                    .add("blobCount", blobCount)
+                    .add("gcedBlobCount", gcedBlobCount)
+                    .add("errorCount", errorCount)
+                    .add("bloomFilterExpectedBlobCount", bloomFilterExpectedBlobCount)
+                    .add("bloomFilterAssociatedProbability", bloomFilterAssociatedProbability)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong referenceSourceCount;
+        private final AtomicLong blobCount;
+        private final AtomicLong gcedBlobCount;
+        private final AtomicLong errorCount;
+        private final long bloomFilterExpectedBlobCount;
+        private final double bloomFilterAssociatedProbability;
+
+        public Context(long bloomFilterExpectedBlobCount, double bloomFilterAssociatedProbability) {
+            this.referenceSourceCount = new AtomicLong();
+            this.blobCount = new AtomicLong();
+            this.gcedBlobCount = new AtomicLong();
+            this.errorCount = new AtomicLong();
+            this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+            this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+        }
+
+        public void incrementBlobCount() {
+            blobCount.incrementAndGet();
+        }
+
+        public void incrementReferenceSourceCount() {
+            referenceSourceCount.incrementAndGet();
+        }
+
+        public void incrementGCedBlobCount() {
+            gcedBlobCount.incrementAndGet();
+        }
+
+        public void incrementErrorCount() {
+            errorCount.incrementAndGet();
+        }
+
+        /**
+         * Copies the current counter values; each counter is read independently.
+         */
+        public Snapshot snapshot() {
+            return Snapshot.builder()
+                .referenceSourceCount(referenceSourceCount.get())
+                .blobCount(blobCount.get())
+                .gcedBlobCount(gcedBlobCount.get())
+                .errorCount(errorCount.get())
+                .bloomFilterExpectedBlobCount(bloomFilterExpectedBlobCount)
+                .bloomFilterAssociatedProbability(bloomFilterAssociatedProbability)
+                .build();
+        }
+    }
+
+    private final BlobReferenceSource referenceSource;
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+    // Captured once at construction: the generation check of the whole run uses this instant.
+    private final Instant now;
+
+    // Avoids two subsequent run to have the same false positives.
+    private final String salt;
+
+    public BloomFilterGCAlgorithm(BlobReferenceSource referenceSource,
+                                  BlobStoreDAO blobStoreDAO,
+                                  GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                                  GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                                  Clock clock) {
+        this.referenceSource = referenceSource;
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.salt = UUID.randomUUID().toString();
+        this.now = clock.instant();
+    }
+
+    /**
+     * Runs one GC pass over {@code bucketName}.
+     *
+     * @param expectedBlobCount expected number of referenced blobs, used to size the bloom filter
+     * @param associatedProbability desired false positive probability of the bloom filter
+     * @param bucketName bucket whose blobs are candidates for deletion
+     * @param context counters updated while the pass runs
+     * @return the per-blob results combined with {@link Task#combine}; a blob that failed to be processed
+     *     contributes {@link Result#PARTIAL}. An empty bucket yields {@link Result#COMPLETED}.
+     */
+    public Mono<Result> gc(int expectedBlobCount, double associatedProbability, BucketName bucketName, Context context) {
+        return populatedBloomFilter(expectedBlobCount, associatedProbability, context)
+            .flatMap(bloomFilter -> gc(bloomFilter, bucketName, context));
+    }
+
+    // Visits every blob of the bucket and combines the per-blob results.
+    private Mono<Result> gc(BloomFilter<CharSequence> bloomFilter, BucketName bucketName, Context context) {
+        return Flux.from(blobStoreDAO.listBlobs(bucketName))
+            .doOnNext(blobId -> context.incrementBlobCount())
+            .flatMap(blobId -> gcBlob(bloomFilter, blobId, bucketName, context))
+            // Seeded reduce: without a seed an empty bucket would produce an empty Mono instead of a Result.
+            .reduce(Result.COMPLETED, Task::combine);
+    }
+
+    // Builds the bloom filter and inserts every referenced blob id, salted.
+    private Mono<BloomFilter<CharSequence>> populatedBloomFilter(int expectedBlobCount, double associatedProbability, Context context) {
+        return Mono.fromCallable(() -> BloomFilter.create(
+                BLOOM_FILTER_FUNNEL,
+                expectedBlobCount,
+                associatedProbability))
+            .flatMap(bloomFilter ->
+                Flux.from(referenceSource.listReferencedBlobs())
+                    .doOnNext(ref -> context.incrementReferenceSourceCount())
+                    .doOnNext(ref -> bloomFilter.put(salt + ref.asString()))
+                    .then(Mono.just(bloomFilter)));
+    }
+
+    // Deletes a single blob when it is outside the active generation and not (even possibly) referenced.
+    private Mono<Result> gcBlob(BloomFilter<CharSequence> bloomFilter, BlobId blobId, BucketName bucketName, Context context) {
+        return Mono.fromCallable(() -> generationAwareBlobIdFactory.from(blobId.asString()))
+            .filter(awareBlobId -> !awareBlobId.inActiveGeneration(generationAwareBlobIdConfiguration, now))
+            // mightContain == false means the id was definitely never put: the blob is unreferenced.
+            .filter(expiredAwareBlobId -> !bloomFilter.mightContain(salt + blobId.asString()))
+            .flatMap(orphanBlobId ->
+                Mono.from(blobStoreDAO.delete(bucketName, orphanBlobId))
+                    .then(Mono.fromCallable(() -> {
+                        context.incrementGCedBlobCount();
+                        return Result.COMPLETED;
+                    })))
+            .onErrorResume(error -> {
+                LOGGER.error("Error when gc orphan blob {}", blobId.asString(), error);
+                context.incrementErrorCount();
+                return Mono.just(Result.PARTIAL);
+            })
+            // Filtered-out blobs (active generation or possibly referenced) are kept, which is a success.
+            .switchIfEmpty(Mono.just(Result.COMPLETED));
+    }
+}
diff --git a/server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java b/server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java
new file mode 100644
index 0000000..d08ae74
--- /dev/null
+++ b/server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java
@@ -0,0 +1,258 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context.Snapshot;
+import org.apache.james.task.Task;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Contract tests for {@link BloomFilterGCAlgorithm}; implementations only supply the {@link BlobStoreDAO}.
+ */
+public interface BloomFilterGCAlgorithmContract {
+
+    HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    ZonedDateTime NOW = ZonedDateTime.parse("2015-10-30T16:12:00Z");
+    BucketName DEFAULT_BUCKET = BucketName.of("default");
+    GenerationAwareBlobId.Configuration GENERATION_AWARE_BLOB_ID_CONFIGURATION = GenerationAwareBlobId.Configuration.DEFAULT;
+    int EXPECTED_BLOB_COUNT = 100;
+    // High false positive probability: a single run may keep some orphans, which the mixed-case and
+    // multi-run tests account for.
+    double ASSOCIATED_PROBABILITY = 0.8;
+
+    ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await()
+        .atMost(TEN_SECONDS);
+
+    // Shared mock: every test stubs listReferencedBlobs() before running the GC.
+    BlobReferenceSource BLOB_REFERENCE_SOURCE = mock(BlobReferenceSource.class);
+    UpdatableTickingClock CLOCK = new UpdatableTickingClock(NOW.toInstant());
+    GenerationAwareBlobId.Factory GENERATION_AWARE_BLOB_ID_FACTORY = new GenerationAwareBlobId.Factory(CLOCK, BLOB_ID_FACTORY, GENERATION_AWARE_BLOB_ID_CONFIGURATION);
+
+    BlobStoreDAO blobStoreDAO();
+
+    @BeforeEach
+    default void setUp() {
+        CLOCK.setInstant(NOW.toInstant());
+    }
+
+    default BlobStore blobStore() {
+        return new DeDuplicationBlobStore(blobStoreDAO(), DEFAULT_BUCKET, GENERATION_AWARE_BLOB_ID_FACTORY);
+    }
+
+    /**
+     * Builds the algorithm under test. The algorithm reads CLOCK when constructed, so build it after
+     * moving the clock.
+     */
+    default BloomFilterGCAlgorithm bloomFilterGCAlgorithm() {
+        return new BloomFilterGCAlgorithm(BLOB_REFERENCE_SOURCE,
+            blobStoreDAO(),
+            GENERATION_AWARE_BLOB_ID_FACTORY,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            CLOCK);
+    }
+
+    /** Saves a blob with random content in DEFAULT_BUCKET and returns its id. */
+    default BlobId saveRandomBlob(BlobStore blobStore) {
+        return Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+    }
+
+    /** Saves {@code count} random blobs and returns their ids in creation order. */
+    default List<BlobId> saveRandomBlobs(BlobStore blobStore, int count) {
+        return IntStream.range(0, count)
+            .mapToObj(index -> saveRandomBlob(blobStore))
+            .collect(Collectors.toList());
+    }
+
+    /** A fresh context configured with the default bloom filter settings. */
+    default Context newContext() {
+        return new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+    }
+
+    /** Runs one GC pass over DEFAULT_BUCKET with the default bloom filter settings. */
+    default Task.Result runGc(BloomFilterGCAlgorithm algorithm, Context context) {
+        return algorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context).block();
+    }
+
+    /** Asserts that the blob can still be read, closing the returned stream. */
+    default void assertBlobReadable(BlobStore blobStore, BlobId blobId) throws IOException {
+        try (InputStream content = blobStore.read(DEFAULT_BUCKET, blobId)) {
+            assertThat(content).isNotNull();
+        }
+    }
+
+    @RepeatedTest(10)
+    default void gcShouldRemoveOrphanBlob() {
+        BlobStore blobStore = blobStore();
+        BlobId blobId = saveRandomBlob(blobStore);
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty());
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        Context context = newContext();
+        Task.Result result = runGc(bloomFilterGCAlgorithm(), context);
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.snapshot())
+            .isEqualTo(Snapshot.builder()
+                .referenceSourceCount(0)
+                .blobCount(1)
+                .gcedBlobCount(1)
+                .errorCount(0)
+                .bloomFilterExpectedBlobCount(100)
+                .bloomFilterAssociatedProbability(0.8)
+                .build());
+        assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId))
+            .isInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void gcShouldNotRemoveUnExpireBlob() throws IOException {
+        BlobStore blobStore = blobStore();
+        BlobId blobId = saveRandomBlob(blobStore);
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty());
+
+        Context context = newContext();
+        Task.Result result = runGc(bloomFilterGCAlgorithm(), context);
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.snapshot())
+            .isEqualTo(Snapshot.builder()
+                .referenceSourceCount(0)
+                .blobCount(1)
+                .gcedBlobCount(0)
+                .errorCount(0)
+                .bloomFilterExpectedBlobCount(100)
+                .bloomFilterAssociatedProbability(0.8)
+                .build());
+        assertBlobReadable(blobStore, blobId);
+    }
+
+    @RepeatedTest(10)
+    default void gcShouldNotRemoveReferencedBlob() throws IOException {
+        BlobStore blobStore = blobStore();
+        BlobId blobId = saveRandomBlob(blobStore);
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.just(blobId));
+
+        Context context = newContext();
+        Task.Result result = runGc(bloomFilterGCAlgorithm(), context);
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.snapshot())
+            .isEqualTo(Snapshot.builder()
+                .referenceSourceCount(1)
+                .blobCount(1)
+                .gcedBlobCount(0)
+                .errorCount(0)
+                .bloomFilterExpectedBlobCount(100)
+                .bloomFilterAssociatedProbability(0.8)
+                .build());
+        assertBlobReadable(blobStore, blobId);
+    }
+
+    @Test
+    default void gcShouldSuccessWhenMixCase() throws IOException {
+        BlobStore blobStore = blobStore();
+        List<BlobId> referencedBlobIds = saveRandomBlobs(blobStore, 100);
+        List<BlobId> orphanBlobIds = saveRandomBlobs(blobStore, 50);
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        Context context = newContext();
+        Task.Result result = runGc(bloomFilterGCAlgorithm(), context);
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        Context.Snapshot snapshot = context.snapshot();
+
+        assertThat(snapshot.getReferenceSourceCount())
+            .isEqualTo(referencedBlobIds.size());
+        assertThat(snapshot.getBlobCount())
+            .isEqualTo(referencedBlobIds.size() + orphanBlobIds.size());
+
+        // Bloom filter false positives may keep some orphans, but never all of them in practice.
+        assertThat(snapshot.getGcedBlobCount())
+            .isLessThanOrEqualTo(orphanBlobIds.size())
+            .isGreaterThan(0);
+
+        for (BlobId blobId : referencedBlobIds) {
+            assertBlobReadable(blobStore, blobId);
+        }
+    }
+
+    @Test
+    default void allOrphanBlobIdsShouldRemovedAfterMultipleRunningTimesGC() {
+        BlobStore blobStore = blobStore();
+        List<BlobId> referencedBlobIds = saveRandomBlobs(blobStore, 100);
+        List<BlobId> orphanBlobIds = saveRandomBlobs(blobStore, 50);
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        // Each attempt builds a new algorithm, hence a new salt and different false positives.
+        CALMLY_AWAIT.untilAsserted(() -> {
+            runGc(bloomFilterGCAlgorithm(), newContext());
+
+            orphanBlobIds.forEach(blobId ->
+                assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId))
+                    .isInstanceOf(ObjectNotFoundException.class));
+        });
+    }
+
+    @Test
+    default void gcShouldHandlerErrorWhenException() {
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty());
+        BlobStoreDAO blobStoreDAO = mock(BlobStoreDAO.class);
+        BlobId blobId = GENERATION_AWARE_BLOB_ID_FACTORY.randomId();
+        when(blobStoreDAO.listBlobs(DEFAULT_BUCKET)).thenReturn(Flux.just(blobId));
+        when(blobStoreDAO.delete(DEFAULT_BUCKET, blobId)).thenThrow(new RuntimeException("test"));
+
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        Context context = newContext();
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = new BloomFilterGCAlgorithm(
+            BLOB_REFERENCE_SOURCE,
+            blobStoreDAO,
+            GENERATION_AWARE_BLOB_ID_FACTORY,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            CLOCK);
+        Task.Result result = runGc(bloomFilterGCAlgorithm, context);
+
+        assertThat(result).isEqualTo(Task.Result.PARTIAL);
+        assertThat(context.snapshot())
+            .isEqualTo(Snapshot.builder()
+                .referenceSourceCount(0)
+                .blobCount(1)
+                .gcedBlobCount(0)
+                .errorCount(1)
+                .bloomFilterExpectedBlobCount(100)
+                .bloomFilterAssociatedProbability(0.8)
+                .build());
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org