You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/08/26 07:50:13 UTC

[james-project] 02/02: JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 59b035febe08d990e4120bb16369a2be41719bf9
Author: Tung Van TRAN <vt...@linagora.com>
AuthorDate: Tue Aug 24 18:04:28 2021 +0700

    JAMES-3150 BlobGCService & BloomFilterGCAlgorithm
---
 pom.xml                                            |   6 +
 server/blob/blob-memory/pom.xml                    |  11 +
 .../memory/MemoryBlobStoreGCAlgorithmTest.java     |  39 +++
 server/blob/blob-storage-strategy/pom.xml          |  17 ++
 .../server/blob/deduplication/BlobGCTask.java      | 165 +++++++++++
 .../blob/deduplication/BloomFilterGCAlgorithm.java | 304 +++++++++++++++++++++
 .../BloomFilterGCAlgorithmContract.java            | 258 +++++++++++++++++
 7 files changed, 800 insertions(+)

diff --git a/pom.xml b/pom.xml
index cce10fe..aab9136 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1143,6 +1143,12 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
+                <artifactId>blob-storage-strategy</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
                 <artifactId>dead-letter-cassandra</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/server/blob/blob-memory/pom.xml b/server/blob/blob-memory/pom.xml
index 1938dae..a11713c 100644
--- a/server/blob/blob-memory/pom.xml
+++ b/server/blob/blob-memory/pom.xml
@@ -48,6 +48,17 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>blob-storage-strategy</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-util</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreGCAlgorithmTest.java b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreGCAlgorithmTest.java
new file mode 100644
index 0000000..ebe1928
--- /dev/null
+++ b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryBlobStoreGCAlgorithmTest.java
@@ -0,0 +1,39 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.memory;
+
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithmContract;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Runs the {@link BloomFilterGCAlgorithmContract} test suite against {@link MemoryBlobStoreDAO}.
+ */
+public class MemoryBlobStoreGCAlgorithmTest implements BloomFilterGCAlgorithmContract {
+
+    // A fresh in-memory store is created before every test (see beforeEach), so tests share no blobs.
+    private BlobStoreDAO memoryBlobStoreDAO;
+
+    @BeforeEach
+    public void beforeEach() {
+        this.memoryBlobStoreDAO = new MemoryBlobStoreDAO();
+    }
+
+    @Override
+    public BlobStoreDAO blobStoreDAO() {
+        return this.memoryBlobStoreDAO;
+    }
+}
diff --git a/server/blob/blob-storage-strategy/pom.xml b/server/blob/blob-storage-strategy/pom.xml
index ad1bb7a..af1daa1 100644
--- a/server/blob/blob-storage-strategy/pom.xml
+++ b/server/blob/blob-storage-strategy/pom.xml
@@ -45,6 +45,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-task-api</artifactId>
+        </dependency>
+        <dependency>
             <!-- Added because of https://issues.apache.org/jira/browse/SUREFIRE-1266 -->
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-testing</artifactId>
@@ -63,6 +67,11 @@
             <artifactId>commons-configuration2</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
         </dependency>
@@ -78,6 +87,14 @@
                 <groupId>net.alchim31.maven</groupId>
                 <artifactId>scala-maven-plugin</artifactId>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <reuseForks>true</reuseForks>
+                    <forkCount>1C</forkCount>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 
diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
new file mode 100644
index 0000000..660422b
--- /dev/null
+++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
@@ -0,0 +1,165 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Task running one {@link BloomFilterGCAlgorithm} pass over a single bucket.
+ *
+ * <p>The references exposed by every supplied {@link BlobReferenceSource} are aggregated and inserted in a
+ * Bloom filter sized with {@code expectedBlobCount} and {@code associatedProbability}. Blobs of
+ * {@code bucketName} that are outside the active generation and absent from that filter are then deleted.
+ * Progress counters are exposed through {@link #details()}.</p>
+ */
+public class BlobGCTask implements Task {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobGCTask.class);
+    public static final TaskType TASK_TYPE = TaskType.of("BlobGCTask");
+
+    /**
+     * Point-in-time copy of the counters of a {@link BlobGCTask} run, along with the Bloom filter parameters
+     * the run was configured with. The timestamp is taken from the UTC system clock when the copy is built.
+     */
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+
+        private static AdditionalInformation from(Context context) {
+            Context.Snapshot snapshot = context.snapshot();
+            return new AdditionalInformation(
+                snapshot.getReferenceSourceCount(),
+                snapshot.getBlobCount(),
+                snapshot.getGcedBlobCount(),
+                snapshot.getErrorCount(),
+                snapshot.getBloomFilterExpectedBlobCount(),
+                snapshot.getBloomFilterAssociatedProbability());
+        }
+
+        private final Instant timestamp;
+        private final long referenceSourceCount;
+        private final long blobCount;
+        private final long gcedBlobCount;
+        private final long errorCount;
+        private final long bloomFilterExpectedBlobCount;
+        private final double bloomFilterAssociatedProbability;
+
+        AdditionalInformation(long referenceSourceCount,
+                              long blobCount,
+                              long gcedBlobCount,
+                              long errorCount,
+                              long bloomFilterExpectedBlobCount,
+                              double bloomFilterAssociatedProbability) {
+            this.referenceSourceCount = referenceSourceCount;
+            this.blobCount = blobCount;
+            this.gcedBlobCount = gcedBlobCount;
+            this.errorCount = errorCount;
+            this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+            this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+            this.timestamp = Clock.systemUTC().instant();
+        }
+
+        @Override
+        public Instant timestamp() {
+            return timestamp;
+        }
+
+        public Instant getTimestamp() {
+            return timestamp;
+        }
+
+        public long getReferenceSourceCount() {
+            return referenceSourceCount;
+        }
+
+        public long getBlobCount() {
+            return blobCount;
+        }
+
+        public long getGcedBlobCount() {
+            return gcedBlobCount;
+        }
+
+        public long getErrorCount() {
+            return errorCount;
+        }
+
+        public long getBloomFilterExpectedBlobCount() {
+            return bloomFilterExpectedBlobCount;
+        }
+
+        public double getBloomFilterAssociatedProbability() {
+            return bloomFilterAssociatedProbability;
+        }
+    }
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+    private final Set<BlobReferenceSource> blobReferenceSources;
+    private final Clock clock;
+    private final BucketName bucketName;
+    private final int expectedBlobCount;
+    private final double associatedProbability;
+    private final Context context;
+
+    /**
+     * @param blobStoreDAO store listing and deleting the blobs of {@code bucketName}
+     * @param generationAwareBlobIdFactory parses listed blob ids into generation aware ids
+     * @param generationAwareBlobIdConfiguration decides which generation is still active
+     * @param blobReferenceSources sources whose aggregated references must never be deleted
+     * @param bucketName bucket to garbage collect
+     * @param clock clock used by the algorithm to compute the current instant
+     * @param expectedBlobCount number of references the Bloom filter is sized for
+     * @param associatedProbability desired false positive probability of the Bloom filter
+     */
+    public BlobGCTask(BlobStoreDAO blobStoreDAO,
+                      GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                      GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                      Set<BlobReferenceSource> blobReferenceSources,
+                      BucketName bucketName,
+                      Clock clock,
+                      int expectedBlobCount,
+                      double associatedProbability) {
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.blobReferenceSources = blobReferenceSources;
+        this.clock = clock;
+        this.bucketName = bucketName;
+        this.expectedBlobCount = expectedBlobCount;
+        this.associatedProbability = associatedProbability;
+        this.context = new Context(expectedBlobCount, associatedProbability);
+    }
+
+    /**
+     * Runs the garbage collection and blocks until it is over.
+     *
+     * <p>Errors raised while listing references or blobs are not caught here and surface from this call.</p>
+     *
+     * @return the per-blob results combined with {@link Task#combine}, or {@link Result#COMPLETED} when the
+     *     bucket holds no blob
+     */
+    @Override
+    public Result run() throws InterruptedException {
+        BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+            BlobReferenceAggregate.aggregate(blobReferenceSources),
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            generationAwareBlobIdConfiguration,
+            clock);
+
+        return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context)
+            .subscribeOn(Schedulers.elastic())
+            // A reduction over an empty bucket completes without a value: block() would then return null,
+            // which is not a valid Task result. Nothing had to be collected, so the run is complete.
+            .blockOptional()
+            .orElse(Result.COMPLETED);
+    }
+
+    @Override
+    public TaskType type() {
+        return TASK_TYPE;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(AdditionalInformation.from(context));
+    }
+}
diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
new file mode 100644
index 0000000..d9fc0aa
--- /dev/null
+++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
@@ -0,0 +1,304 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Funnels;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BloomFilterGCAlgorithm {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterGCAlgorithm.class);
+    private static final Funnel<CharSequence> BLOOM_FILTER_FUNNEL = Funnels.stringFunnel(StandardCharsets.US_ASCII);
+
+    public static class Context {
+
+        public static class Snapshot {
+
+            public static Builder builder() {
+                return new Builder();
+            }
+
+            static class Builder {
+                private Optional<Long> referenceSourceCount;
+                private Optional<Long> blobCount;
+                private Optional<Long> gcedBlobCount;
+                private Optional<Long> errorCount;
+                private Optional<Long> bloomFilterExpectedBlobCount;
+                private Optional<Double> bloomFilterAssociatedProbability;
+
+                Builder() {
+                    referenceSourceCount = Optional.empty();
+                    blobCount = Optional.empty();
+                    gcedBlobCount = Optional.empty();
+                    errorCount = Optional.empty();
+                    bloomFilterExpectedBlobCount = Optional.empty();
+                    bloomFilterAssociatedProbability = Optional.empty();
+                }
+
+                public Snapshot build() {
+                    return new Snapshot(
+                        referenceSourceCount.orElse(0L),
+                        blobCount.orElse(0L),
+                        gcedBlobCount.orElse(0L),
+                        errorCount.orElse(0L),
+                        bloomFilterExpectedBlobCount.orElse(0L),
+                        bloomFilterAssociatedProbability.orElse(0.0));
+                }
+
+                public Builder referenceSourceCount(long referenceSourceCount) {
+                    this.referenceSourceCount = Optional.of(referenceSourceCount);
+                    return this;
+                }
+
+                public Builder blobCount(long blobCount) {
+                    this.blobCount = Optional.of(blobCount);
+                    return this;
+                }
+
+                public Builder gcedBlobCount(long gcedBlobCount) {
+                    this.gcedBlobCount = Optional.of(gcedBlobCount);
+                    return this;
+                }
+
+                public Builder errorCount(long errorCount) {
+                    this.errorCount = Optional.of(errorCount);
+                    return this;
+                }
+
+                public Builder bloomFilterExpectedBlobCount(long bloomFilterExpectedBlobCount) {
+                    this.bloomFilterExpectedBlobCount = Optional.of(bloomFilterExpectedBlobCount);
+                    return this;
+                }
+
+                public Builder bloomFilterAssociatedProbability(double bloomFilterAssociatedProbability) {
+                    this.bloomFilterAssociatedProbability = Optional.of(bloomFilterAssociatedProbability);
+                    return this;
+                }
+            }
+
+            private final long referenceSourceCount;
+            private final long blobCount;
+            private final long gcedBlobCount;
+            private final long errorCount;
+            private final long bloomFilterExpectedBlobCount;
+            private final double bloomFilterAssociatedProbability;
+
+            Snapshot(long referenceSourceCount,
+                     long blobCount,
+                     long gcedBlobCount,
+                     long errorCount,
+                     long bloomFilterExpectedBlobCount,
+                     double bloomFilterAssociatedProbability) {
+                this.referenceSourceCount = referenceSourceCount;
+                this.blobCount = blobCount;
+                this.gcedBlobCount = gcedBlobCount;
+                this.errorCount = errorCount;
+                this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+                this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+            }
+
+            public long getReferenceSourceCount() {
+                return referenceSourceCount;
+            }
+
+            public long getBlobCount() {
+                return blobCount;
+            }
+
+            public long getGcedBlobCount() {
+                return gcedBlobCount;
+            }
+
+            public long getErrorCount() {
+                return errorCount;
+            }
+
+            public long getBloomFilterExpectedBlobCount() {
+                return bloomFilterExpectedBlobCount;
+            }
+
+            public double getBloomFilterAssociatedProbability() {
+                return bloomFilterAssociatedProbability;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot that = (Snapshot) o;
+
+                    return Objects.equals(this.referenceSourceCount, that.referenceSourceCount)
+                        && Objects.equals(this.blobCount, that.blobCount)
+                        && Objects.equals(this.gcedBlobCount, that.gcedBlobCount)
+                        && Objects.equals(this.errorCount, that.errorCount)
+                        && Objects.equals(this.bloomFilterExpectedBlobCount, that.bloomFilterExpectedBlobCount)
+                        && Objects.equals(this.bloomFilterAssociatedProbability, that.bloomFilterAssociatedProbability);
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(referenceSourceCount, blobCount, gcedBlobCount, errorCount, bloomFilterExpectedBlobCount, bloomFilterAssociatedProbability);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("referenceSourceCount", referenceSourceCount)
+                    .add("blobCount", blobCount)
+                    .add("gcedBlobCount", gcedBlobCount)
+                    .add("errorCount", errorCount)
+                    .add("bloomFilterExpectedBlobCount", bloomFilterExpectedBlobCount)
+                    .add("bloomFilterAssociatedProbability", bloomFilterAssociatedProbability)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong referenceSourceCount;
+        private final AtomicLong blobCount;
+        private final AtomicLong gcedBlobCount;
+        private final AtomicLong errorCount;
+        private final Long bloomFilterExpectedBlobCount;
+        private final Double bloomFilterAssociatedProbability;
+
+        public Context(long bloomFilterExpectedBlobCount, double bloomFilterAssociatedProbability) {
+            this.referenceSourceCount = new AtomicLong();
+            this.blobCount = new AtomicLong();
+            this.gcedBlobCount = new AtomicLong();
+            this.errorCount = new AtomicLong();
+            this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+            this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+        }
+
+        public void incrementBlobCount() {
+            blobCount.incrementAndGet();
+        }
+
+        public void incrementReferenceSourceCount() {
+            referenceSourceCount.incrementAndGet();
+        }
+
+        public void incrementGCedBlobCount() {
+            gcedBlobCount.incrementAndGet();
+        }
+
+        public void incrementErrorCount() {
+            errorCount.incrementAndGet();
+        }
+
+        public Snapshot snapshot() {
+            return Snapshot.builder()
+                .referenceSourceCount(referenceSourceCount.get())
+                .blobCount(blobCount.get())
+                .gcedBlobCount(gcedBlobCount.get())
+                .errorCount(errorCount.get())
+                .bloomFilterExpectedBlobCount(bloomFilterExpectedBlobCount)
+                .bloomFilterAssociatedProbability(bloomFilterAssociatedProbability)
+                .build();
+        }
+    }
+
+    private final BlobReferenceSource referenceSource;
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+    private final Instant now;
+
+    // Avoids two subsequent run to have the same false positives.
+    private final String salt;
+
+    public BloomFilterGCAlgorithm(BlobReferenceSource referenceSource,
+                                  BlobStoreDAO blobStoreDAO,
+                                  GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                                  GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                                  Clock clock) {
+        this.referenceSource = referenceSource;
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.salt = UUID.randomUUID().toString();
+        this.now = clock.instant();
+    }
+
+    public Mono<Result> gc(int expectedBlobCount, double associatedProbability, BucketName bucketName, Context context) {
+        return populatedBloomFilter(expectedBlobCount, associatedProbability, context)
+            .flatMap(bloomFilter -> gc(bloomFilter, bucketName, context));
+    }
+
+    private Mono<Result> gc(BloomFilter<CharSequence> bloomFilter, BucketName bucketName, Context context) {
+        return Flux.from(blobStoreDAO.listBlobs(bucketName))
+            .doOnNext(blobId -> context.incrementBlobCount())
+            .flatMap(blobId -> gcBlob(bloomFilter, blobId, bucketName, context))
+            .reduce(Task::combine);
+    }
+
+    private Mono<BloomFilter<CharSequence>> populatedBloomFilter(int expectedBlobCount, double associatedProbability, Context context) {
+        return Mono.fromCallable(() -> BloomFilter.create(
+                BLOOM_FILTER_FUNNEL,
+                expectedBlobCount,
+                associatedProbability))
+            .flatMap(bloomFilter ->
+                Flux.from(referenceSource.listReferencedBlobs())
+                    .doOnNext(ref -> context.incrementReferenceSourceCount())
+                    .map(ref -> bloomFilter.put(salt + ref.asString()))
+                    .then()
+                    .thenReturn(bloomFilter));
+    }
+
+    private Mono<Result> gcBlob(BloomFilter<CharSequence> bloomFilter, BlobId blobId, BucketName bucketName, Context context) {
+        return Mono.fromCallable(() -> generationAwareBlobIdFactory.from(blobId.asString()))
+            .filter(awareBlobId -> !awareBlobId.inActiveGeneration(generationAwareBlobIdConfiguration, now))
+            .filter(expiredAwareBlobId -> !bloomFilter.mightContain(salt + blobId.asString()))
+            .flatMap(orphanBlobId ->
+                Mono.from(blobStoreDAO.delete(bucketName, orphanBlobId))
+                    .then(Mono.fromCallable(() -> {
+                        context.incrementGCedBlobCount();
+                        return Result.COMPLETED;
+                    })))
+            .onErrorResume(error -> {
+                LOGGER.error("Error when gc orphan blob ", error);
+                context.incrementErrorCount();
+                return Mono.just(Result.PARTIAL);
+            })
+            .switchIfEmpty(Mono.just(Result.COMPLETED));
+    }
+}
diff --git a/server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java b/server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java
new file mode 100644
index 0000000..d08ae74
--- /dev/null
+++ b/server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java
@@ -0,0 +1,258 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context.Snapshot;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.task.Task;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface BloomFilterGCAlgorithmContract {
+
+    HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    ZonedDateTime NOW = ZonedDateTime.parse("2015-10-30T16:12:00Z");
+    BucketName DEFAULT_BUCKET = BucketName.of("default");
+    GenerationAwareBlobId.Configuration GENERATION_AWARE_BLOB_ID_CONFIGURATION = GenerationAwareBlobId.Configuration.DEFAULT;
+    int EXPECTED_BLOB_COUNT = 100;
+    double ASSOCIATED_PROBABILITY = 0.8;
+
+    ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await()
+        .atMost(TEN_SECONDS);
+
+    BlobReferenceSource BLOB_REFERENCE_SOURCE = mock(BlobReferenceSource.class);
+    UpdatableTickingClock CLOCK = new UpdatableTickingClock(NOW.toInstant());
+    GenerationAwareBlobId.Factory GENERATION_AWARE_BLOB_ID_FACTORY = new GenerationAwareBlobId.Factory(CLOCK, BLOB_ID_FACTORY, GENERATION_AWARE_BLOB_ID_CONFIGURATION);
+
+    BlobStoreDAO blobStoreDAO();
+
+    @BeforeEach
+    default void setUp() {
+        CLOCK.setInstant(NOW.toInstant());
+    }
+
+    default BlobStore blobStore() {
+        return new DeDuplicationBlobStore(blobStoreDAO(), DEFAULT_BUCKET, GENERATION_AWARE_BLOB_ID_FACTORY);
+    }
+
+    default BloomFilterGCAlgorithm bloomFilterGCAlgorithm() {
+        return new BloomFilterGCAlgorithm(BLOB_REFERENCE_SOURCE,
+            blobStoreDAO(),
+            GENERATION_AWARE_BLOB_ID_FACTORY,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            CLOCK);
+    }
+
+    @RepeatedTest(10)
+    default void gcShouldRemoveOrphanBlob() {
+        BlobStore blobStore = blobStore();
+        BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty());
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.snapshot())
+            .isEqualTo(Snapshot.builder()
+                .referenceSourceCount(0)
+                .blobCount(1)
+                .gcedBlobCount(1)
+                .errorCount(0)
+                .bloomFilterExpectedBlobCount(100)
+                .bloomFilterAssociatedProbability(0.8)
+                .build());
+        assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId))
+            .isInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void gcShouldNotRemoveUnExpireBlob() {
+        BlobStore blobStore = blobStore();
+        BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty());
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.snapshot())
+            .isEqualTo(Snapshot.builder()
+                .referenceSourceCount(0)
+                .blobCount(1)
+                .gcedBlobCount(0)
+                .errorCount(0)
+                .bloomFilterExpectedBlobCount(100)
+                .bloomFilterAssociatedProbability(0.8)
+                .build());
+        assertThat(blobStore.read(DEFAULT_BUCKET, blobId))
+            .isNotNull();
+    }
+
+    @RepeatedTest(10)
+    default void gcShouldNotRemoveReferencedBlob() {
+        BlobStore blobStore = blobStore();
+        BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.just(blobId));
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.snapshot())
+            .isEqualTo(Snapshot.builder()
+                .referenceSourceCount(1)
+                .blobCount(1)
+                .gcedBlobCount(0)
+                .errorCount(0)
+                .bloomFilterExpectedBlobCount(100)
+                .bloomFilterAssociatedProbability(0.8)
+                .build());
+        assertThat(blobStore.read(DEFAULT_BUCKET, blobId))
+            .isNotNull();
+    }
+
+    @Test
+    default void gcShouldSuccessWhenMixCase() {
+        BlobStore blobStore = blobStore();
+        List<BlobId> referencedBlobIds = IntStream.range(0, 100)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .collect(Collectors.toList());
+        List<BlobId> orphanBlobIds = IntStream.range(0, 50)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .collect(Collectors.toList());
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        Context.Snapshot snapshot = context.snapshot();
+
+        assertThat(snapshot.getReferenceSourceCount())
+            .isEqualTo(referencedBlobIds.size());
+        assertThat(snapshot.getBlobCount())
+            .isEqualTo(referencedBlobIds.size() + orphanBlobIds.size());
+
+        assertThat(snapshot.getGcedBlobCount())
+            .isLessThanOrEqualTo(orphanBlobIds.size())
+            .isGreaterThan(0);
+
+        referencedBlobIds.forEach(blobId ->
+            assertThat(blobStore.read(DEFAULT_BUCKET, blobId))
+                .isNotNull());
+    }
+
+    @Test
+    default void allOrphanBlobIdsShouldRemovedAfterMultipleRunningTimesGC() {
+        BlobStore blobStore = blobStore();
+        List<BlobId> referencedBlobIds = IntStream.range(0, 100)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .collect(Collectors.toList());
+        List<BlobId> orphanBlobIds = IntStream.range(0, 50)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .collect(Collectors.toList());
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        CALMLY_AWAIT.untilAsserted(() -> {
+            Mono.from(bloomFilterGCAlgorithm().gc(
+                    EXPECTED_BLOB_COUNT,
+                    ASSOCIATED_PROBABILITY,
+                    DEFAULT_BUCKET,
+                    new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY)))
+                .block();
+
+            orphanBlobIds.forEach(blobId ->
+                assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId))
+                    .isInstanceOf(ObjectNotFoundException.class));
+        });
+    }
+
+    @Test
+    default void gcShouldHandlerErrorWhenException() {
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty());
+        BlobStoreDAO blobStoreDAO = mock(BlobStoreDAO.class);
+        BlobId blobId = GENERATION_AWARE_BLOB_ID_FACTORY.randomId();
+        when(blobStoreDAO.listBlobs(DEFAULT_BUCKET)).thenReturn(Flux.just(blobId));
+        when(blobStoreDAO.delete(DEFAULT_BUCKET, blobId)).thenThrow(new RuntimeException("test"));
+
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = new BloomFilterGCAlgorithm(
+            BLOB_REFERENCE_SOURCE,
+            blobStoreDAO,
+            GENERATION_AWARE_BLOB_ID_FACTORY,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            CLOCK);
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.PARTIAL);
+        assertThat(context.snapshot())
+            .isEqualTo(Snapshot.builder()
+                .referenceSourceCount(0)
+                .blobCount(1)
+                .gcedBlobCount(0)
+                .errorCount(1)
+                .bloomFilterExpectedBlobCount(100)
+                .bloomFilterAssociatedProbability(0.8)
+                .build());
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org