Posted to notifications@james.apache.org by GitBox <gi...@apache.org> on 2021/08/24 11:06:55 UTC

[GitHub] [james-project] vttranlina opened a new pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

vttranlina opened a new pull request #618:
URL: https://github.com/apache/james-project/pull/618


   Jira: https://issues.apache.org/jira/browse/JAMES-3150
   Ref: https://github.com/linagora/james-project/issues/4369


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
For additional commands, e-mail: notifications-help@james.apache.org


[GitHub] [james-project] chibenwa commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
chibenwa commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r694753471



##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {

Review comment:
       Can this match the Task API?
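       One reading of this suggestion is to expose the GC through a Task-like `run()` that returns a `Result`, reducing the per-bucket results the way `gc()` already does with `Task::combine`. The sketch below is self-contained and hypothetical: `Result`, `combine`, `TaskLike` and `BlobGcTask` are stand-ins, not the James `org.apache.james.task.Task` API, and the combine rule (COMPLETED only when both inputs are COMPLETED) is an assumption.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a task result; NOT the James Task.Result type.
enum Result {
    COMPLETED, PARTIAL;

    // Assumed rule: the combination is COMPLETED only when both inputs are.
    static Result combine(Result a, Result b) {
        return (a == COMPLETED && b == COMPLETED) ? COMPLETED : PARTIAL;
    }
}

// Hypothetical minimal task contract.
interface TaskLike {
    Result run();
}

// Hypothetical task that runs one GC callback per bucket and folds the results.
final class BlobGcTask implements TaskLike {
    private final List<Supplier<Result>> perBucketGc;

    BlobGcTask(List<Supplier<Result>> perBucketGc) {
        this.perBucketGc = perBucketGc;
    }

    @Override
    public Result run() {
        // Seeding with COMPLETED gives a defined result even with zero buckets.
        Result overall = Result.COMPLETED;
        for (Supplier<Result> bucketGc : perBucketGc) {
            try {
                overall = Result.combine(overall, bucketGc.get());
            } catch (RuntimeException e) {
                // A failing bucket degrades the run to PARTIAL; unlike the PR's
                // onErrorResume, this sketch keeps going with the remaining buckets.
                overall = Result.PARTIAL;
            }
        }
        return overall;
    }
}
```

       The explicit seed is a deliberate choice: Reactor's seedless `Flux.reduce(BiFunction)` completes empty on an empty source, so `gc()` as quoted would emit no `Result` at all when `listBuckets()` returns nothing.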

##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+public class BlobGCService {
+
+    public static final int EXPECTED_BLOB_COUNT = 1_000_000;
+    public static final double ASSOCIATED_PROBABILITY = 0.8;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobGCService.class);
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+
+    private final MessageBlobReferenceSource messageBlobReferenceSource;
+    private final MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource;
+    private final MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource;
+    private final AttachmentBlobReferenceSource attachmentBlobReferenceSource;
+    private final Clock clock;
+
+    @Inject
+    public BlobGCService(BlobStoreDAO blobStoreDAO,
+                         GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                         GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                         MessageBlobReferenceSource messageBlobReferenceSource,
+                         MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource,
+                         MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource,
+                         AttachmentBlobReferenceSource attachmentBlobReferenceSource,
+                         Clock clock) {
+
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.messageBlobReferenceSource = messageBlobReferenceSource;
+        this.mailRepositoryBlobReferenceSource = mailRepositoryBlobReferenceSource;
+        this.mailQueueViewBlobReferenceSource = mailQueueViewBlobReferenceSource;
+        this.attachmentBlobReferenceSource = attachmentBlobReferenceSource;
+        this.clock = clock;
+    }
+
+    public Mono<Result> gc() {
+        BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+            getBlobReferenceSource(),
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            generationAwareBlobIdConfiguration,
+            clock);
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+
+        return gcAlgorithm.populatedBloomFilter(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, context)
+            .flatMapMany(bloomFilter ->
+                Flux.from(blobStoreDAO.listBuckets())
+                    .flatMap(bucketName -> gcAlgorithm.gc(bloomFilter, bucketName, context)))
+            .reduce(Task::combine)
+            .onErrorResume(error -> {
+                LOGGER.error("Error when running the blob garbage collection ", error);
+                return Mono.just(Result.PARTIAL);
+            });

Review comment:
       Move the context handling to the algorithm itself IMO.
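       A minimal sketch of that refactoring, under assumptions: the algorithm creates a fresh, private context for each run and hands back only an immutable snapshot, so a caller such as `BlobGCService` no longer constructs a `Context` or passes the Bloom filter parameters twice. `SketchGcAlgorithm`, `referencedIds` and `storedIds` are hypothetical; a `Set<String>` stands in for the PR's Guava `BloomFilter` to keep the sketch short, and deletion is simulated by removing ids from an in-memory set.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the algorithm, not its caller, creates and fills the context.
final class SketchGcAlgorithm {

    // Immutable counters handed back to the caller once a run finishes.
    static final class Snapshot {
        final long referenceCount;
        final long blobCount;
        final long gcedBlobCount;

        Snapshot(long referenceCount, long blobCount, long gcedBlobCount) {
            this.referenceCount = referenceCount;
            this.blobCount = blobCount;
            this.gcedBlobCount = gcedBlobCount;
        }
    }

    // Mutable per-run counters; private, so callers never construct one.
    private static final class Context {
        final AtomicLong referenceCount = new AtomicLong();
        final AtomicLong blobCount = new AtomicLong();
        final AtomicLong gcedBlobCount = new AtomicLong();

        Snapshot snapshot() {
            return new Snapshot(referenceCount.get(), blobCount.get(), gcedBlobCount.get());
        }
    }

    private final Set<String> referencedIds;
    private final Set<String> storedIds;

    SketchGcAlgorithm(Set<String> referencedIds, Set<String> storedIds) {
        this.referencedIds = referencedIds;
        this.storedIds = storedIds;
    }

    // One mark-and-sweep run with a fresh context; returns what this run observed.
    Snapshot gc() {
        Context context = new Context();
        Set<String> marked = new HashSet<>(); // stand-in for the Bloom filter
        for (String id : referencedIds) {
            marked.add(id);
            context.referenceCount.incrementAndGet();
        }
        for (String id : new HashSet<>(storedIds)) { // iterate a copy: storedIds is mutated
            context.blobCount.incrementAndGet();
            if (!marked.contains(id)) {
                storedIds.remove(id); // simulated deletion of an unreferenced blob
                context.gcedBlobCount.incrementAndGet();
            }
        }
        return context.snapshot();
    }
}
```

       Because the context is local to `gc()`, a second run starts from zero instead of accumulating the counters of the first one.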

##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
##########
@@ -0,0 +1,212 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Funnels;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BloomFilterGCAlgorithm {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterGCAlgorithm.class);
+    private static final Funnel<CharSequence> BLOOM_FILTER_FUNNEL = Funnels.stringFunnel(StandardCharsets.US_ASCII);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long referenceSourceCount;
+            private final long blobCount;
+            private final long gcedBlobCount;
+            private final long errorCount;
+            private final long bloomFilterExpectedBlobCount;
+            private final double bloomFilterAssociatedProbability;
+
+            public Snapshot(long referenceSourceCount,
+                            long blobCount,
+                            long gcedBlobCount,
+                            long errorCount,
+                            long bloomFilterExpectedBlobCount,
+                            double bloomFilterAssociatedProbability) {
+                this.referenceSourceCount = referenceSourceCount;
+                this.blobCount = blobCount;
+                this.gcedBlobCount = gcedBlobCount;
+                this.errorCount = errorCount;
+                this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+                this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot that = (Snapshot) o;
+
+                    return Objects.equals(this.referenceSourceCount, that.referenceSourceCount)
+                        && Objects.equals(this.blobCount, that.blobCount)
+                        && Objects.equals(this.gcedBlobCount, that.gcedBlobCount)
+                        && Objects.equals(this.errorCount, that.errorCount)
+                        && Objects.equals(this.bloomFilterExpectedBlobCount, that.bloomFilterExpectedBlobCount)
+                        && Objects.equals(this.bloomFilterAssociatedProbability, that.bloomFilterAssociatedProbability);
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(referenceSourceCount, blobCount, gcedBlobCount, errorCount, bloomFilterExpectedBlobCount, bloomFilterAssociatedProbability);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("referenceSourceCount", referenceSourceCount)
+                    .add("blobCount", blobCount)
+                    .add("gcedBlobCount", gcedBlobCount)
+                    .add("errorCount", errorCount)
+                    .add("bloomFilterExpectedBlobCount", bloomFilterExpectedBlobCount)
+                    .add("bloomFilterAssociatedProbability", bloomFilterAssociatedProbability)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong referenceSourceCount;
+        private final AtomicLong blobCount;
+        private final AtomicLong gcedBlobCount;
+        private final AtomicLong errorCount;
+        private final Long bloomFilterExpectedBlobCount;
+        private final Double bloomFilterAssociatedProbability;
+
+        public Context(long bloomFilterExpectedBlobCount, double bloomFilterAssociatedProbability) {
+            this.referenceSourceCount = new AtomicLong();
+            this.blobCount = new AtomicLong();
+            this.gcedBlobCount = new AtomicLong();
+            this.errorCount = new AtomicLong();
+            this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+            this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+        }
+
+        public void incrementBlobCount() {
+            blobCount.incrementAndGet();
+        }
+
+        public void incrementReferenceSourceCount() {
+            referenceSourceCount.incrementAndGet();
+        }
+
+        public void incrementGCedBlobCount() {
+            gcedBlobCount.incrementAndGet();
+        }
+
+        public void incrementErrorCount() {
+            errorCount.incrementAndGet();
+        }
+
+        Snapshot snapshot() {
+            return new Snapshot(
+                referenceSourceCount.get(),
+                blobCount.get(),
+                gcedBlobCount.get(),
+                errorCount.get(),
+                bloomFilterExpectedBlobCount,
+                bloomFilterAssociatedProbability);
+        }
+    }
+
+    private final BlobReferenceSource referenceSource;
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+    private final Instant now;
+
+    // Prevents two subsequent runs from having the same false positives.
+    private final String salt;
+
+    public BloomFilterGCAlgorithm(BlobReferenceSource referenceSource,
+                                  BlobStoreDAO blobStoreDAO,
+                                  GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                                  GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                                  Clock clock) {
+        this.referenceSource = referenceSource;
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.salt = UUID.randomUUID().toString();
+        this.now = clock.instant();
+    }
+
+    public Mono<Result> gc(int expectedBlobCount, double associatedProbability, BucketName bucketName, Context context) {
+        return populatedBloomFilter(expectedBlobCount, associatedProbability, context)
+            .flatMap(bloomFilter -> gc(bloomFilter, bucketName, context));
+    }
+
+    public Mono<Result> gc(BloomFilter<CharSequence> bloomFilter, BucketName bucketName, Context context) {

Review comment:
       private

##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
##########
@@ -0,0 +1,212 @@
+    public Mono<Result> gc(BloomFilter<CharSequence> bloomFilter, BucketName bucketName, Context context) {
+        return Flux.from(blobStoreDAO.listBlobs(bucketName))
+            .doOnNext(blobId -> context.incrementBlobCount())
+            .flatMap(blobId -> gcBlob(bloomFilter, blobId, bucketName, context))
+            .reduce(Task::combine);
+    }
+
+    public Mono<BloomFilter<CharSequence>> populatedBloomFilter(int expectedBlobCount, double associatedProbability, Context context) {

Review comment:
       private

##########
File path: server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmTest.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Clock;
+import java.time.ZonedDateTime;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context.Snapshot;
+import org.apache.james.task.Task;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BloomFilterGCAlgorithmTest {
+    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    private static final ZonedDateTime NOW = ZonedDateTime.parse("2015-10-30T16:12:00Z");
+    private static final int EXPECTED_BLOB_COUNT = 100;
+    private static final double ASSOCIATED_PROBABILITY = 0.8;
+    private static final BucketName DEFAULT_BUCKET = BucketName.of("default");
+    private static final GenerationAwareBlobId.Configuration GENERATION_AWARE_BLOB_ID_CONFIGURATION = GenerationAwareBlobId.Configuration.DEFAULT;
+
+    private final ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await()
+        .atMost(TEN_SECONDS);
+
+    private BlobReferenceSource blobReferenceSource;
+    private Clock clock;
+    private BlobStore blobStore;
+    private BlobStoreDAO blobStoreDAO;
+    private GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+
+    @BeforeEach
+    void setUp() {
+        clock = new UpdatableTickingClock(NOW.toInstant());
+        generationAwareBlobIdFactory = new GenerationAwareBlobId.Factory(clock, BLOB_ID_FACTORY, GENERATION_AWARE_BLOB_ID_CONFIGURATION);
+
+        blobReferenceSource = mock(BlobReferenceSource.class);
+
+        blobStoreDAO = new MemoryBlobStoreDAO();
+        blobStore = new DeDuplicationBlobStore(blobStoreDAO, DEFAULT_BUCKET, generationAwareBlobIdFactory);
+    }
+
+    private BloomFilterGCAlgorithm bloomFilterGCAlgorithm() {
+        return new BloomFilterGCAlgorithm(
+            blobReferenceSource,
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            clock);
+    }
+
+    @Test

Review comment:
       `@RepeatedTest(100)` IMO
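       JUnit 5's `@RepeatedTest(100)` reruns the test method 100 times. Since `BloomFilterGCAlgorithm` draws a random `salt` in its constructor, each repetition exercises a different set of false positives. The plain-Java sketch below imitates that loop without JUnit. `SaltedBloomFilter` and `RepeatedSaltCheck` are hypothetical: a hand-rolled filter (not Guava's `BloomFilter`) using double hashing over a SHA-256 digest of salt plus id, with arbitrary sizes. The invariant it checks holds for any Bloom filter: an inserted element is never reported absent, so a referenced blob is never collected, whatever the salt.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.BitSet;
import java.util.UUID;

// Hypothetical salted Bloom filter (not Guava's); k probes via double hashing.
final class SaltedBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashCount;
    private final String salt;

    SaltedBloomFilter(int size, int hashCount, String salt) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
        this.salt = salt;
    }

    void put(String id) {
        for (int index : indexes(id)) {
            bits.set(index);
        }
    }

    boolean mightContain(String id) {
        for (int index : indexes(id)) {
            if (!bits.get(index)) {
                return false; // a clear bit proves the id was never inserted
            }
        }
        return true; // present, or a false positive
    }

    // Derives hashCount bit positions from two 32-bit slices of SHA-256(salt + id).
    private int[] indexes(String id) {
        byte[] digest = sha256(salt + id);
        int h1 = toInt(digest, 0);
        int h2 = toInt(digest, 4);
        int[] result = new int[hashCount];
        for (int i = 0; i < hashCount; i++) {
            result[i] = Math.floorMod(h1 + i * h2, size); // floorMod keeps the index non-negative
        }
        return result;
    }

    private static int toInt(byte[] b, int offset) {
        return ((b[offset] & 0xFF) << 24) | ((b[offset + 1] & 0xFF) << 16)
            | ((b[offset + 2] & 0xFF) << 8) | (b[offset + 3] & 0xFF);
    }

    private static byte[] sha256(String input) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(input.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java SE platform must provide SHA-256
        }
    }
}

final class RepeatedSaltCheck {
    // Plain-Java stand-in for @RepeatedTest(n): a fresh random salt per repetition.
    // Returns how many runs would wrongly retain the unreferenced "orphan" id.
    static int runRepetitions(int repetitions) {
        int falsePositiveRuns = 0;
        for (int run = 0; run < repetitions; run++) {
            SaltedBloomFilter filter = new SaltedBloomFilter(64, 3, UUID.randomUUID().toString());
            for (int i = 0; i < 20; i++) {
                filter.put("referenced-" + i);
            }
            for (int i = 0; i < 20; i++) {
                if (!filter.mightContain("referenced-" + i)) {
                    throw new AssertionError("referenced blob would be collected on run " + run);
                }
            }
            if (filter.mightContain("orphan")) {
                falsePositiveRuns++;
            }
        }
        return falsePositiveRuns;
    }
}
```

       With a fixed salt, the same orphan would survive every run; with a random one, whether it survives changes from run to run, which is what repeating the test is meant to exercise.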

##########
File path: server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmTest.java
##########
@@ -0,0 +1,109 @@
+
+    private final ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await()
+        .atMost(TEN_SECONDS);
+
+    private BlobReferenceSource blobReferenceSource;
+    private Clock clock;
+    private BlobStore blobStore;
+    private BlobStoreDAO blobStoreDAO;
+    private GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+
+    @BeforeEach
+    void setUp() {
+        clock = new UpdatableTickingClock(NOW.toInstant());
+        generationAwareBlobIdFactory = new GenerationAwareBlobId.Factory(clock, BLOB_ID_FACTORY, GENERATION_AWARE_BLOB_ID_CONFIGURATION);
+
+        blobReferenceSource = mock(BlobReferenceSource.class);
+
+        blobStoreDAO = new MemoryBlobStoreDAO();
+        blobStore = new DeDuplicationBlobStore(blobStoreDAO, DEFAULT_BUCKET, generationAwareBlobIdFactory);
+    }
+
+    private BloomFilterGCAlgorithm bloomFilterGCAlgorithm() {
+        return new BloomFilterGCAlgorithm(
+            blobReferenceSource,
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            clock);
+    }
+
+    @Test
+    void gcShouldRemoveOrphanBlobId() {

Review comment:
       We are missing the test `gcShouldKeepReferencedBlobId`.
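Below is a minimal sketch of the behaviour `gcShouldKeepReferencedBlobId` would pin down. It assumes the populated Bloom filter is exposed as a `Predicate<String>` over blob ids. The `sweep` helper and the `Map`-backed store are hypothetical illustrations, not the PR's `BlobStoreDAO` API. An exact `Set` stands in for the filter so that the outcome is deterministic. A real Bloom filter may also keep a few orphans (false positives), but it must never delete a referenced blob.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

final class GcSweepSketch {

    // Hypothetical sweep step: delete every stored blob that the reference filter
    // does not (possibly) contain, and return how many blobs were deleted.
    static int sweep(Map<String, byte[]> store, Predicate<String> mightBeReferenced) {
        int before = store.size();
        // Removing through the key set view also removes the map entries.
        store.keySet().removeIf(blobId -> !mightBeReferenced.test(blobId));
        return before - store.size();
    }

    public static void main(String[] args) {
        Map<String, byte[]> store = new HashMap<>();
        store.put("referenced", new byte[] {1});
        store.put("orphan", new byte[] {2});

        // An exact Set stands in for the Bloom filter so the result is deterministic.
        Set<String> references = Set.of("referenced");
        int deleted = sweep(store, references::contains);

        System.out.println(deleted + " " + store.keySet()); // prints "1 [referenced]"
    }
}
```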

##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {
+
+    public static final int EXPECTED_BLOB_COUNT = 1_000_000;
+    public static final double ASSOCIATED_PROBABILITY = 0.8;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobGCService.class);
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+
+    private final MessageBlobReferenceSource messageBlobReferenceSource;
+    private final MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource;
+    private final MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource;
+    private final AttachmentBlobReferenceSource attachmentBlobReferenceSource;
+    private final Clock clock;
+
+    @Inject
+    public BlobGCService(BlobStoreDAO blobStoreDAO,
+                         GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                         GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                         MessageBlobReferenceSource messageBlobReferenceSource,
+                         MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource,
+                         MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource,
+                         AttachmentBlobReferenceSource attachmentBlobReferenceSource,
+                         Clock clock) {
+
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.messageBlobReferenceSource = messageBlobReferenceSource;
+        this.mailRepositoryBlobReferenceSource = mailRepositoryBlobReferenceSource;
+        this.mailQueueViewBlobReferenceSource = mailQueueViewBlobReferenceSource;
+        this.attachmentBlobReferenceSource = attachmentBlobReferenceSource;
+        this.clock = clock;
+    }
+
+    public Mono<Result> gc() {
+        BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+            getBlobReferenceSource(),
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            generationAwareBlobIdConfiguration,
+            clock);
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);

Review comment:
       TODO: `EXPECTED_BLOB_COUNT` and `ASSOCIATED_PROBABILITY` should not be hardcoded.
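This sketch shows why these two values matter. It assumes `ASSOCIATED_PROBABILITY` is the Bloom filter's false positive probability; the pair matches the `expectedInsertions`/`fpp` arguments of Guava's `BloomFilter.create`. For one million expected blobs, the standard sizing formulas below give about 58 KB and a single hash function at p = 0.8. At p = 0.01 they give about 1.2 MB and 7 hash functions. If the assumption holds, then at p = 0.8 an orphan blob would survive any given run with a probability of roughly 80% once the filter reaches its expected load. The class name `BloomFilterSizing` is hypothetical.

```java
final class BloomFilterSizing {

    // Optimal number of bits for n expected insertions and false positive probability p:
    // m = -n * ln(p) / (ln 2)^2
    static long optimalBits(long expectedInsertions, double fpp) {
        if (expectedInsertions <= 0 || !(fpp > 0.0 && fpp < 1.0)) {
            throw new IllegalArgumentException("need n > 0 and 0 < p < 1");
        }
        return (long) Math.ceil(-expectedInsertions * Math.log(fpp) / (Math.log(2) * Math.log(2)));
    }

    // Optimal number of hash functions: k = (m / n) * ln 2, rounded, and at least 1.
    static int optimalHashFunctions(long expectedInsertions, long bits) {
        return Math.max(1, (int) Math.round((double) bits / expectedInsertions * Math.log(2)));
    }

    public static void main(String[] args) {
        for (double fpp : new double[] {0.8, 0.01}) {
            long bits = optimalBits(1_000_000, fpp);
            System.out.printf("p=%.2f -> %d bits (%d bytes), k=%d%n",
                fpp, bits, bits / 8, optimalHashFunctions(1_000_000, bits));
        }
    }
}
```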






[GitHub] [james-project] chibenwa commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
chibenwa commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r695360868



##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {
+
+    public static final int EXPECTED_BLOB_COUNT = 1_000_000;
+    public static final double ASSOCIATED_PROBABILITY = 0.8;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobGCService.class);
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+
+    private final MessageBlobReferenceSource messageBlobReferenceSource;
+    private final MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource;
+    private final MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource;
+    private final AttachmentBlobReferenceSource attachmentBlobReferenceSource;
+    private final Clock clock;
+
+    @Inject
+    public BlobGCService(BlobStoreDAO blobStoreDAO,
+                         GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                         GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                         MessageBlobReferenceSource messageBlobReferenceSource,
+                         MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource,
+                         MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource,
+                         AttachmentBlobReferenceSource attachmentBlobReferenceSource,
+                         Clock clock) {
+
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.messageBlobReferenceSource = messageBlobReferenceSource;
+        this.mailRepositoryBlobReferenceSource = mailRepositoryBlobReferenceSource;
+        this.mailQueueViewBlobReferenceSource = mailQueueViewBlobReferenceSource;
+        this.attachmentBlobReferenceSource = attachmentBlobReferenceSource;
+        this.clock = clock;
+    }
+
+    public Mono<Result> gc() {
+        BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+            getBlobReferenceSource(),
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            generationAwareBlobIdConfiguration,
+            clock);
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);

Review comment:
       No, these should be parameters supplied at task creation.
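One possible shape for this, sketched under assumptions: a hypothetical `BlobGCTaskParameters` value object built from optional user input when the task is created (for instance, request query parameters). It falls back to the current constants as defaults. It rejects a non-positive count and any probability outside the open interval (0, 1); Guava's `BloomFilter.create` also rejects such an `fpp`.

```java
import java.util.Optional;

final class BlobGCTaskParameters {
    // Hypothetical defaults, mirroring the constants currently hardcoded in BlobGCService.
    static final int DEFAULT_EXPECTED_BLOB_COUNT = 1_000_000;
    static final double DEFAULT_ASSOCIATED_PROBABILITY = 0.8;

    private final int expectedBlobCount;
    private final double associatedProbability;

    private BlobGCTaskParameters(int expectedBlobCount, double associatedProbability) {
        this.expectedBlobCount = expectedBlobCount;
        this.associatedProbability = associatedProbability;
    }

    // Resolves optional user input supplied when the task is created,
    // falling back to the defaults and validating the resulting values.
    static BlobGCTaskParameters of(Optional<Integer> expectedBlobCount, Optional<Double> associatedProbability) {
        int count = expectedBlobCount.orElse(DEFAULT_EXPECTED_BLOB_COUNT);
        double probability = associatedProbability.orElse(DEFAULT_ASSOCIATED_PROBABILITY);
        if (count <= 0) {
            throw new IllegalArgumentException("expectedBlobCount must be strictly positive, got " + count);
        }
        if (!(probability > 0.0 && probability < 1.0)) {
            throw new IllegalArgumentException("associatedProbability must be in (0, 1), got " + probability);
        }
        return new BlobGCTaskParameters(count, probability);
    }

    int expectedBlobCount() {
        return expectedBlobCount;
    }

    double associatedProbability() {
        return associatedProbability;
    }
}
```

With this in place, the service would receive the resolved values as arguments and pass them to `new Context(...)` instead of reading static constants.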






[GitHub] [james-project] vttranlina commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
vttranlina commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r694761099



##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
##########
@@ -0,0 +1,212 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Funnels;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BloomFilterGCAlgorithm {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterGCAlgorithm.class);
+    private static final Funnel<CharSequence> BLOOM_FILTER_FUNNEL = Funnels.stringFunnel(StandardCharsets.US_ASCII);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long referenceSourceCount;
+            private final long blobCount;
+            private final long gcedBlobCount;
+            private final long errorCount;
+            private final long bloomFilterExpectedBlobCount;
+            private final double bloomFilterAssociatedProbability;
+
+            public Snapshot(long referenceSourceCount,
+                            long blobCount,
+                            long gcedBlobCount,
+                            long errorCount,
+                            long bloomFilterExpectedBlobCount,
+                            double bloomFilterAssociatedProbability) {
+                this.referenceSourceCount = referenceSourceCount;
+                this.blobCount = blobCount;
+                this.gcedBlobCount = gcedBlobCount;
+                this.errorCount = errorCount;
+                this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+                this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot that = (Snapshot) o;
+
+                    return Objects.equals(this.referenceSourceCount, that.referenceSourceCount)
+                        && Objects.equals(this.blobCount, that.blobCount)
+                        && Objects.equals(this.gcedBlobCount, that.gcedBlobCount)
+                        && Objects.equals(this.errorCount, that.errorCount)
+                        && Objects.equals(this.bloomFilterExpectedBlobCount, that.bloomFilterExpectedBlobCount)
+                        && Objects.equals(this.bloomFilterAssociatedProbability, that.bloomFilterAssociatedProbability);
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(referenceSourceCount, blobCount, gcedBlobCount, errorCount, bloomFilterExpectedBlobCount, bloomFilterAssociatedProbability);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("referenceSourceCount", referenceSourceCount)
+                    .add("blobCount", blobCount)
+                    .add("gcedBlobCount", gcedBlobCount)
+                    .add("errorCount", errorCount)
+                    .add("bloomFilterExpectedBlobCount", bloomFilterExpectedBlobCount)
+                    .add("bloomFilterAssociatedProbability", bloomFilterAssociatedProbability)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong referenceSourceCount;
+        private final AtomicLong blobCount;
+        private final AtomicLong gcedBlobCount;
+        private final AtomicLong errorCount;
+        private final Long bloomFilterExpectedBlobCount;
+        private final Double bloomFilterAssociatedProbability;
+
+        public Context(long bloomFilterExpectedBlobCount, double bloomFilterAssociatedProbability) {
+            this.referenceSourceCount = new AtomicLong();
+            this.blobCount = new AtomicLong();
+            this.gcedBlobCount = new AtomicLong();
+            this.errorCount = new AtomicLong();
+            this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+            this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+        }
+
+        public void incrementBlobCount() {
+            blobCount.incrementAndGet();
+        }
+
+        public void incrementReferenceSourceCount() {
+            referenceSourceCount.incrementAndGet();
+        }
+
+        public void incrementGCedBlobCount() {
+            gcedBlobCount.incrementAndGet();
+        }
+
+        public void incrementErrorCount() {
+            errorCount.incrementAndGet();
+        }
+
+        Snapshot snapshot() {
+            return new Snapshot(
+                referenceSourceCount.get(),
+                blobCount.get(),
+                gcedBlobCount.get(),
+                errorCount.get(),
+                bloomFilterExpectedBlobCount,
+                bloomFilterAssociatedProbability);
+        }
+    }
+
+    private final BlobReferenceSource referenceSource;
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+    private final Instant now;
+
+    // Avoids two subsequent run to have the same false positives.
+    private final String salt;
+
+    public BloomFilterGCAlgorithm(BlobReferenceSource referenceSource,
+                                  BlobStoreDAO blobStoreDAO,
+                                  GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                                  GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                                  Clock clock) {
+        this.referenceSource = referenceSource;
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.salt = UUID.randomUUID().toString();
+        this.now = clock.instant();
+    }
+
+    public Mono<Result> gc(int expectedBlobCount, double associatedProbability, BucketName bucketName, Context context) {
+        return populatedBloomFilter(expectedBlobCount, associatedProbability, context)
+            .flatMap(bloomFilter -> gc(bloomFilter, bucketName, context));
+    }
+
+    public Mono<Result> gc(BloomFilter<CharSequence> bloomFilter, BucketName bucketName, Context context) {
+        return Flux.from(blobStoreDAO.listBlobs(bucketName))
+            .doOnNext(blobId -> context.incrementBlobCount())
+            .flatMap(blobId -> gcBlob(bloomFilter, blobId, bucketName, context))
+            .reduce(Task::combine);
+    }
+
+    public Mono<BloomFilter<CharSequence>> populatedBloomFilter(int expectedBlobCount, double associatedProbability, Context context) {

Review comment:
       It should be `public` because it is called from `BlobGCService`, which builds the Bloom filter once and reuses it across multiple bucket names.
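Here is a sketch of that rationale. It assumes the filter is built by a costly scan of all reference sources. Populating the filter once and then sweeping every bucket with the same instance keeps that scan to a single pass. `gcAllBuckets` and the `Map`-of-`Set` buckets are hypothetical simplifications of the PR's reactive `BlobStoreDAO` listing. As in the earlier sketch, an exact `Set` stands in for the Bloom filter.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class MultiBucketGcSketch {

    // Builds the reference filter once, then sweeps every bucket with that same filter.
    // Returns the total number of deleted blobs across all buckets.
    static int gcAllBuckets(Supplier<Predicate<String>> buildFilter, Map<String, Set<String>> buckets) {
        Predicate<String> mightBeReferenced = buildFilter.get(); // the costly scan happens here, once
        int deleted = 0;
        for (Set<String> blobIds : buckets.values()) {
            int before = blobIds.size();
            blobIds.removeIf(blobId -> !mightBeReferenced.test(blobId));
            deleted += before - blobIds.size();
        }
        return deleted;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> buckets = new HashMap<>();
        buckets.put("default", new HashSet<>(Set.of("a", "orphan-1")));
        buckets.put("other", new HashSet<>(Set.of("b", "orphan-2")));

        AtomicInteger builds = new AtomicInteger();
        Set<String> references = Set.of("a", "b");
        int deleted = gcAllBuckets(() -> {
            builds.incrementAndGet();
            return references::contains; // exact Set standing in for the Bloom filter
        }, buckets);

        // prints "2 blobs deleted, filter built 1 time(s)"
        System.out.println(deleted + " blobs deleted, filter built " + builds.get() + " time(s)");
    }
}
```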






[GitHub] [james-project] chibenwa commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
chibenwa commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r694751486



##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {
+
+    public static final int EXPECTED_BLOB_COUNT = 1_000_000;
+    public static final double ASSOCIATED_PROBABILITY = 0.8;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobGCService.class);
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+
+    private final MessageBlobReferenceSource messageBlobReferenceSource;
+    private final MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource;
+    private final MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource;
+    private final AttachmentBlobReferenceSource attachmentBlobReferenceSource;
+    private final Clock clock;
+
+    @Inject
+    public BlobGCService(BlobStoreDAO blobStoreDAO,
+                         GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                         GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                         MessageBlobReferenceSource messageBlobReferenceSource,
+                         MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource,
+                         MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource,
+                         AttachmentBlobReferenceSource attachmentBlobReferenceSource,

Review comment:
       Please inject a `Set<BlobReferenceSource>` to avoid hard dependencies on the Cassandra- and RabbitMQ-specific reference sources.
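Below is a minimal sketch of the suggested design. A hypothetical synchronous `ReferenceSource` interface stands in for `BlobReferenceSource`, whose real listing method is reactive. The service only sees a `Set` of sources, so Cassandra- or RabbitMQ-specific implementations can be contributed from their own modules. With Guice, that is typically done with `Multibinder.newSetBinder(binder(), BlobReferenceSource.class).addBinding().to(...)`.

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical, simplified stand-in for BlobReferenceSource (the real one is reactive).
interface ReferenceSource {
    Stream<String> listReferencedBlobIds();
}

final class ReferenceSourceAggregator {
    private final Set<ReferenceSource> sources;

    // With Guice, this Set would be filled by a Multibinder, so this class no longer
    // imports any Cassandra- or RabbitMQ-specific reference source.
    ReferenceSourceAggregator(Set<ReferenceSource> sources) {
        this.sources = Set.copyOf(sources);
    }

    // Concatenates the ids of every registered source. Duplicates are harmless:
    // putting the same id into a Bloom filter twice changes nothing.
    Stream<String> allReferencedBlobIds() {
        return sources.stream().flatMap(ReferenceSource::listReferencedBlobIds);
    }

    public static void main(String[] args) {
        ReferenceSource messages = () -> Stream.of("m1", "m2");
        ReferenceSource attachments = () -> Stream.of("a1", "m1");
        Set<String> ids = new ReferenceSourceAggregator(Set.of(messages, attachments))
            .allReferencedBlobIds()
            .collect(Collectors.toSet());
        System.out.println(ids.size()); // prints 3
    }
}
```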

##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {
+
+    public static final int EXPECTED_BLOB_COUNT = 1_000_000;
+    public static final double ASSOCIATED_PROBABILITY = 0.8;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobGCService.class);
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+
+    private final MessageBlobReferenceSource messageBlobReferenceSource;
+    private final MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource;
+    private final MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource;
+    private final AttachmentBlobReferenceSource attachmentBlobReferenceSource;
+    private final Clock clock;
+
+    @Inject
+    public BlobGCService(BlobStoreDAO blobStoreDAO,

Review comment:
       What for?






[GitHub] [james-project] chibenwa commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
chibenwa commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r694752040



##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {
+
+    public static final int EXPECTED_BLOB_COUNT = 1_000_000;
+    public static final double ASSOCIATED_PROBABILITY = 0.8;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobGCService.class);
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+
+    private final MessageBlobReferenceSource messageBlobReferenceSource;
+    private final MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource;
+    private final MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource;
+    private final AttachmentBlobReferenceSource attachmentBlobReferenceSource;
+    private final Clock clock;
+
+    @Inject
+    public BlobGCService(BlobStoreDAO blobStoreDAO,
+                         GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                         GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                         MessageBlobReferenceSource messageBlobReferenceSource,
+                         MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource,
+                         MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource,
+                         AttachmentBlobReferenceSource attachmentBlobReferenceSource,

Review comment:
       Use `BlobReferenceAggregate` BTW
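
   A minimal sketch of what the suggestion could look like, assuming the aggregate simply concatenates the references of every source so that `BlobGCService` depends on one injected object instead of four. `ReferenceSource` and `ReferenceAggregate` are hypothetical, simplified stand-ins that use a plain `Stream<String>` instead of a reactive `Publisher<BlobId>`; they are not the actual James `BlobReferenceAggregate` API.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BlobReferenceAggregateSketch {

    // Hypothetical simplification of BlobReferenceSource: ids as plain strings,
    // exposed as a Stream rather than a reactive Publisher.
    interface ReferenceSource {
        Stream<String> listReferencedBlobs();
    }

    // Hypothetical aggregate: presents several sources as a single source,
    // so the GC service needs only one constructor parameter for them.
    static final class ReferenceAggregate implements ReferenceSource {
        private final List<ReferenceSource> sources;

        ReferenceAggregate(List<ReferenceSource> sources) {
            this.sources = List.copyOf(sources);
        }

        @Override
        public Stream<String> listReferencedBlobs() {
            // Concatenate the references of every source, in source order.
            // Duplicates are harmless: inserting an id twice into a Bloom
            // filter leaves it unchanged.
            return sources.stream().flatMap(ReferenceSource::listReferencedBlobs);
        }
    }

    public static void main(String[] args) {
        ReferenceSource messages = () -> Stream.of("blob-1", "blob-2");
        ReferenceSource attachments = () -> Stream.of("blob-3");
        ReferenceSource aggregate = new ReferenceAggregate(List.of(messages, attachments));
        System.out.println(aggregate.listReferencedBlobs().collect(Collectors.toList()));
    }
}
```

   With this shape, adding a new reference source means registering one more element in the list rather than changing the `BlobGCService` constructor.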






[GitHub] [james-project] chibenwa commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
chibenwa commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r695379970



##########
File path: server/blob/blob-storage-strategy/pom.xml
##########
@@ -44,6 +48,23 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-memory</artifactId>

Review comment:
      Yes.
   
   Move the GC algorithm test into another module, e.g. the blob-memory one?






[GitHub] [james-project] Arsnael merged pull request #618: JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
Arsnael merged pull request #618:
URL: https://github.com/apache/james-project/pull/618


   




[GitHub] [james-project] vttranlina commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
vttranlina commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r695377529



##########
File path: server/blob/blob-storage-strategy/pom.xml
##########
@@ -44,6 +48,23 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-memory</artifactId>

Review comment:
      I have a cyclic dependency here:
   - blob-memory depends on blob-storage-strategy
   - blob-storage-strategy depends on blob-memory
   
   Should I worry about it?






[GitHub] [james-project] chibenwa commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
chibenwa commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r694903186



##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithm.java
##########
@@ -0,0 +1,212 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Funnels;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BloomFilterGCAlgorithm {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterGCAlgorithm.class);
+    private static final Funnel<CharSequence> BLOOM_FILTER_FUNNEL = Funnels.stringFunnel(StandardCharsets.US_ASCII);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long referenceSourceCount;
+            private final long blobCount;
+            private final long gcedBlobCount;
+            private final long errorCount;
+            private final long bloomFilterExpectedBlobCount;
+            private final double bloomFilterAssociatedProbability;
+
+            public Snapshot(long referenceSourceCount,
+                            long blobCount,
+                            long gcedBlobCount,
+                            long errorCount,
+                            long bloomFilterExpectedBlobCount,
+                            double bloomFilterAssociatedProbability) {
+                this.referenceSourceCount = referenceSourceCount;
+                this.blobCount = blobCount;
+                this.gcedBlobCount = gcedBlobCount;
+                this.errorCount = errorCount;
+                this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+                this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+            }
+
+            @Override
+            public final boolean equals(Object o) {
+                if (o instanceof Snapshot) {
+                    Snapshot that = (Snapshot) o;
+
+                    return Objects.equals(this.referenceSourceCount, that.referenceSourceCount)
+                        && Objects.equals(this.blobCount, that.blobCount)
+                        && Objects.equals(this.gcedBlobCount, that.gcedBlobCount)
+                        && Objects.equals(this.errorCount, that.errorCount)
+                        && Objects.equals(this.bloomFilterExpectedBlobCount, that.bloomFilterExpectedBlobCount)
+                        && Objects.equals(this.bloomFilterAssociatedProbability, that.bloomFilterAssociatedProbability);
+                }
+                return false;
+            }
+
+            @Override
+            public final int hashCode() {
+                return Objects.hash(referenceSourceCount, blobCount, gcedBlobCount, errorCount, bloomFilterExpectedBlobCount, bloomFilterAssociatedProbability);
+            }
+
+            @Override
+            public String toString() {
+                return MoreObjects.toStringHelper(this)
+                    .add("referenceSourceCount", referenceSourceCount)
+                    .add("blobCount", blobCount)
+                    .add("gcedBlobCount", gcedBlobCount)
+                    .add("errorCount", errorCount)
+                    .add("bloomFilterExpectedBlobCount", bloomFilterExpectedBlobCount)
+                    .add("bloomFilterAssociatedProbability", bloomFilterAssociatedProbability)
+                    .toString();
+            }
+        }
+
+        private final AtomicLong referenceSourceCount;
+        private final AtomicLong blobCount;
+        private final AtomicLong gcedBlobCount;
+        private final AtomicLong errorCount;
+        private final Long bloomFilterExpectedBlobCount;
+        private final Double bloomFilterAssociatedProbability;
+
+        public Context(long bloomFilterExpectedBlobCount, double bloomFilterAssociatedProbability) {
+            this.referenceSourceCount = new AtomicLong();
+            this.blobCount = new AtomicLong();
+            this.gcedBlobCount = new AtomicLong();
+            this.errorCount = new AtomicLong();
+            this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+            this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+        }
+
+        public void incrementBlobCount() {
+            blobCount.incrementAndGet();
+        }
+
+        public void incrementReferenceSourceCount() {
+            referenceSourceCount.incrementAndGet();
+        }
+
+        public void incrementGCedBlobCount() {
+            gcedBlobCount.incrementAndGet();
+        }
+
+        public void incrementErrorCount() {
+            errorCount.incrementAndGet();
+        }
+
+        Snapshot snapshot() {
+            return new Snapshot(
+                referenceSourceCount.get(),
+                blobCount.get(),
+                gcedBlobCount.get(),
+                errorCount.get(),
+                bloomFilterExpectedBlobCount,
+                bloomFilterAssociatedProbability);
+        }
+    }
+
+    private final BlobReferenceSource referenceSource;
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+    private final Instant now;
+
+    // Avoids two subsequent runs having the same false positives.
+    private final String salt;
+
+    public BloomFilterGCAlgorithm(BlobReferenceSource referenceSource,
+                                  BlobStoreDAO blobStoreDAO,
+                                  GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                                  GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                                  Clock clock) {
+        this.referenceSource = referenceSource;
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.salt = UUID.randomUUID().toString();
+        this.now = clock.instant();
+    }
+
+    public Mono<Result> gc(int expectedBlobCount, double associatedProbability, BucketName bucketName, Context context) {
+        return populatedBloomFilter(expectedBlobCount, associatedProbability, context)
+            .flatMap(bloomFilter -> gc(bloomFilter, bucketName, context));
+    }
+
+    public Mono<Result> gc(BloomFilter<CharSequence> bloomFilter, BucketName bucketName, Context context) {
+        return Flux.from(blobStoreDAO.listBlobs(bucketName))
+            .doOnNext(blobId -> context.incrementBlobCount())
+            .flatMap(blobId -> gcBlob(bloomFilter, blobId, bucketName, context))
+            .reduce(Task::combine);
+    }
+
+    public Mono<BloomFilter<CharSequence>> populatedBloomFilter(int expectedBlobCount, double associatedProbability, Context context) {

Review comment:
       For now we should just run the GC on the default bucket.
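
   A self-contained sketch of the mark-and-sweep idea behind `BloomFilterGCAlgorithm`, under simplifying assumptions: blobs live in an in-memory `Map` from id to creation time (a single store, which matches running on the default bucket only), a plain age threshold stands in for the `GenerationAwareBlobId` expiry check, and a small hand-rolled Bloom filter replaces Guava's `BloomFilter`. Referenced ids are salted with a per-run random UUID, as the `salt` field in the diff does, and inserted into the filter; every listed blob that is old enough and not reported by `mightContain` is deleted. A Bloom filter can return false positives (an orphan survives this run) but never false negatives, so a referenced blob is never deleted. `SimpleBloomFilter` and `gc` here are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.CRC32;

public class BloomFilterGCSketch {

    // Hypothetical minimal Bloom filter, sized with the textbook formulas
    // m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions.
    static final class SimpleBloomFilter {
        private final BitSet bits;
        private final int bitCount;
        private final int hashCount;

        SimpleBloomFilter(int expectedInsertions, double falsePositiveProbability) {
            double m = -expectedInsertions * Math.log(falsePositiveProbability) / (Math.log(2) * Math.log(2));
            this.bitCount = Math.max(1, (int) Math.ceil(m));
            this.hashCount = Math.max(1, (int) Math.round(m / expectedInsertions * Math.log(2)));
            this.bits = new BitSet(bitCount);
        }

        // Double hashing: derive hashCount bit positions from two base hashes.
        private int[] indexes(String value) {
            CRC32 crc = new CRC32();
            crc.update(value.getBytes(StandardCharsets.UTF_8));
            long h1 = crc.getValue();
            long h2 = value.hashCode() & 0xffffffffL;
            int[] result = new int[hashCount];
            for (int i = 0; i < hashCount; i++) {
                result[i] = (int) Math.floorMod(h1 + i * h2, (long) bitCount);
            }
            return result;
        }

        void put(String value) {
            for (int index : indexes(value)) {
                bits.set(index);
            }
        }

        // false means "definitely not inserted"; true means "probably inserted".
        boolean mightContain(String value) {
            for (int index : indexes(value)) {
                if (!bits.get(index)) {
                    return false;
                }
            }
            return true;
        }
    }

    // Deletes from `store` (blob id -> creation time) every blob older than
    // gracePeriod that is absent from the filter; returns the deleted ids.
    static List<String> gc(Map<String, Instant> store, Iterable<String> referencedIds,
                           Instant now, Duration gracePeriod,
                           int expectedBlobCount, double probability) {
        // Fresh salt per run: an orphan colliding with a referenced id in one
        // run is unlikely to collide again in the next run.
        String salt = UUID.randomUUID().toString();
        SimpleBloomFilter filter = new SimpleBloomFilter(expectedBlobCount, probability);
        referencedIds.forEach(id -> filter.put(salt + id));

        List<String> deleted = new ArrayList<>();
        // Iterate over a copy of the keys so that removal does not break iteration.
        for (String id : new ArrayList<>(store.keySet())) {
            boolean expired = store.get(id).plus(gracePeriod).isBefore(now);
            if (expired && !filter.mightContain(salt + id)) {
                store.remove(id);
                deleted.add(id);
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2015-12-30T16:12:00Z");
        Instant twoMonthsAgo = Instant.parse("2015-10-30T16:12:00Z");
        Map<String, Instant> store = new HashMap<>();
        store.put("referenced", twoMonthsAgo);
        store.put("orphan-old", twoMonthsAgo);
        store.put("orphan-young", now);

        List<String> deleted = gc(store, List.of("referenced"), now, Duration.ofDays(30), 100, 0.01);
        // Usually [orphan-old]; empty only on a (rare) false positive.
        System.out.println("deleted: " + deleted + ", remaining: " + store.keySet());
    }
}
```

   If `associatedProbability` is passed to Guava's `BloomFilter.create(funnel, expectedInsertions, fpp)` as the false-positive probability, which the elided `populatedBloomFilter` body presumably does, a value of 0.8 sizes a very small filter: once it is filled to its expected capacity, an orphan has roughly an 80% chance of surviving a given run, and the per-run salt is what lets later runs reclaim it.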






[GitHub] [james-project] vttranlina commented on pull request #618: JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
vttranlina commented on pull request #618:
URL: https://github.com/apache/james-project/pull/618#issuecomment-906065803


   Fixups squashed.




[GitHub] [james-project] Arsnael commented on pull request #618: JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
Arsnael commented on pull request #618:
URL: https://github.com/apache/james-project/pull/618#issuecomment-906054063


   Good job. Can you squash your fixups?




[GitHub] [james-project] vttranlina commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
vttranlina commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r695359379



##########
File path: server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCService.java
##########
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.apache.james.task.Task.Result;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.mailbox.cassandra.mail.AttachmentBlobReferenceSource;
+import org.apache.james.mailbox.cassandra.mail.MessageBlobReferenceSource;
+import org.apache.james.mailrepository.cassandra.MailRepositoryBlobReferenceSource;
+import org.apache.james.queue.rabbitmq.view.cassandra.MailQueueViewBlobReferenceSource;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.task.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobGCService {
+
+    public static final int EXPECTED_BLOB_COUNT = 1_000_000;
+    public static final double ASSOCIATED_PROBABILITY = 0.8;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BlobGCService.class);
+
+    private final BlobStoreDAO blobStoreDAO;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+
+    private final MessageBlobReferenceSource messageBlobReferenceSource;
+    private final MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource;
+    private final MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource;
+    private final AttachmentBlobReferenceSource attachmentBlobReferenceSource;
+    private final Clock clock;
+
+    @Inject
+    public BlobGCService(BlobStoreDAO blobStoreDAO,
+                         GenerationAwareBlobId.Factory generationAwareBlobIdFactory,
+                         GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                         MessageBlobReferenceSource messageBlobReferenceSource,
+                         MailRepositoryBlobReferenceSource mailRepositoryBlobReferenceSource,
+                         MailQueueViewBlobReferenceSource mailQueueViewBlobReferenceSource,
+                         AttachmentBlobReferenceSource attachmentBlobReferenceSource,
+                         Clock clock) {
+
+        this.blobStoreDAO = blobStoreDAO;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.messageBlobReferenceSource = messageBlobReferenceSource;
+        this.mailRepositoryBlobReferenceSource = mailRepositoryBlobReferenceSource;
+        this.mailQueueViewBlobReferenceSource = mailQueueViewBlobReferenceSource;
+        this.attachmentBlobReferenceSource = attachmentBlobReferenceSource;
+        this.clock = clock;
+    }
+
+    public Mono<Result> gc() {
+        BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+            getBlobReferenceSource(),
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            generationAwareBlobIdConfiguration,
+            clock);
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);

Review comment:
      Should we read these values from `blob.properties`?
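
   If so, a minimal sketch could parse the two values with `java.util.Properties` and fall back to the current constants when a key is missing. The key names `deduplication.gc.expected.blob.count` and `deduplication.gc.associated.probability` and the `GCConfiguration` class are hypothetical; the sketch does not use James' own configuration loading. The range check on the probability assumes the value ends up as a Bloom filter false-positive probability, which must lie strictly between 0 and 1.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class BlobGCConfigurationSketch {

    // Defaults mirror the constants currently hard-coded in BlobGCService.
    static final int DEFAULT_EXPECTED_BLOB_COUNT = 1_000_000;
    static final double DEFAULT_ASSOCIATED_PROBABILITY = 0.8;

    // Hypothetical keys, chosen for illustration only.
    static final String EXPECTED_BLOB_COUNT_KEY = "deduplication.gc.expected.blob.count";
    static final String ASSOCIATED_PROBABILITY_KEY = "deduplication.gc.associated.probability";

    static final class GCConfiguration {
        private final int expectedBlobCount;
        private final double associatedProbability;

        GCConfiguration(int expectedBlobCount, double associatedProbability) {
            // Fail fast when the configuration is loaded rather than when the GC first runs.
            if (expectedBlobCount <= 0) {
                throw new IllegalArgumentException("expected blob count must be positive: " + expectedBlobCount);
            }
            if (!(associatedProbability > 0.0 && associatedProbability < 1.0)) {
                throw new IllegalArgumentException("probability must be in (0, 1): " + associatedProbability);
            }
            this.expectedBlobCount = expectedBlobCount;
            this.associatedProbability = associatedProbability;
        }

        int expectedBlobCount() {
            return expectedBlobCount;
        }

        double associatedProbability() {
            return associatedProbability;
        }
    }

    // Missing keys fall back to the current defaults.
    static GCConfiguration read(Properties properties) {
        int expectedBlobCount = Integer.parseInt(properties.getProperty(
            EXPECTED_BLOB_COUNT_KEY, String.valueOf(DEFAULT_EXPECTED_BLOB_COUNT)).trim());
        double associatedProbability = Double.parseDouble(properties.getProperty(
            ASSOCIATED_PROBABILITY_KEY, String.valueOf(DEFAULT_ASSOCIATED_PROBABILITY)).trim());
        return new GCConfiguration(expectedBlobCount, associatedProbability);
    }

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.load(new StringReader("deduplication.gc.associated.probability=0.01\n"));
        GCConfiguration configuration = read(properties);
        System.out.println(configuration.expectedBlobCount() + " " + configuration.associatedProbability());
    }
}
```

   Keeping the current constants as defaults means existing deployments behave the same until an operator opts in to tuning the filter.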






[GitHub] [james-project] chibenwa commented on a change in pull request #618: [WIP] JAMES-3150 BlobGCService & BloomFilterGCAlgorithm

Posted by GitBox <gi...@apache.org>.
chibenwa commented on a change in pull request #618:
URL: https://github.com/apache/james-project/pull/618#discussion_r695618905



##########
File path: server/blob/blob-storage-strategy/src/test/java/org/apache/james/server/blob/deduplication/BloomFilterGCAlgorithmContract.java
##########
@@ -0,0 +1,230 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context;
+import org.apache.james.server.blob.deduplication.BloomFilterGCAlgorithm.Context.Snapshot;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.task.Task;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface BloomFilterGCAlgorithmContract {
+
+    HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    ZonedDateTime NOW = ZonedDateTime.parse("2015-10-30T16:12:00Z");
+    BucketName DEFAULT_BUCKET = BucketName.of("default");
+    GenerationAwareBlobId.Configuration GENERATION_AWARE_BLOB_ID_CONFIGURATION = GenerationAwareBlobId.Configuration.DEFAULT;
+    int EXPECTED_BLOB_COUNT = 100;
+    double ASSOCIATED_PROBABILITY = 0.8;
+
+    ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await()
+        .atMost(TEN_SECONDS);
+
+    BlobReferenceSource BLOB_REFERENCE_SOURCE = mock(BlobReferenceSource.class);
+    UpdatableTickingClock CLOCK = new UpdatableTickingClock(NOW.toInstant());
+    GenerationAwareBlobId.Factory GENERATION_AWARE_BLOB_ID_FACTORY = new GenerationAwareBlobId.Factory(CLOCK, BLOB_ID_FACTORY, GENERATION_AWARE_BLOB_ID_CONFIGURATION);
+
+    BlobStoreDAO blobStoreDAO();
+
+    @BeforeEach
+    default void setUp() {
+        CLOCK.setInstant(NOW.toInstant());
+    }
+
+    default BlobStore blobStore() {
+        return new DeDuplicationBlobStore(blobStoreDAO(), DEFAULT_BUCKET, GENERATION_AWARE_BLOB_ID_FACTORY);
+    }
+
+    default BloomFilterGCAlgorithm bloomFilterGCAlgorithm() {
+        return new BloomFilterGCAlgorithm(BLOB_REFERENCE_SOURCE,
+            blobStoreDAO(),
+            GENERATION_AWARE_BLOB_ID_FACTORY,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            CLOCK);
+    }
+
+    @RepeatedTest(10)
+    default void gcShouldRemoveOrphanBlob() {
+        BlobStore blobStore = blobStore();
+        BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty());
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.snapshot())
+            .isEqualTo(new Snapshot(0, 1, 1, 0, 100, 0.8));
+        assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId))
+            .isInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void gcShouldNotRemoveUnExpireBlob() {
+        BlobStore blobStore = blobStore();
+        BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty());
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.snapshot())
+            .isEqualTo(new Snapshot(0, 1, 0, 0, 100, 0.8));
+        assertThat(blobStore.read(DEFAULT_BUCKET, blobId))
+            .isNotNull();
+    }
+
+    @RepeatedTest(10)
+    default void gcShouldNotRemoveReferencedBlob() {
+        BlobStore blobStore = blobStore();
+        BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.just(blobId));
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        assertThat(context.snapshot())
+            .isEqualTo(new Snapshot(1, 1, 0, 0, 100, 0.8));
+        assertThat(blobStore.read(DEFAULT_BUCKET, blobId))
+            .isNotNull();
+    }
+
+    @Test
+    default void gcShouldSucceedWhenMixedCase() {
+        BlobStore blobStore = blobStore();
+        List<BlobId> referencedBlobIds = IntStream.range(0, 100)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .collect(Collectors.toList());
+        List<BlobId> orphanBlobIds = IntStream.range(0, 50)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .collect(Collectors.toList());
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = bloomFilterGCAlgorithm();
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.COMPLETED);
+        Context.Snapshot snapshot = context.snapshot();
+
+        assertThat(snapshot.getReferenceSourceCount())
+            .isEqualTo(referencedBlobIds.size());
+        assertThat(snapshot.getBlobCount())
+            .isEqualTo(referencedBlobIds.size() + orphanBlobIds.size());
+
+        assertThat(snapshot.getGcedBlobCount())
+            .isLessThanOrEqualTo(orphanBlobIds.size())
+            .isGreaterThan(0);
+
+        referencedBlobIds.forEach(blobId ->
+            assertThat(blobStore.read(DEFAULT_BUCKET, blobId))
+                .isNotNull());
+    }
+
+    @Test
+    default void allOrphanBlobIdsShouldBeRemovedAfterMultipleGCRuns() {
+        BlobStore blobStore = blobStore();
+        List<BlobId> referencedBlobIds = IntStream.range(0, 100)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .collect(Collectors.toList());
+        List<BlobId> orphanBlobIds = IntStream.range(0, 50)
+            .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .collect(Collectors.toList());
+
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        CALMLY_AWAIT.untilAsserted(() -> {
+            Mono.from(bloomFilterGCAlgorithm().gc(
+                    EXPECTED_BLOB_COUNT,
+                    ASSOCIATED_PROBABILITY,
+                    DEFAULT_BUCKET,
+                    new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY)))
+                .block();
+
+            orphanBlobIds.forEach(blobId ->
+                assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId))
+                    .isInstanceOf(ObjectNotFoundException.class));
+        });
+    }
+
+    @Test
+    default void gcShouldHandleErrorWhenException() {
+        when(BLOB_REFERENCE_SOURCE.listReferencedBlobs()).thenReturn(Flux.empty());
+        BlobStoreDAO blobStoreDAO = mock(BlobStoreDAO.class);
+        BlobId blobId = GENERATION_AWARE_BLOB_ID_FACTORY.randomId();
+        when(blobStoreDAO.listBlobs(DEFAULT_BUCKET)).thenReturn(Flux.just(blobId));
+        when(blobStoreDAO.delete(DEFAULT_BUCKET, blobId)).thenThrow(new RuntimeException("test"));
+
+        CLOCK.setInstant(NOW.plusMonths(2).toInstant());
+
+        Context context = new Context(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY);
+        BloomFilterGCAlgorithm bloomFilterGCAlgorithm = new BloomFilterGCAlgorithm(
+            BLOB_REFERENCE_SOURCE,
+            blobStoreDAO,
+            GENERATION_AWARE_BLOB_ID_FACTORY,
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            CLOCK);
+        Task.Result result = Mono.from(bloomFilterGCAlgorithm.gc(EXPECTED_BLOB_COUNT, ASSOCIATED_PROBABILITY, DEFAULT_BUCKET, context)).block();
+
+        assertThat(result).isEqualTo(Task.Result.PARTIAL);
+        assertThat(context.snapshot())
+            .isEqualTo(new Snapshot(0, 1, 0, 1, 100, 0.8));

Review comment:
       We could have a builder for Snapshot to make this more readable :-)
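A minimal sketch of what such a builder could look like. It assumes that Snapshot holds six values in the positional order the tests use: reference source count, blob count, GCed blob count, error count, Bloom filter expected blob count, and Bloom filter associated probability. Only `getReferenceSourceCount`, `getBlobCount` and `getGcedBlobCount` appear in the tests. The remaining field names, the `long` types and the zero defaults are hypothetical.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for Context.Snapshot from the PR. Field names
// after gcedBlobCount, the long types, and the zero defaults are assumptions.
final class Snapshot {
    private final long referenceSourceCount;
    private final long blobCount;
    private final long gcedBlobCount;
    private final long errorCount;
    private final long bloomFilterExpectedBlobCount;
    private final double bloomFilterAssociatedProbability;

    // Same positional order as the tests, e.g. new Snapshot(0, 1, 1, 0, 100, 0.8)
    Snapshot(long referenceSourceCount, long blobCount, long gcedBlobCount, long errorCount,
             long bloomFilterExpectedBlobCount, double bloomFilterAssociatedProbability) {
        this.referenceSourceCount = referenceSourceCount;
        this.blobCount = blobCount;
        this.gcedBlobCount = gcedBlobCount;
        this.errorCount = errorCount;
        this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
        this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
    }

    static Builder builder() {
        return new Builder();
    }

    // Every field starts at zero, so a test only names the values it cares about.
    static final class Builder {
        private long referenceSourceCount;
        private long blobCount;
        private long gcedBlobCount;
        private long errorCount;
        private long bloomFilterExpectedBlobCount;
        private double bloomFilterAssociatedProbability;

        Builder referenceSourceCount(long value) { referenceSourceCount = value; return this; }
        Builder blobCount(long value) { blobCount = value; return this; }
        Builder gcedBlobCount(long value) { gcedBlobCount = value; return this; }
        Builder errorCount(long value) { errorCount = value; return this; }
        Builder bloomFilterExpectedBlobCount(long value) { bloomFilterExpectedBlobCount = value; return this; }
        Builder bloomFilterAssociatedProbability(double value) { bloomFilterAssociatedProbability = value; return this; }

        Snapshot build() {
            return new Snapshot(referenceSourceCount, blobCount, gcedBlobCount, errorCount,
                bloomFilterExpectedBlobCount, bloomFilterAssociatedProbability);
        }
    }

    long getReferenceSourceCount() { return referenceSourceCount; }
    long getBlobCount() { return blobCount; }
    long getGcedBlobCount() { return gcedBlobCount; }
    long getErrorCount() { return errorCount; }

    // Value equality, so AssertJ's isEqualTo compares snapshots field by field.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Snapshot)) {
            return false;
        }
        Snapshot that = (Snapshot) o;
        return referenceSourceCount == that.referenceSourceCount
            && blobCount == that.blobCount
            && gcedBlobCount == that.gcedBlobCount
            && errorCount == that.errorCount
            && bloomFilterExpectedBlobCount == that.bloomFilterExpectedBlobCount
            && Double.compare(bloomFilterAssociatedProbability, that.bloomFilterAssociatedProbability) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(referenceSourceCount, blobCount, gcedBlobCount, errorCount,
            bloomFilterExpectedBlobCount, bloomFilterAssociatedProbability);
    }

    // Named fields make assertion failure messages readable too.
    @Override
    public String toString() {
        return "Snapshot{referenceSourceCount=" + referenceSourceCount
            + ", blobCount=" + blobCount
            + ", gcedBlobCount=" + gcedBlobCount
            + ", errorCount=" + errorCount
            + ", bloomFilterExpectedBlobCount=" + bloomFilterExpectedBlobCount
            + ", bloomFilterAssociatedProbability=" + bloomFilterAssociatedProbability + "}";
    }
}
```

With this in place, `new Snapshot(0, 1, 1, 0, 100, 0.8)` could read as `Snapshot.builder().blobCount(1).gcedBlobCount(1).bloomFilterExpectedBlobCount(EXPECTED_BLOB_COUNT).bloomFilterAssociatedProbability(ASSOCIATED_PROBABILITY).build()`. Each expectation is then named instead of depending on argument position. A small helper in the test interface could also pre-fill the two Bloom filter parameters.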



