You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/08/31 03:03:38 UTC

[james-project] 03/03: JAMES-3150 Expose the webAdmin endpoint to trigger blob deduplicated garbage collection

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 16d3a0e812b69d88d5190b5f7bf5b1392c8f90d9
Author: Tung Van TRAN <vt...@linagora.com>
AuthorDate: Thu Aug 26 17:46:06 2021 +0700

    JAMES-3150 Expose the webAdmin endpoint to trigger blob deduplicated garbage collection
---
 server/blob/blob-storage-strategy/pom.xml          |   4 +
 .../server/blob/deduplication/BlobGCTask.java      |  74 +---
 .../BlobGCTaskAdditionalInformationDTO.java        | 119 ++++++
 server/protocols/webadmin/webadmin-data/pom.xml    |  14 +
 .../apache/james/webadmin/routes/BlobRoutes.java   | 168 ++++++++
 .../james/webadmin/routes/BlobRoutesTest.java      | 430 +++++++++++++++++++++
 6 files changed, 753 insertions(+), 56 deletions(-)

diff --git a/server/blob/blob-storage-strategy/pom.xml b/server/blob/blob-storage-strategy/pom.xml
index af1daa1..ceb5a5b 100644
--- a/server/blob/blob-storage-strategy/pom.xml
+++ b/server/blob/blob-storage-strategy/pom.xml
@@ -49,6 +49,10 @@
             <artifactId>james-server-task-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-task-json</artifactId>
+        </dependency>
+        <dependency>
             <!-- Added because of https://issues.apache.org/jira/browse/SUREFIRE-1266 -->
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-testing</artifactId>
diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
index 7ecb0a4..2177c62 100644
--- a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
+++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
@@ -21,7 +21,6 @@ package org.apache.james.server.blob.deduplication;
 
 import java.time.Clock;
 import java.time.Instant;
-import java.util.Arrays;
 import java.util.Optional;
 import java.util.Set;
 
@@ -35,8 +34,6 @@ import org.apache.james.task.TaskType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 import reactor.core.scheduler.Schedulers;
 
 public class BlobGCTask implements Task {
@@ -45,7 +42,7 @@ public class BlobGCTask implements Task {
 
     public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
 
-        private static AdditionalInformation from(Scope scope, Context context) {
+        private static AdditionalInformation from(Context context) {
             Context.Snapshot snapshot = context.snapshot();
             return new AdditionalInformation(
                 snapshot.getReferenceSourceCount(),
@@ -53,8 +50,7 @@ public class BlobGCTask implements Task {
                 snapshot.getGcedBlobCount(),
                 snapshot.getErrorCount(),
                 snapshot.getBloomFilterExpectedBlobCount(),
-                snapshot.getBloomFilterAssociatedProbability(),
-                scope);
+                snapshot.getBloomFilterAssociatedProbability());
         }
 
         private final Instant timestamp;
@@ -64,15 +60,13 @@ public class BlobGCTask implements Task {
         private final long errorCount;
         private final long bloomFilterExpectedBlobCount;
         private final double bloomFilterAssociatedProbability;
-        private final Scope scope;
 
         AdditionalInformation(long referenceSourceCount,
                               long blobCount,
                               long gcedBlobCount,
                               long errorCount,
                               long bloomFilterExpectedBlobCount,
-                              double bloomFilterAssociatedProbability,
-                              Scope scope) {
+                              double bloomFilterAssociatedProbability) {
             this.referenceSourceCount = referenceSourceCount;
             this.blobCount = blobCount;
             this.gcedBlobCount = gcedBlobCount;
@@ -80,7 +74,6 @@ public class BlobGCTask implements Task {
             this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
             this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
             this.timestamp = Clock.systemUTC().instant();
-            this.scope = scope;
         }
 
         @Override
@@ -115,36 +108,13 @@ public class BlobGCTask implements Task {
         public double getBloomFilterAssociatedProbability() {
             return bloomFilterAssociatedProbability;
         }
-
-        public Scope getScope() {
-            return scope;
-        }
-    }
-
-    public enum Scope {
-        UNREFERENCED;
-
-        static class ScopeInvalidException extends IllegalArgumentException {
-        }
-
-        public static Optional<Scope> from(String name) {
-            Preconditions.checkNotNull(name);
-            return Arrays.stream(Scope.values())
-                .filter(value -> name.equalsIgnoreCase(value.name()))
-                .findFirst();
-        }
     }
 
     interface Builder {
 
         @FunctionalInterface
-        interface RequireScope {
-            BlobGCTask scope(Scope scope);
-        }
-
-        @FunctionalInterface
         interface RequireAssociatedProbability {
-            RequireScope associatedProbability(double associatedProbability);
+            BlobGCTask associatedProbability(double associatedProbability);
         }
 
         @FunctionalInterface
@@ -186,7 +156,7 @@ public class BlobGCTask implements Task {
     public static Builder.RequireBlobStoreDAO builder() {
         return blobStoreDao -> generationAwareBlobIdFactory -> generationAwareBlobIdConfiguration
             -> blobReferenceSources -> bucketName -> clock -> expectedBlobCount
-            -> associatedProbability -> scope
+            -> associatedProbability
             -> new BlobGCTask(
             blobStoreDao,
             generationAwareBlobIdFactory,
@@ -195,8 +165,7 @@ public class BlobGCTask implements Task {
             bucketName,
             clock,
             expectedBlobCount,
-            associatedProbability,
-            scope);
+            associatedProbability);
     }
 
 
@@ -209,7 +178,6 @@ public class BlobGCTask implements Task {
     private final int expectedBlobCount;
     private final double associatedProbability;
     private final Context context;
-    private final Scope scope;
 
 
     public BlobGCTask(BlobStoreDAO blobStoreDAO,
@@ -219,8 +187,7 @@ public class BlobGCTask implements Task {
                       BucketName bucketName,
                       Clock clock,
                       int expectedBlobCount,
-                      double associatedProbability,
-                      Scope scope) {
+                      double associatedProbability) {
         this.blobStoreDAO = blobStoreDAO;
         this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
         this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
@@ -230,25 +197,20 @@ public class BlobGCTask implements Task {
         this.expectedBlobCount = expectedBlobCount;
         this.associatedProbability = associatedProbability;
         this.context = new Context(expectedBlobCount, associatedProbability);
-        this.scope = scope;
     }
 
     @Override
     public Result run() throws InterruptedException {
-        if (Scope.UNREFERENCED.equals(this.scope)) {
-            BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
-                BlobReferenceAggregate.aggregate(blobReferenceSources),
-                blobStoreDAO,
-                generationAwareBlobIdFactory,
-                generationAwareBlobIdConfiguration,
-                clock);
-
-            return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context)
-                .subscribeOn(Schedulers.elastic())
-                .block();
-        } else {
-            return Result.COMPLETED;
-        }
+        BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+            BlobReferenceAggregate.aggregate(blobReferenceSources),
+            blobStoreDAO,
+            generationAwareBlobIdFactory,
+            generationAwareBlobIdConfiguration,
+            clock);
+
+        return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context)
+            .subscribeOn(Schedulers.elastic())
+            .block();
     }
 
     @Override
@@ -258,6 +220,6 @@ public class BlobGCTask implements Task {
 
     @Override
     public Optional<TaskExecutionDetails.AdditionalInformation> details() {
-        return Optional.of(AdditionalInformation.from(scope, context));
+        return Optional.of(AdditionalInformation.from(context));
     }
 }
diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTaskAdditionalInformationDTO.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000..f9da97a
--- /dev/null
+++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTaskAdditionalInformationDTO.java
@@ -0,0 +1,119 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class BlobGCTaskAdditionalInformationDTO implements AdditionalInformationDTO { // DTO counterpart of BlobGCTask.AdditionalInformation, bound to JSON properties via @JsonProperty
+
+    public static final AdditionalInformationDTOModule<BlobGCTask.AdditionalInformation, BlobGCTaskAdditionalInformationDTO> SERIALIZATION_MODULE =
+        DTOModule.forDomainObject(BlobGCTask.AdditionalInformation.class)
+            .convertToDTO(BlobGCTaskAdditionalInformationDTO.class)
+            .toDomainObjectConverter(dto -> // NOTE(review): dto.timestamp is not forwarded; the domain constructor assigns Clock.systemUTC().instant() instead - confirm this is intended
+                new BlobGCTask.AdditionalInformation(
+                    dto.referenceSourceCount,
+                    dto.blobCount,
+                    dto.gcedBlobCount,
+                    dto.errorCount,
+                    dto.bloomFilterExpectedBlobCount,
+                    dto.bloomFilterAssociatedProbability
+                ))
+            .toDTOConverter((domain, type) -> // copies the type name plus every domain getter, timestamp included, into the DTO
+                new BlobGCTaskAdditionalInformationDTO(
+                    type,
+                    domain.getTimestamp(),
+                    domain.getReferenceSourceCount(),
+                    domain.getBlobCount(),
+                    domain.getGcedBlobCount(),
+                    domain.getErrorCount(),
+                    domain.getBloomFilterExpectedBlobCount(),
+                    domain.getBloomFilterAssociatedProbability()
+                ))
+            .typeName(BlobGCTask.TASK_TYPE.asString()) // the DTO type name is the task type of BlobGCTask
+            .withFactory(AdditionalInformationDTOModule::new);
+
+    private final String type;
+    private final Instant timestamp;
+    private final long referenceSourceCount;
+    private final long blobCount;
+    private final long gcedBlobCount;
+    private final long errorCount;
+    private final long bloomFilterExpectedBlobCount;
+    private final double bloomFilterAssociatedProbability;
+
+    public BlobGCTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+                                              @JsonProperty("timestamp") Instant timestamp,
+                                              @JsonProperty("referenceSourceCount") long referenceSourceCount,
+                                              @JsonProperty("blobCount") long blobCount,
+                                              @JsonProperty("gcedBlobCount") long gcedBlobCount,
+                                              @JsonProperty("errorCount") long errorCount,
+                                              @JsonProperty("bloomFilterExpectedBlobCount") long bloomFilterExpectedBlobCount,
+                                              @JsonProperty("bloomFilterAssociatedProbability") double bloomFilterAssociatedProbability) {
+        this.type = type;
+        this.timestamp = timestamp;
+        this.referenceSourceCount = referenceSourceCount;
+        this.blobCount = blobCount;
+        this.gcedBlobCount = gcedBlobCount;
+        this.errorCount = errorCount;
+        this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+        this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+    }
+
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public Instant getTimestamp() {
+        return timestamp;
+    }
+
+    public long getReferenceSourceCount() {
+        return referenceSourceCount;
+    }
+
+    public long getBlobCount() {
+        return blobCount;
+    }
+
+    public long getGcedBlobCount() {
+        return gcedBlobCount;
+    }
+
+    public long getErrorCount() {
+        return errorCount;
+    }
+
+    public long getBloomFilterExpectedBlobCount() {
+        return bloomFilterExpectedBlobCount;
+    }
+
+    public double getBloomFilterAssociatedProbability() {
+        return bloomFilterAssociatedProbability;
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-data/pom.xml b/server/protocols/webadmin/webadmin-data/pom.xml
index d9ce9c9..2aee4cb 100644
--- a/server/protocols/webadmin/webadmin-data/pom.xml
+++ b/server/protocols/webadmin/webadmin-data/pom.xml
@@ -34,6 +34,15 @@
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>blob-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-storage-strategy</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>event-sourcing-event-store-memory</artifactId>
             <scope>test</scope>
         </dependency>
@@ -67,6 +76,11 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-task-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-testing</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/BlobRoutes.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/BlobRoutes.java
new file mode 100644
index 0000000..346ca39
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/BlobRoutes.java
@@ -0,0 +1,168 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import java.time.Clock;
+import java.util.Optional;
+import java.util.Set;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.server.blob.deduplication.BlobGCTask;
+import org.apache.james.server.blob.deduplication.GenerationAwareBlobId;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskManager;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.tasks.TaskFromRequest;
+import org.apache.james.webadmin.tasks.TaskIdDto;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+
+import com.google.common.base.Preconditions;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import spark.Request;
+import spark.Service;
+
+@Api(tags = "Blobs")
+@Path("/blobs")
+@Produces("application/json")
+public class BlobRoutes implements Routes { // webadmin routes: DELETE on BASE_PATH builds a BlobGCTask and hands it to the TaskManager
+
+    public static final String BASE_PATH = "/blobs";
+    public static final int EXPECTED_BLOB_COUNT_DEFAULT = 1_000_000; // applied when the 'expectedBlobCount' query parameter is absent
+    public static final double ASSOCIATED_PROBABILITY_DEFAULT = 0.8; // applied when the 'associatedProbability' query parameter is absent
+
+    private final TaskManager taskManager;
+    private final JsonTransformer jsonTransformer;
+    private final Clock clock;
+    private final BlobStoreDAO blobStoreDAO;
+    private final BucketName bucketName;
+    private final Set<BlobReferenceSource> blobReferenceSources;
+    private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+    private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+
+    @Inject
+    public BlobRoutes(TaskManager taskManager,
+                      JsonTransformer jsonTransformer,
+                      Clock clock,
+                      BlobStoreDAO blobStoreDAO,
+                      @Named(BlobStore.DEFAULT_BUCKET_NAME_QUALIFIER) BucketName defaultBucketName,
+                      Set<BlobReferenceSource> blobReferenceSources,
+                      GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+                      GenerationAwareBlobId.Factory generationAwareBlobIdFactory) {
+        this.taskManager = taskManager;
+        this.jsonTransformer = jsonTransformer;
+        this.clock = clock;
+        this.blobStoreDAO = blobStoreDAO;
+        this.bucketName = defaultBucketName;
+        this.blobReferenceSources = blobReferenceSources;
+        this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+        this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+    }
+
+    @Override
+    public String getBasePath() {
+        return BASE_PATH;
+    }
+
+    @Override
+    public void define(Service service) {
+        TaskFromRequest gcUnreferencedTaskRequest = this::gcUnreferenced; // the method reference adapts gcUnreferenced to the TaskFromRequest interface
+        service.delete(BASE_PATH, gcUnreferencedTaskRequest.asRoute(taskManager), jsonTransformer);
+    }
+
+    @DELETE
+    @Path("/blobs") // NOTE(review): the class-level @Path is already "/blobs"; confirm the generated documentation does not show "/blobs/blobs"
+    @ApiOperation(value = "Create a task to run blob deduplicate garbage collection", nickname = "BlobGC")
+    @ApiImplicitParams({
+        @ApiImplicitParam(required = true, dataType = "string", name = "scope", paramType = "query", example = "scope=unreferenced"),
+        @ApiImplicitParam(required = false, dataType = "double", name = "associatedProbability", paramType = "query",
+            defaultValue = "0.8", example = "associatedProbability=0.7"),
+        @ApiImplicitParam(required = false, dataType = "integer", name = "expectedBlobCount", paramType = "query",
+            defaultValue = "1000000", example = "expectedBlobCount=1000")
+    })
+    @ApiResponses(
+        {
+            @ApiResponse(code = HttpStatus.CREATED_201, message = "The taskId of the given scheduled task", response = TaskIdDto.class),
+            @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid arguments supplied in the user request"),
+            @ApiResponse(code = HttpStatus.UNAUTHORIZED_401, message = "Unauthorized. The user is not authenticated on the platform"),
+        })
+    public Task gcUnreferenced(Request request) {
+        Preconditions.checkArgument(Optional.ofNullable(request.queryParams("scope")) // only the exact, case-sensitive value 'unreferenced' is accepted
+            .filter("unreferenced"::equals)
+            .isPresent(),
+            "'scope' is missing or must be 'unreferenced'");
+
+        int expectedBlobCount = getExpectedBlobCount(request).orElse(EXPECTED_BLOB_COUNT_DEFAULT);
+        double associatedProbability = getAssociatedProbability(request).orElse(ASSOCIATED_PROBABILITY_DEFAULT);
+
+        return BlobGCTask.builder()
+            .blobStoreDAO(blobStoreDAO)
+            .generationAwareBlobIdFactory(generationAwareBlobIdFactory)
+            .generationAwareBlobIdConfiguration(generationAwareBlobIdConfiguration)
+            .blobReferenceSource(blobReferenceSources)
+            .bucketName(bucketName)
+            .clock(clock)
+            .expectedBlobCount(expectedBlobCount)
+            .associatedProbability(associatedProbability);
+    }
+
+    private static Optional<Integer> getExpectedBlobCount(Request req) {
+        try { // Optional.map applies its function immediately, so a parse failure is raised inside this try
+            return Optional.ofNullable(req.queryParams("expectedBlobCount"))
+                .map(Integer::parseInt)
+                .map(expectedBlobCount -> {
+                    Preconditions.checkArgument(expectedBlobCount > 0,
+                        "'expectedBlobCount' must be strictly positive");
+                    return expectedBlobCount;
+                });
+        } catch (NumberFormatException ex) {
+            throw new IllegalArgumentException("'expectedBlobCount' must be numeric", ex); // keep the parse failure as the cause
+        }
+    }
+
+    private static Optional<Double> getAssociatedProbability(Request req) {
+        try { // Optional.map applies its function immediately, so a parse failure is raised inside this try
+            return Optional.ofNullable(req.queryParams("associatedProbability"))
+                .map(Double::parseDouble)
+                .map(associatedProbability -> {
+                    Preconditions.checkArgument(associatedProbability > 0 && associatedProbability < 1, // both bounds are excluded
+                        "'associatedProbability' must be greater than 0.0 and smaller than 1.0");
+                    return associatedProbability;
+                });
+        } catch (NumberFormatException ex) {
+            throw new IllegalArgumentException("'associatedProbability' must be numeric", ex); // keep the parse failure as the cause
+        }
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/BlobRoutesTest.java b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/BlobRoutesTest.java
new file mode 100644
index 0000000..5b64236
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/BlobRoutesTest.java
@@ -0,0 +1,430 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import static io.restassured.RestAssured.given;
+import static io.restassured.http.ContentType.JSON;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.eclipse.jetty.http.HttpStatus.BAD_REQUEST_400;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.json.DTOConverter;
+import org.apache.james.server.blob.deduplication.BlobGCTaskAdditionalInformationDTO;
+import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
+import org.apache.james.server.blob.deduplication.GenerationAwareBlobId;
+import org.apache.james.task.Hostname;
+import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.apache.james.webadmin.WebAdminServer;
+import org.apache.james.webadmin.WebAdminUtils;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.eclipse.jetty.http.HttpStatus;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import com.google.common.collect.ImmutableSet;
+
+import io.restassured.RestAssured;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobRoutesTest {
+    // Base path set on the shared RestAssured request specification in setUp().
+    private static final String BASE_PATH = "/blobs";
+    // Plain blob id factory handed to the generation-aware factory built in setUp().
+    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    // Instant the test clock starts at; some tests move the clock forward relative to it.
+    private static final ZonedDateTime TIMESTAMP = ZonedDateTime.parse("2015-10-30T16:12:00Z");
+    // Bucket used both by the blob store and by the routes under test.
+    private static final BucketName DEFAULT_BUCKET = BucketName.of("default");
+    // Generation settings shared by the blob id factory and the routes under test.
+    private static final GenerationAwareBlobId.Configuration GENERATION_AWARE_BLOB_ID_CONFIGURATION = GenerationAwareBlobId.Configuration.DEFAULT;
+    // Polls every 100 ms, after an initial 100 ms delay, and gives up after 10 seconds.
+    private static final ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await()
+        .atMost(TEN_SECONDS);
+
+    // All of the following are re-created for every test by setUp().
+    private WebAdminServer webAdminServer;
+    private MemoryTaskManager taskManager;
+    private UpdatableTickingClock clock;
+    // Mockito mock; individual tests re-stub listReferencedBlobs() as needed.
+    private BlobReferenceSource blobReferenceSource;
+    private BlobStore blobStore;
+
+    /**
+     * Builds an in-memory deduplicating blob store, a mocked reference source that lists no blob,
+     * and a webadmin server exposing both the blob routes and the task routes.
+     */
+    @BeforeEach
+    void setUp() {
+        clock = new UpdatableTickingClock(TIMESTAMP.toInstant());
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
+
+        // By default nothing is referenced; tests re-stub this when they need references.
+        blobReferenceSource = mock(BlobReferenceSource.class);
+        when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.empty());
+
+        GenerationAwareBlobId.Factory blobIdFactory =
+            new GenerationAwareBlobId.Factory(clock, BLOB_ID_FACTORY, GENERATION_AWARE_BLOB_ID_CONFIGURATION);
+        BlobStoreDAO dao = new MemoryBlobStoreDAO();
+        blobStore = new DeDuplicationBlobStore(dao, DEFAULT_BUCKET, blobIdFactory);
+
+        JsonTransformer transformer = new JsonTransformer();
+        TasksRoutes tasksRoutes = new TasksRoutes(
+            taskManager,
+            transformer,
+            DTOConverter.of(BlobGCTaskAdditionalInformationDTO.SERIALIZATION_MODULE));
+        BlobRoutes blobRoutes = new BlobRoutes(
+            taskManager, transformer, clock, dao, DEFAULT_BUCKET,
+            ImmutableSet.of(blobReferenceSource),
+            GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+            blobIdFactory);
+
+        webAdminServer = WebAdminUtils.createWebAdminServer(blobRoutes, tasksRoutes).start();
+
+        // Every request in this class is issued relative to BASE_PATH unless overridden.
+        RestAssured.requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminServer)
+            .setBasePath(BASE_PATH)
+            .build();
+    }
+
+    /**
+     * Tears down what {@code setUp()} started: the webadmin server first, then the task manager.
+     */
+    @AfterEach
+    void stop() {
+        webAdminServer.destroy();
+        taskManager.stop();
+    }
+
+    /**
+     * An unknown {@code scope} value must be answered with a 400 "InvalidArgument" JSON error.
+     */
+    @Test
+    void deleteUnReferencedShouldReturnErrorWhenScopeInvalid() {
+        String expectedDetails = "'scope' is missing or must be 'unreferenced'";
+
+        given()
+            .queryParam("scope", "invalid")
+        .when()
+            .delete()
+        .then()
+            .statusCode(BAD_REQUEST_400)
+            .contentType(JSON)
+            .body("statusCode", is(BAD_REQUEST_400),
+                "type", is("InvalidArgument"),
+                "message", is("Invalid arguments supplied in the user request"),
+                "details", is(expectedDetails));
+    }
+
+    /**
+     * Omitting {@code scope} altogether must be answered with a 400 "InvalidArgument" JSON error.
+     */
+    @Test
+    void deleteUnReferencedShouldReturnErrorWhenMissingScope() {
+        String expectedDetails = "'scope' is missing or must be 'unreferenced'";
+
+        given()
+        .when()
+            .delete()
+        .then()
+            .statusCode(BAD_REQUEST_400)
+            .contentType(JSON)
+            .body("statusCode", is(BAD_REQUEST_400),
+                "type", is("InvalidArgument"),
+                "message", is("Invalid arguments supplied in the user request"),
+                "details", is(expectedDetails));
+    }
+
+    /**
+     * A valid request must be answered with 201 and a non-null task identifier.
+     */
+    @Test
+    void deleteUnReferencedShouldReturnTaskId() {
+        given()
+            .queryParam("scope", "unreferenced")
+        .when()
+            .delete()
+        .then()
+            .statusCode(HttpStatus.CREATED_201)
+            .body("taskId", notNullValue());
+    }
+
+    /**
+     * Triggers a GC without tuning parameters on a store holding no blob and checks every field
+     * of the resulting task report.
+     */
+    @Test
+    void gcTaskShouldReturnDetail() {
+        String gcTaskId = given()
+            .queryParam("scope", "unreferenced")
+        .when()
+            .delete()
+            .jsonPath()
+            .getString("taskId");
+        String awaitPath = gcTaskId + "/await";
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(awaitPath)
+        .then()
+            // Generic task envelope.
+            .body("status", is("completed"),
+                "taskId", is(notNullValue()),
+                "type", is("BlobGCTask"),
+                "startedDate", is(notNullValue()),
+                "submitDate", is(notNullValue()),
+                "completedDate", is(notNullValue()))
+            // GC specific report.
+            .body("additionalInformation.type", is("BlobGCTask"),
+                "additionalInformation.timestamp", is(notNullValue()),
+                "additionalInformation.referenceSourceCount", is(0),
+                "additionalInformation.blobCount", is(0),
+                "additionalInformation.gcedBlobCount", is(0),
+                "additionalInformation.errorCount", is(0),
+                "additionalInformation.bloomFilterExpectedBlobCount", is(1_000_000),
+                "additionalInformation.bloomFilterAssociatedProbability", is(0.8F));
+    }
+
+    /**
+     * The {@code expectedBlobCount} query parameter must be reflected in the task report.
+     */
+    @Test
+    void deleteUnReferencedShouldAcceptBloomFilterExpectedBlobCountParam() {
+        String gcTaskId = given()
+            .queryParam("scope", "unreferenced")
+            .queryParam("expectedBlobCount", 99)
+        .when()
+            .delete()
+            .jsonPath()
+            .getString("taskId");
+        String awaitPath = gcTaskId + "/await";
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(awaitPath)
+        .then()
+            .body("additionalInformation.bloomFilterExpectedBlobCount", is(99));
+    }
+
+    /**
+     * Each value supplied by {@code expectedBlobCountParameters()} must be rejected with a 400
+     * whose details mention the offending parameter.
+     */
+    @ParameterizedTest
+    @MethodSource("expectedBlobCountParameters")
+    void deleteUnReferencedShouldReturnErrorWhenExpectedBlobCountInvalid(Object expectedBlobCount) {
+        given()
+            .queryParam("scope", "unreferenced")
+            .queryParam("expectedBlobCount", expectedBlobCount)
+        .when()
+            .delete()
+        .then()
+            .statusCode(BAD_REQUEST_400)
+            .contentType(JSON)
+            .body("statusCode", is(BAD_REQUEST_400),
+                "type", is("InvalidArgument"),
+                "message", is("Invalid arguments supplied in the user request"),
+                "details", containsString("expectedBlobCount"));
+    }
+
+    /**
+     * Values of {@code expectedBlobCount} the endpoint is expected to reject: non-positive
+     * integers, plus inputs {@link Integer#parseInt(String)} cannot parse (a decimal number,
+     * free text and the empty string, mirroring the cases covered for
+     * {@code associatedProbability}).
+     */
+    private static Stream<Arguments> expectedBlobCountParameters() {
+        return Stream.of(
+            Arguments.of(-1),
+            Arguments.of(0),
+            Arguments.of(1.5),
+            Arguments.of("invalid"),
+            Arguments.of("")
+        );
+    }
+
+    /**
+     * The {@code associatedProbability} query parameter must be reflected in the task report.
+     */
+    @Test
+    void deleteUnReferencedShouldAcceptBloomFilterAssociatedProbabilityParam() {
+        String gcTaskId = given()
+            .queryParam("scope", "unreferenced")
+            .queryParam("associatedProbability", 0.2)
+        .when()
+            .delete()
+            .jsonPath()
+            .getString("taskId");
+        String awaitPath = gcTaskId + "/await";
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(awaitPath)
+        .then()
+            .body("additionalInformation.bloomFilterAssociatedProbability", is(0.2F));
+    }
+
+    /**
+     * Each value supplied by {@code associatedProbabilityParameters()} must be rejected with a
+     * 400 whose details mention the offending parameter.
+     */
+    @ParameterizedTest
+    @MethodSource("associatedProbabilityParameters")
+    void deleteUnReferencedShouldReturnErrorWhenAssociatedProbabilityInvalid(Object associatedProbability) {
+        given()
+            .queryParam("scope", "unreferenced")
+            .queryParam("associatedProbability", associatedProbability)
+        .when()
+            .delete()
+        .then()
+            .statusCode(BAD_REQUEST_400)
+            .contentType(JSON)
+            .body("statusCode", is(BAD_REQUEST_400),
+                "type", is("InvalidArgument"),
+                "message", is("Invalid arguments supplied in the user request"),
+                "details", containsString("associatedProbability"));
+    }
+
+    /**
+     * Values of {@code associatedProbability} the endpoint is expected to reject: numbers outside
+     * the open interval (0, 1) and inputs that are not numeric at all.
+     */
+    private static Stream<Arguments> associatedProbabilityParameters() {
+        // Each raw value becomes a single-argument Arguments instance.
+        return Stream.<Object>of(-1, -0.1F, 1.1, 1, Integer.MAX_VALUE, "invalid", "")
+            .map(Arguments::of);
+    }
+
+    /**
+     * A blob that no source references, saved two months before the GC runs, must be deleted.
+     */
+    @Test
+    void gcTaskShouldRemoveOrphanBlob() {
+        String payload = UUID.randomUUID().toString();
+        BlobId orphanId = Mono.from(blobStore.save(DEFAULT_BUCKET, payload, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+        // Move the clock two months past the instant the blob was saved at.
+        clock.setInstant(TIMESTAMP.plusMonths(2).toInstant());
+
+        String gcTaskId = given()
+            .queryParam("scope", "unreferenced")
+        .when()
+            .delete()
+            .jsonPath()
+            .getString("taskId");
+        String awaitPath = gcTaskId + "/await";
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(awaitPath)
+        .then()
+            .body("status", is("completed"),
+                "additionalInformation.referenceSourceCount", is(0),
+                "additionalInformation.blobCount", is(1),
+                "additionalInformation.gcedBlobCount", is(1),
+                "additionalInformation.errorCount", is(0));
+
+        assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, orphanId))
+            .isInstanceOf(ObjectNotFoundException.class);
+    }
+
+    /**
+     * A blob saved without the clock being moved forward afterwards must survive the GC, even
+     * though no source references it.
+     */
+    @Test
+    void gcTaskShouldNotRemoveUnExpireBlob() {
+        String payload = UUID.randomUUID().toString();
+        BlobId recentId = Mono.from(blobStore.save(DEFAULT_BUCKET, payload, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+
+        String gcTaskId = given()
+            .queryParam("scope", "unreferenced")
+        .when()
+            .delete()
+            .jsonPath()
+            .getString("taskId");
+        String awaitPath = gcTaskId + "/await";
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(awaitPath)
+        .then()
+            .body("status", is("completed"),
+                "additionalInformation.referenceSourceCount", is(0),
+                "additionalInformation.blobCount", is(1),
+                "additionalInformation.gcedBlobCount", is(0),
+                "additionalInformation.errorCount", is(0));
+
+        assertThat(blobStore.read(DEFAULT_BUCKET, recentId))
+            .isNotNull();
+    }
+
+    /**
+     * A blob listed by the reference source must survive the GC.
+     */
+    @Test
+    void gcTaskShouldNotRemoveReferencedBlob() {
+        String payload = UUID.randomUUID().toString();
+        BlobId referencedId = Mono.from(blobStore.save(DEFAULT_BUCKET, payload, BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+        // Make the mocked source report the freshly saved blob as referenced.
+        when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.just(referencedId));
+
+        String gcTaskId = given()
+            .queryParam("scope", "unreferenced")
+        .when()
+            .delete()
+            .jsonPath()
+            .getString("taskId");
+        String awaitPath = gcTaskId + "/await";
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(awaitPath)
+        .then()
+            .body("status", is("completed"),
+                "additionalInformation.referenceSourceCount", is(1),
+                "additionalInformation.blobCount", is(1),
+                "additionalInformation.gcedBlobCount", is(0),
+                "additionalInformation.errorCount", is(0));
+
+        assertThat(blobStore.read(DEFAULT_BUCKET, referencedId))
+            .isNotNull();
+    }
+
+    /**
+     * Mixes 100 referenced blobs, 50 orphan blobs and, after moving the clock two months forward,
+     * 30 freshly saved blobs, then checks the report and that referenced and fresh blobs survive.
+     * The number of collected blobs is only bounded from above by the orphan count
+     * (NOTE(review): presumably because the bloom filter may yield false positives; confirm).
+     */
+    @Test
+    void gcTaskShouldSuccessWhenMixCase() {
+        List<BlobId> referencedBlobIds = Stream.<BlobId>generate(
+                () -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .limit(100)
+            .collect(Collectors.toList());
+        List<BlobId> orphanBlobIds = Stream.<BlobId>generate(
+                () -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .limit(50)
+            .collect(Collectors.toList());
+
+        when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        clock.setInstant(TIMESTAMP.plusMonths(2).toInstant());
+
+        // Saved after the clock jump.
+        List<BlobId> unExpiredBlobIds = Stream.<BlobId>generate(
+                () -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .limit(30)
+            .collect(Collectors.toList());
+
+        int referencedCount = referencedBlobIds.size();
+        int orphanCount = orphanBlobIds.size();
+        int totalCount = referencedCount + orphanCount + unExpiredBlobIds.size();
+
+        String gcTaskId = given()
+            .queryParam("scope", "unreferenced")
+        .when()
+            .delete()
+            .jsonPath()
+            .getString("taskId");
+        String awaitPath = gcTaskId + "/await";
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(awaitPath)
+        .then()
+            .body("status", is("completed"),
+                "additionalInformation.referenceSourceCount", is(referencedCount),
+                "additionalInformation.blobCount", is(totalCount),
+                "additionalInformation.gcedBlobCount", Matchers.lessThanOrEqualTo(orphanCount),
+                "additionalInformation.errorCount", is(0));
+
+        for (BlobId referenced : referencedBlobIds) {
+            assertThat(blobStore.read(DEFAULT_BUCKET, referenced))
+                .isNotNull();
+        }
+
+        for (BlobId unExpired : unExpiredBlobIds) {
+            assertThat(blobStore.read(DEFAULT_BUCKET, unExpired))
+                .isNotNull();
+        }
+    }
+
+    /**
+     * Keeps triggering the GC (within the {@code CALMLY_AWAIT} budget) until every orphan blob
+     * can no longer be read.
+     */
+    @Test
+    void allOrphanBlobIdsShouldRemovedAfterMultipleCallDeleteUnreferenced() {
+        List<BlobId> referencedBlobIds = Stream.<BlobId>generate(
+                () -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .limit(100)
+            .collect(Collectors.toList());
+        List<BlobId> orphanBlobIds = Stream.<BlobId>generate(
+                () -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+            .limit(50)
+            .collect(Collectors.toList());
+
+        when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+        clock.setInstant(TIMESTAMP.plusMonths(2).toInstant());
+
+        CALMLY_AWAIT.untilAsserted(() -> {
+            // Each attempt schedules a new GC run and waits for it to finish.
+            String gcTaskId = given()
+                .queryParam("scope", "unreferenced")
+            .when()
+                .delete()
+                .jsonPath()
+                .getString("taskId");
+            String awaitPath = gcTaskId + "/await";
+
+            given()
+                .basePath(TasksRoutes.BASE)
+            .when()
+                .get(awaitPath);
+
+            for (BlobId orphan : orphanBlobIds) {
+                assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, orphan))
+                    .isInstanceOf(ObjectNotFoundException.class);
+            }
+        });
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org