You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/08/31 03:03:38 UTC
[james-project] 03/03: JAMES-3150 Expose the webAdmin endpoint to
trigger blob deduplicated garbage collection
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 16d3a0e812b69d88d5190b5f7bf5b1392c8f90d9
Author: Tung Van TRAN <vt...@linagora.com>
AuthorDate: Thu Aug 26 17:46:06 2021 +0700
JAMES-3150 Expose the webAdmin endpoint to trigger blob deduplicated garbage collection
---
server/blob/blob-storage-strategy/pom.xml | 4 +
.../server/blob/deduplication/BlobGCTask.java | 74 +---
.../BlobGCTaskAdditionalInformationDTO.java | 119 ++++++
server/protocols/webadmin/webadmin-data/pom.xml | 14 +
.../apache/james/webadmin/routes/BlobRoutes.java | 168 ++++++++
.../james/webadmin/routes/BlobRoutesTest.java | 430 +++++++++++++++++++++
6 files changed, 753 insertions(+), 56 deletions(-)
diff --git a/server/blob/blob-storage-strategy/pom.xml b/server/blob/blob-storage-strategy/pom.xml
index af1daa1..ceb5a5b 100644
--- a/server/blob/blob-storage-strategy/pom.xml
+++ b/server/blob/blob-storage-strategy/pom.xml
@@ -49,6 +49,10 @@
<artifactId>james-server-task-api</artifactId>
</dependency>
<dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-task-json</artifactId>
+ </dependency>
+ <dependency>
<!-- Added because of https://issues.apache.org/jira/browse/SUREFIRE-1266 -->
<groupId>${james.groupId}</groupId>
<artifactId>james-server-testing</artifactId>
diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
index 7ecb0a4..2177c62 100644
--- a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
+++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java
@@ -21,7 +21,6 @@ package org.apache.james.server.blob.deduplication;
import java.time.Clock;
import java.time.Instant;
-import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
@@ -35,8 +34,6 @@ import org.apache.james.task.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
import reactor.core.scheduler.Schedulers;
public class BlobGCTask implements Task {
@@ -45,7 +42,7 @@ public class BlobGCTask implements Task {
public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
- private static AdditionalInformation from(Scope scope, Context context) {
+ private static AdditionalInformation from(Context context) {
Context.Snapshot snapshot = context.snapshot();
return new AdditionalInformation(
snapshot.getReferenceSourceCount(),
@@ -53,8 +50,7 @@ public class BlobGCTask implements Task {
snapshot.getGcedBlobCount(),
snapshot.getErrorCount(),
snapshot.getBloomFilterExpectedBlobCount(),
- snapshot.getBloomFilterAssociatedProbability(),
- scope);
+ snapshot.getBloomFilterAssociatedProbability());
}
private final Instant timestamp;
@@ -64,15 +60,13 @@ public class BlobGCTask implements Task {
private final long errorCount;
private final long bloomFilterExpectedBlobCount;
private final double bloomFilterAssociatedProbability;
- private final Scope scope;
AdditionalInformation(long referenceSourceCount,
long blobCount,
long gcedBlobCount,
long errorCount,
long bloomFilterExpectedBlobCount,
- double bloomFilterAssociatedProbability,
- Scope scope) {
+ double bloomFilterAssociatedProbability) {
this.referenceSourceCount = referenceSourceCount;
this.blobCount = blobCount;
this.gcedBlobCount = gcedBlobCount;
@@ -80,7 +74,6 @@ public class BlobGCTask implements Task {
this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
this.timestamp = Clock.systemUTC().instant();
- this.scope = scope;
}
@Override
@@ -115,36 +108,13 @@ public class BlobGCTask implements Task {
public double getBloomFilterAssociatedProbability() {
return bloomFilterAssociatedProbability;
}
-
- public Scope getScope() {
- return scope;
- }
- }
-
- public enum Scope {
- UNREFERENCED;
-
- static class ScopeInvalidException extends IllegalArgumentException {
- }
-
- public static Optional<Scope> from(String name) {
- Preconditions.checkNotNull(name);
- return Arrays.stream(Scope.values())
- .filter(value -> name.equalsIgnoreCase(value.name()))
- .findFirst();
- }
}
interface Builder {
@FunctionalInterface
- interface RequireScope {
- BlobGCTask scope(Scope scope);
- }
-
- @FunctionalInterface
interface RequireAssociatedProbability {
- RequireScope associatedProbability(double associatedProbability);
+ BlobGCTask associatedProbability(double associatedProbability);
}
@FunctionalInterface
@@ -186,7 +156,7 @@ public class BlobGCTask implements Task {
public static Builder.RequireBlobStoreDAO builder() {
return blobStoreDao -> generationAwareBlobIdFactory -> generationAwareBlobIdConfiguration
-> blobReferenceSources -> bucketName -> clock -> expectedBlobCount
- -> associatedProbability -> scope
+ -> associatedProbability
-> new BlobGCTask(
blobStoreDao,
generationAwareBlobIdFactory,
@@ -195,8 +165,7 @@ public class BlobGCTask implements Task {
bucketName,
clock,
expectedBlobCount,
- associatedProbability,
- scope);
+ associatedProbability);
}
@@ -209,7 +178,6 @@ public class BlobGCTask implements Task {
private final int expectedBlobCount;
private final double associatedProbability;
private final Context context;
- private final Scope scope;
public BlobGCTask(BlobStoreDAO blobStoreDAO,
@@ -219,8 +187,7 @@ public class BlobGCTask implements Task {
BucketName bucketName,
Clock clock,
int expectedBlobCount,
- double associatedProbability,
- Scope scope) {
+ double associatedProbability) {
this.blobStoreDAO = blobStoreDAO;
this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
@@ -230,25 +197,20 @@ public class BlobGCTask implements Task {
this.expectedBlobCount = expectedBlobCount;
this.associatedProbability = associatedProbability;
this.context = new Context(expectedBlobCount, associatedProbability);
- this.scope = scope;
}
@Override
public Result run() throws InterruptedException {
- if (Scope.UNREFERENCED.equals(this.scope)) {
- BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
- BlobReferenceAggregate.aggregate(blobReferenceSources),
- blobStoreDAO,
- generationAwareBlobIdFactory,
- generationAwareBlobIdConfiguration,
- clock);
-
- return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context)
- .subscribeOn(Schedulers.elastic())
- .block();
- } else {
- return Result.COMPLETED;
- }
+ BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm(
+ BlobReferenceAggregate.aggregate(blobReferenceSources),
+ blobStoreDAO,
+ generationAwareBlobIdFactory,
+ generationAwareBlobIdConfiguration,
+ clock);
+
+ return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context)
+ .subscribeOn(Schedulers.elastic())
+ .block();
}
@Override
@@ -258,6 +220,6 @@ public class BlobGCTask implements Task {
@Override
public Optional<TaskExecutionDetails.AdditionalInformation> details() {
- return Optional.of(AdditionalInformation.from(scope, context));
+ return Optional.of(AdditionalInformation.from(context));
}
}
diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTaskAdditionalInformationDTO.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000..f9da97a
--- /dev/null
+++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTaskAdditionalInformationDTO.java
@@ -0,0 +1,119 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * JSON form of {@link BlobGCTask.AdditionalInformation}, registered under the type name
+ * {@code BlobGCTask.TASK_TYPE}.
+ *
+ * <p>NOTE(review): {@link #asDomainObject} does not forward {@code timestamp}. The domain
+ * constructor stamps its own instant, so a deserialized instance reports the deserialization
+ * time rather than the serialized one. Confirm this is acceptable.
+ */
+public class BlobGCTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+
+    public static final AdditionalInformationDTOModule<BlobGCTask.AdditionalInformation, BlobGCTaskAdditionalInformationDTO> SERIALIZATION_MODULE =
+        DTOModule.forDomainObject(BlobGCTask.AdditionalInformation.class)
+            .convertToDTO(BlobGCTaskAdditionalInformationDTO.class)
+            .toDomainObjectConverter(BlobGCTaskAdditionalInformationDTO::asDomainObject)
+            .toDTOConverter(BlobGCTaskAdditionalInformationDTO::fromDomainObject)
+            .typeName(BlobGCTask.TASK_TYPE.asString())
+            .withFactory(AdditionalInformationDTOModule::new);
+
+    /** Rebuilds the domain object from its counters; the DTO timestamp is not carried over. */
+    private static BlobGCTask.AdditionalInformation asDomainObject(BlobGCTaskAdditionalInformationDTO source) {
+        return new BlobGCTask.AdditionalInformation(
+            source.referenceSourceCount,
+            source.blobCount,
+            source.gcedBlobCount,
+            source.errorCount,
+            source.bloomFilterExpectedBlobCount,
+            source.bloomFilterAssociatedProbability);
+    }
+
+    /** Snapshots every domain getter, including its timestamp, into a DTO tagged with {@code typeName}. */
+    private static BlobGCTaskAdditionalInformationDTO fromDomainObject(BlobGCTask.AdditionalInformation details, String typeName) {
+        return new BlobGCTaskAdditionalInformationDTO(
+            typeName,
+            details.getTimestamp(),
+            details.getReferenceSourceCount(),
+            details.getBlobCount(),
+            details.getGcedBlobCount(),
+            details.getErrorCount(),
+            details.getBloomFilterExpectedBlobCount(),
+            details.getBloomFilterAssociatedProbability());
+    }
+
+    private final String type;
+    private final Instant timestamp;
+    private final long referenceSourceCount;
+    private final long blobCount;
+    private final long gcedBlobCount;
+    private final long errorCount;
+    private final long bloomFilterExpectedBlobCount;
+    private final double bloomFilterAssociatedProbability;
+
+    /** Jackson creator: each argument is bound to the JSON property of the same name. */
+    public BlobGCTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+                                              @JsonProperty("timestamp") Instant timestamp,
+                                              @JsonProperty("referenceSourceCount") long referenceSourceCount,
+                                              @JsonProperty("blobCount") long blobCount,
+                                              @JsonProperty("gcedBlobCount") long gcedBlobCount,
+                                              @JsonProperty("errorCount") long errorCount,
+                                              @JsonProperty("bloomFilterExpectedBlobCount") long bloomFilterExpectedBlobCount,
+                                              @JsonProperty("bloomFilterAssociatedProbability") double bloomFilterAssociatedProbability) {
+        this.type = type;
+        this.timestamp = timestamp;
+        this.referenceSourceCount = referenceSourceCount;
+        this.blobCount = blobCount;
+        this.gcedBlobCount = gcedBlobCount;
+        this.errorCount = errorCount;
+        this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount;
+        this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public Instant getTimestamp() {
+        return timestamp;
+    }
+
+    public long getReferenceSourceCount() {
+        return referenceSourceCount;
+    }
+
+    public long getBlobCount() {
+        return blobCount;
+    }
+
+    public long getGcedBlobCount() {
+        return gcedBlobCount;
+    }
+
+    public long getErrorCount() {
+        return errorCount;
+    }
+
+    public long getBloomFilterExpectedBlobCount() {
+        return bloomFilterExpectedBlobCount;
+    }
+
+    public double getBloomFilterAssociatedProbability() {
+        return bloomFilterAssociatedProbability;
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-data/pom.xml b/server/protocols/webadmin/webadmin-data/pom.xml
index d9ce9c9..2aee4cb 100644
--- a/server/protocols/webadmin/webadmin-data/pom.xml
+++ b/server/protocols/webadmin/webadmin-data/pom.xml
@@ -34,6 +34,15 @@
<dependencies>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>blob-memory</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>blob-storage-strategy</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-event-store-memory</artifactId>
<scope>test</scope>
</dependency>
@@ -67,6 +76,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>james-server-task-memory</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>james-server-testing</artifactId>
<scope>test</scope>
</dependency>
diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/BlobRoutes.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/BlobRoutes.java
new file mode 100644
index 0000000..346ca39
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/BlobRoutes.java
@@ -0,0 +1,168 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import java.time.Clock;
+import java.util.Optional;
+import java.util.Set;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.server.blob.deduplication.BlobGCTask;
+import org.apache.james.server.blob.deduplication.GenerationAwareBlobId;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskManager;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.tasks.TaskFromRequest;
+import org.apache.james.webadmin.tasks.TaskIdDto;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+
+import com.google.common.base.Preconditions;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import spark.Request;
+import spark.Service;
+
+@Api(tags = "Blobs")
+@Path("/blobs")
+@Produces("application/json")
+public class BlobRoutes implements Routes {
+
+ public static final String BASE_PATH = "/blobs";
+ public static final int EXPECTED_BLOB_COUNT_DEFAULT = 1_000_000;
+ public static final double ASSOCIATED_PROBABILITY_DEFAULT = 0.8;
+
+ private final TaskManager taskManager;
+ private final JsonTransformer jsonTransformer;
+ private final Clock clock;
+ private final BlobStoreDAO blobStoreDAO;
+ private final BucketName bucketName;
+ private final Set<BlobReferenceSource> blobReferenceSources;
+ private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration;
+ private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory;
+
+ @Inject
+ public BlobRoutes(TaskManager taskManager,
+ JsonTransformer jsonTransformer,
+ Clock clock,
+ BlobStoreDAO blobStoreDAO,
+ @Named(BlobStore.DEFAULT_BUCKET_NAME_QUALIFIER) BucketName defaultBucketName,
+ Set<BlobReferenceSource> blobReferenceSources,
+ GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration,
+ GenerationAwareBlobId.Factory generationAwareBlobIdFactory) {
+ this.taskManager = taskManager;
+ this.jsonTransformer = jsonTransformer;
+ this.clock = clock;
+ this.blobStoreDAO = blobStoreDAO;
+ this.bucketName = defaultBucketName;
+ this.blobReferenceSources = blobReferenceSources;
+ this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration;
+ this.generationAwareBlobIdFactory = generationAwareBlobIdFactory;
+ }
+
+ @Override
+ public String getBasePath() {
+ return BASE_PATH;
+ }
+
+ @Override
+ public void define(Service service) {
+ TaskFromRequest gcUnreferencedTaskRequest = this::gcUnreferenced;
+ service.delete(BASE_PATH, gcUnreferencedTaskRequest.asRoute(taskManager), jsonTransformer);
+ }
+
+ @DELETE
+ @Path("/blobs")
+ @ApiOperation(value = "Create a task to run blob deduplicate garbage collection", nickname = "BlobGC")
+ @ApiImplicitParams({
+ @ApiImplicitParam(required = true, dataType = "string", name = "scope", paramType = "query", example = "scope=unreferenced"),
+ @ApiImplicitParam(required = false, dataType = "double", name = "associatedProbability", paramType = "query",
+ defaultValue = "1_000_000", example = "associatedProbability=1000"),
+ @ApiImplicitParam(required = false, dataType = "integer", name = "expectedBlobCount", paramType = "query",
+ defaultValue = "0.8", example = "expectedBlobCount=0.7")
+ })
+ @ApiResponses(
+ {
+ @ApiResponse(code = HttpStatus.CREATED_201, message = "The taskId of the given scheduled task", response = TaskIdDto.class),
+ @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid arguments supplied in the user request"),
+ @ApiResponse(code = HttpStatus.UNAUTHORIZED_401, message = "Unauthorized. The user is not authenticated on the platform"),
+ })
+ public Task gcUnreferenced(Request request) {
+ Preconditions.checkArgument(Optional.ofNullable(request.queryParams("scope"))
+ .filter("unreferenced"::equals)
+ .isPresent(),
+ "'scope' is missing or must be 'unreferenced'");
+
+ int expectedBlobCount = getExpectedBlobCount(request).orElse(EXPECTED_BLOB_COUNT_DEFAULT);
+ double associatedProbability = getAssociatedProbability(request).orElse(ASSOCIATED_PROBABILITY_DEFAULT);
+
+ return BlobGCTask.builder()
+ .blobStoreDAO(blobStoreDAO)
+ .generationAwareBlobIdFactory(generationAwareBlobIdFactory)
+ .generationAwareBlobIdConfiguration(generationAwareBlobIdConfiguration)
+ .blobReferenceSource(blobReferenceSources)
+ .bucketName(bucketName)
+ .clock(clock)
+ .expectedBlobCount(expectedBlobCount)
+ .associatedProbability(associatedProbability);
+ }
+
+ private static Optional<Integer> getExpectedBlobCount(Request req) {
+ try {
+ return Optional.ofNullable(req.queryParams("expectedBlobCount"))
+ .map(Integer::parseInt)
+ .map(expectedBlobCount -> {
+ Preconditions.checkArgument(expectedBlobCount > 0,
+ "'expectedBlobCount' must be strictly positive");
+ return expectedBlobCount;
+ });
+ } catch (NumberFormatException ex) {
+ throw new IllegalArgumentException("'expectedBlobCount' must be numeric");
+ }
+ }
+
+ private static Optional<Double> getAssociatedProbability(Request req) {
+ try {
+ return Optional.ofNullable(req.queryParams("associatedProbability"))
+ .map(Double::parseDouble)
+ .map(associatedProbability -> {
+ Preconditions.checkArgument(associatedProbability > 0 && associatedProbability < 1,
+ "'associatedProbability' must be greater than 0.0 and smaller than 1.0");
+ return associatedProbability;
+ });
+ } catch (NumberFormatException ex) {
+ throw new IllegalArgumentException("'associatedProbability' must be numeric");
+ }
+ }
+}
diff --git a/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/BlobRoutesTest.java b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/BlobRoutesTest.java
new file mode 100644
index 0000000..5b64236
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/BlobRoutesTest.java
@@ -0,0 +1,430 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import static io.restassured.RestAssured.given;
+import static io.restassured.http.ContentType.JSON;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.eclipse.jetty.http.HttpStatus.BAD_REQUEST_400;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.json.DTOConverter;
+import org.apache.james.server.blob.deduplication.BlobGCTaskAdditionalInformationDTO;
+import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
+import org.apache.james.server.blob.deduplication.GenerationAwareBlobId;
+import org.apache.james.task.Hostname;
+import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.apache.james.webadmin.WebAdminServer;
+import org.apache.james.webadmin.WebAdminUtils;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+import org.eclipse.jetty.http.HttpStatus;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import com.google.common.collect.ImmutableSet;
+
+import io.restassured.RestAssured;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class BlobRoutesTest {
+ private static final String BASE_PATH = "/blobs";
+ private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+ private static final ZonedDateTime TIMESTAMP = ZonedDateTime.parse("2015-10-30T16:12:00Z");
+ private static final BucketName DEFAULT_BUCKET = BucketName.of("default");
+ private static final GenerationAwareBlobId.Configuration GENERATION_AWARE_BLOB_ID_CONFIGURATION = GenerationAwareBlobId.Configuration.DEFAULT;
+ private static final ConditionFactory CALMLY_AWAIT = Awaitility
+ .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+ .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+ .await()
+ .atMost(TEN_SECONDS);
+
+ private WebAdminServer webAdminServer;
+ private MemoryTaskManager taskManager;
+ private UpdatableTickingClock clock;
+ private BlobReferenceSource blobReferenceSource;
+ private BlobStore blobStore;
+
+ @BeforeEach
+ void setUp() {
+ taskManager = new MemoryTaskManager(new Hostname("foo"));
+ clock = new UpdatableTickingClock(TIMESTAMP.toInstant());
+ blobReferenceSource = mock(BlobReferenceSource.class);
+ when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.empty());
+ GenerationAwareBlobId.Factory generationAwareBlobIdFactory = new GenerationAwareBlobId.Factory(clock, BLOB_ID_FACTORY, GENERATION_AWARE_BLOB_ID_CONFIGURATION);
+
+ BlobStoreDAO blobStoreDAO = new MemoryBlobStoreDAO();
+ blobStore = new DeDuplicationBlobStore(blobStoreDAO, DEFAULT_BUCKET, generationAwareBlobIdFactory);
+ JsonTransformer jsonTransformer = new JsonTransformer();
+ TasksRoutes tasksRoutes = new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(BlobGCTaskAdditionalInformationDTO.SERIALIZATION_MODULE));
+ BlobRoutes blobRoutes = new BlobRoutes(
+ taskManager,
+ jsonTransformer,
+ clock,
+ blobStoreDAO,
+ DEFAULT_BUCKET,
+ ImmutableSet.of(blobReferenceSource),
+ GENERATION_AWARE_BLOB_ID_CONFIGURATION,
+ generationAwareBlobIdFactory);
+
+ webAdminServer = WebAdminUtils.createWebAdminServer(blobRoutes, tasksRoutes).start();
+
+ RestAssured.requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminServer)
+ .setBasePath(BASE_PATH)
+ .build();
+ }
+
+ @AfterEach
+ void stop() {
+ webAdminServer.destroy();
+ taskManager.stop();
+ }
+
+ @Test
+ void deleteUnReferencedShouldReturnErrorWhenScopeInvalid() {
+ given()
+ .queryParam("scope", "invalid")
+ .delete()
+ .then()
+ .statusCode(BAD_REQUEST_400)
+ .contentType(JSON)
+ .body("statusCode", is(BAD_REQUEST_400))
+ .body("type", is("InvalidArgument"))
+ .body("message", is("Invalid arguments supplied in the user request"))
+ .body("details", is("'scope' is missing or must be 'unreferenced'"));
+ }
+
+ @Test
+ void deleteUnReferencedShouldReturnErrorWhenMissingScope() {
+ given()
+ .delete()
+ .then()
+ .statusCode(BAD_REQUEST_400)
+ .contentType(JSON)
+ .body("statusCode", is(BAD_REQUEST_400))
+ .body("type", is("InvalidArgument"))
+ .body("message", is("Invalid arguments supplied in the user request"))
+ .body("details", is("'scope' is missing or must be 'unreferenced'"));
+ }
+
+ @Test
+ void deleteUnReferencedShouldReturnTaskId() {
+ given()
+ .queryParam("scope", "unreferenced")
+ .delete()
+ .then()
+ .statusCode(HttpStatus.CREATED_201)
+ .body("taskId", notNullValue());
+ }
+
+ @Test
+ void gcTaskShouldReturnDetail() {
+ String taskId = given()
+ .queryParam("scope", "unreferenced")
+ .delete()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("taskId", is(notNullValue()))
+ .body("type", is("BlobGCTask"))
+ .body("startedDate", is(notNullValue()))
+ .body("submitDate", is(notNullValue()))
+ .body("completedDate", is(notNullValue()))
+ .body("additionalInformation.type", is("BlobGCTask"))
+ .body("additionalInformation.timestamp", is(notNullValue()))
+ .body("additionalInformation.referenceSourceCount", is(0))
+ .body("additionalInformation.blobCount", is(0))
+ .body("additionalInformation.gcedBlobCount", is(0))
+ .body("additionalInformation.errorCount", is(0))
+ .body("additionalInformation.bloomFilterExpectedBlobCount", is(1_000_000))
+ .body("additionalInformation.bloomFilterAssociatedProbability", is(0.8F));
+ }
+
+ @Test
+ void deleteUnReferencedShouldAcceptBloomFilterExpectedBlobCountParam() {
+ String taskId = given()
+ .queryParam("scope", "unreferenced")
+ .queryParam("expectedBlobCount", 99)
+ .delete()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("additionalInformation.bloomFilterExpectedBlobCount", is(99));
+ }
+
+ @ParameterizedTest
+ @MethodSource("expectedBlobCountParameters")
+ void deleteUnReferencedShouldReturnErrorWhenExpectedBlobCountInvalid(Object expectedBlobCount) {
+ given()
+ .queryParam("scope", "unreferenced")
+ .queryParam("expectedBlobCount", expectedBlobCount)
+ .delete()
+ .then()
+ .statusCode(BAD_REQUEST_400)
+ .contentType(JSON)
+ .body("statusCode", is(BAD_REQUEST_400))
+ .body("type", is("InvalidArgument"))
+ .body("message", is("Invalid arguments supplied in the user request"))
+ .body("details", containsString("expectedBlobCount"));
+ }
+
+ private static Stream<Arguments> expectedBlobCountParameters() {
+ return Stream.of(
+ Arguments.of(-1),
+ Arguments.of(0),
+ Arguments.of("invalid")
+ );
+ }
+
+ @Test
+ void deleteUnReferencedShouldAcceptBloomFilterAssociatedProbabilityParam() {
+ String taskId = given()
+ .queryParam("scope", "unreferenced")
+ .queryParam("associatedProbability", 0.2)
+ .delete()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("additionalInformation.bloomFilterAssociatedProbability", is(0.2F));
+ }
+
+ @ParameterizedTest
+ @MethodSource("associatedProbabilityParameters")
+ void deleteUnReferencedShouldReturnErrorWhenAssociatedProbabilityInvalid(Object associatedProbability) {
+ given()
+ .queryParam("scope", "unreferenced")
+ .queryParam("associatedProbability", associatedProbability)
+ .delete()
+ .then()
+ .statusCode(BAD_REQUEST_400)
+ .contentType(JSON)
+ .body("statusCode", is(BAD_REQUEST_400))
+ .body("type", is("InvalidArgument"))
+ .body("message", is("Invalid arguments supplied in the user request"))
+ .body("details", containsString("associatedProbability"));
+ }
+
+ private static Stream<Arguments> associatedProbabilityParameters() {
+ return Stream.of(
+ Arguments.of(-1),
+ Arguments.of(-0.1F),
+ Arguments.of(1.1),
+ Arguments.of(1),
+ Arguments.of(Integer.MAX_VALUE),
+ Arguments.of("invalid"),
+ Arguments.of("")
+ );
+ }
+
+ @Test
+ void gcTaskShouldRemoveOrphanBlob() {
+ BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+ clock.setInstant(TIMESTAMP.plusMonths(2).toInstant());
+
+ String taskId = given()
+ .queryParam("scope", "unreferenced")
+ .delete()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("additionalInformation.referenceSourceCount", is(0))
+ .body("additionalInformation.blobCount", is(1))
+ .body("additionalInformation.gcedBlobCount", is(1))
+ .body("additionalInformation.errorCount", is(0));
+
+ assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId))
+ .isInstanceOf(ObjectNotFoundException.class);
+ }
+
+ @Test
+ void gcTaskShouldNotRemoveUnExpireBlob() {
+ // The clock is NOT advanced here: the unreferenced blob is still too recent to be collected.
+ BlobId freshBlobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+
+ String taskId = given()
+ .queryParam("scope", "unreferenced")
+ .when()
+ .delete()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("additionalInformation.referenceSourceCount", is(0))
+ .body("additionalInformation.blobCount", is(1))
+ .body("additionalInformation.gcedBlobCount", is(0))
+ .body("additionalInformation.errorCount", is(0));
+
+ // The blob is still readable after the GC run.
+ assertThat(blobStore.read(DEFAULT_BUCKET, freshBlobId))
+ .isNotNull();
+ }
+
+ @Test
+ void gcTaskShouldNotRemoveReferencedBlob() {
+ // The mocked reference source reports the stored blob as in use, so GC must keep it.
+ BlobId referencedBlobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block();
+ when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.just(referencedBlobId));
+
+ String taskId = given()
+ .queryParam("scope", "unreferenced")
+ .when()
+ .delete()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("additionalInformation.referenceSourceCount", is(1))
+ .body("additionalInformation.blobCount", is(1))
+ .body("additionalInformation.gcedBlobCount", is(0))
+ .body("additionalInformation.errorCount", is(0));
+
+ // The referenced blob survives the GC run.
+ assertThat(blobStore.read(DEFAULT_BUCKET, referencedBlobId))
+ .isNotNull();
+ }
+
+ @Test
+ void gcTaskShouldSuccessWhenMixCase() {
+ // Setup: 100 referenced and 50 orphan blobs, then the clock jumps two months,
+ // then 30 more blobs are stored that are still recent at GC time.
+ List<BlobId> referencedBlobIds = IntStream.range(0, 100)
+ .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+ .collect(Collectors.toList());
+ List<BlobId> orphanBlobIds = IntStream.range(0, 50)
+ .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+ .collect(Collectors.toList());
+
+ when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+ clock.setInstant(TIMESTAMP.plusMonths(2).toInstant());
+
+ List<BlobId> unExpiredBlobIds = IntStream.range(0, 30)
+ .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+ .collect(Collectors.toList());
+
+ int expectedReferenceCount = referencedBlobIds.size();
+ int expectedBlobCount = referencedBlobIds.size() + orphanBlobIds.size() + unExpiredBlobIds.size();
+
+ String taskId = given()
+ .queryParam("scope", "unreferenced")
+ .when()
+ .delete()
+ .jsonPath()
+ .get("taskId");
+
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"))
+ .body("additionalInformation.referenceSourceCount", is(expectedReferenceCount))
+ .body("additionalInformation.blobCount", is(expectedBlobCount))
+ // Only an upper bound: presumably the bloom-filter based check may spare some orphans in one run.
+ .body("additionalInformation.gcedBlobCount", Matchers.lessThanOrEqualTo(orphanBlobIds.size()))
+ .body("additionalInformation.errorCount", is(0));
+
+ // Neither referenced nor still-recent blobs may be collected.
+ for (BlobId referencedBlobId : referencedBlobIds) {
+ assertThat(blobStore.read(DEFAULT_BUCKET, referencedBlobId)).isNotNull();
+ }
+ for (BlobId unExpiredBlobId : unExpiredBlobIds) {
+ assertThat(blobStore.read(DEFAULT_BUCKET, unExpiredBlobId)).isNotNull();
+ }
+ }
+
+ @Test
+ void allOrphanBlobIdsShouldRemovedAfterMultipleCallDeleteUnreferenced() {
+ List<BlobId> referencedBlobIds = IntStream.range(0, 100)
+ .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+ .collect(Collectors.toList());
+ List<BlobId> orphanBlobIds = IntStream.range(0, 50)
+ .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block())
+ .collect(Collectors.toList());
+
+ when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds));
+ clock.setInstant(TIMESTAMP.plusMonths(2).toInstant());
+
+ // A single run may leave some orphans behind (see gcedBlobCount upper bound in the mixed case),
+ // so the GC task is re-triggered until every orphan is gone.
+ CALMLY_AWAIT.untilAsserted(() -> {
+ String taskId = given()
+ .queryParam("scope", "unreferenced")
+ .delete()
+ .jsonPath()
+ .get("taskId");
+
+ // Assert the run actually completed, so a failing task shows up in the timeout error
+ // instead of being silently ignored.
+ given()
+ .basePath(TasksRoutes.BASE)
+ .when()
+ .get(taskId + "/await")
+ .then()
+ .body("status", is("completed"));
+
+ orphanBlobIds.forEach(blobId ->
+ assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId))
+ .isInstanceOf(ObjectNotFoundException.class));
+ });
+
+ // Repeated GC runs must never collect a blob that is still referenced.
+ referencedBlobIds.forEach(blobId ->
+ assertThat(blobStore.read(DEFAULT_BUCKET, blobId))
+ .isNotNull());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org