You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/29 12:47:44 UTC

[pulsar] branch master updated: [improve][broker] part 1:make some methods async in Namespaces (#16814)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e2be5a67b2 [improve][broker] part 1:make some methods async in Namespaces (#16814)
1e2be5a67b2 is described below

commit 1e2be5a67b2afc8e95f1d83b5c860e7950bd4752
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Fri Jul 29 20:47:37 2022 +0800

    [improve][broker] part 1:make some methods async in Namespaces (#16814)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  56 ---------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  34 ++++--
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 131 ++++++++++++++++-----
 .../apache/pulsar/broker/admin/NamespacesTest.java |   7 +-
 4 files changed, 135 insertions(+), 93 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 838304117bc..91ffe650ac3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1293,13 +1293,6 @@ public abstract class NamespacesBase extends AdminResource {
         }));
     }
 
-    protected PublishRate internalGetPublishRate() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
-    }
-
     protected CompletableFuture<PublishRate> internalGetPublishRateAsync() {
         return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
                 .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
@@ -1365,14 +1358,6 @@ public abstract class NamespacesBase extends AdminResource {
         }));
     }
 
-    @SuppressWarnings("deprecation")
-    protected DispatchRate internalGetTopicDispatchRate() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName());
-    }
-
     @SuppressWarnings("deprecation")
     protected CompletableFuture<DispatchRate> internalGetTopicDispatchRateAsync() {
         return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
@@ -1825,11 +1810,6 @@ public abstract class NamespacesBase extends AdminResource {
         return policies.encryption_required;
     }
 
-    protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName).delayed_delivery_policies;
-    }
-
     protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies) {
         validateSuperUserAccess();
         validatePoliciesReadOnlyAccess();
@@ -1957,13 +1937,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected RetentionPolicies internalGetRetention() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.READ);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.retention_policies;
-    }
-
     private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
         Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap = policies.backlog_quota_map;
         if (backlogQuotaMap.isEmpty()) {
@@ -2244,11 +2217,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected Integer internalGetMaxUnackedMessagesPerConsumer() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName).max_unacked_messages_per_consumer;
-    }
-
     protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessagesPerConsumer) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
@@ -2273,16 +2241,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected Integer internalGetMaxUnackedMessagesPerSubscription() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName).max_unacked_messages_per_subscription;
-    }
-
-    protected Integer internalGetMaxSubscriptionsPerTopic() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName).max_subscriptions_per_topic;
-    }
-
     protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){
         validateSuperUserAccess();
         validatePoliciesReadOnlyAccess();
@@ -2399,14 +2357,6 @@ public abstract class NamespacesBase extends AdminResource {
         return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy;
     }
 
-    protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
-                PolicyOperation.READ);
-        Policies policies = getNamespacePolicies(namespaceName);
-
-        return policies.schema_compatibility_strategy;
-    }
-
     @Deprecated
     protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdateCompatibilityStrategy strategy) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
@@ -2615,12 +2565,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected int internalGetMaxTopicsPerNamespace() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName).max_topics_per_namespace != null
-                ? getNamespacePolicies(namespaceName).max_topics_per_namespace : 0;
-    }
-
    protected void internalRemoveMaxTopicsPerNamespace() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
         internalSetMaxTopicsPerNamespace(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 5084e09fd31..a3be86cfe85 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -56,7 +56,6 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -1017,10 +1016,21 @@ public class Namespaces extends NamespacesBase {
                     + "-1 means msg-dispatch-rate or byte-dispatch-rate not configured in dispatch-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
-    public DispatchRate getDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace) {
+    public void getDispatchRate(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("property") String property,
+                                @PathParam("cluster") String cluster,
+                                @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetTopicDispatchRate();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(
+                        policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName())))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get dispatch-rate configured for the namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1177,10 +1187,20 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(hidden = true, value = "Get retention config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public RetentionPolicies getRetention(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace) {
+    public void getRetention(@Suspended final AsyncResponse asyncResponse,
+                             @PathParam("property") String property,
+                             @PathParam("cluster") String cluster,
+                             @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetRetention();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RETENTION, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.retention_policies))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get retention config on a namespace {}", clientAppId(), namespaceName,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 8158bcc362a..0d4f071ade0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1195,10 +1195,19 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get retention config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public RetentionPolicies getRetention(@PathParam("tenant") String tenant,
-            @PathParam("namespace") String namespace) {
+    public void getRetention(@Suspended final AsyncResponse asyncResponse,
+                             @PathParam("tenant") String tenant,
+                             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetRetention();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RETENTION, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.retention_policies))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get retention config on a namespace {}", clientAppId(), namespaceName,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1492,10 +1501,19 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Concurrent modification"), })
-    public DelayedDeliveryPolicies getDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
-                                         @PathParam("namespace") String namespace) {
+    public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
+                                           @PathParam("tenant") String tenant,
+                                           @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetDelayedDelivery();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DELAYED_DELIVERY, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.delayed_delivery_policies))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get delayed delivery messages config on a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1719,10 +1737,19 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get maxUnackedMessagesPerConsumer config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public Integer getMaxUnackedMessagesPerConsumer(@PathParam("tenant") String tenant,
-                                       @PathParam("namespace") String namespace) {
+    public void getMaxUnackedMessagesPerConsumer(@Suspended final AsyncResponse asyncResponse,
+                                                 @PathParam("tenant") String tenant,
+                                                 @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetMaxUnackedMessagesPerConsumer();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.max_unacked_messages_per_consumer))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get maxUnackedMessagesPerConsumer config on a namespace {}",
+                            clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1757,10 +1784,20 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get maxUnackedMessagesPerSubscription config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public Integer getMaxUnackedmessagesPerSubscription(@PathParam("tenant") String tenant,
-                                              @PathParam("namespace") String namespace) {
+    public void getMaxUnackedmessagesPerSubscription(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetMaxUnackedMessagesPerSubscription();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_UNACKED, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.max_unacked_messages_per_subscription))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get maxUnackedMessagesPerSubscription config on a namespace {}",
+                            clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -1794,10 +1831,19 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get maxSubscriptionsPerTopic config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public Integer getMaxSubscriptionsPerTopic(@PathParam("tenant") String tenant,
-                                              @PathParam("namespace") String namespace) {
+    public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
+                                            @PathParam("tenant") String tenant,
+                                            @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetMaxSubscriptionsPerTopic();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.max_subscriptions_per_topic))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get maxSubscriptionsPerTopic config on a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -2096,11 +2142,21 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Concurrent modification") })
-    public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(
+    public void getSchemaCompatibilityStrategy(
+            @Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetSchemaCompatibilityStrategy();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
+                PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.schema_compatibility_strategy))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get the strategy of the namespace schema compatibility {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -2294,10 +2350,23 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get maxTopicsPerNamespace config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or namespace does not exist") })
-    public Integer getMaxTopicsPerNamespace(@PathParam("tenant") String tenant,
-                                              @PathParam("namespace") String namespace) {
+    public void getMaxTopicsPerNamespace(@Suspended final AsyncResponse asyncResponse,
+                                         @PathParam("tenant") String tenant,
+                                         @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetMaxTopicsPerNamespace();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> {
+                    int maxTopicsPerNamespace =
+                            policies.max_topics_per_namespace != null ? policies.max_topics_per_namespace : 0;
+                    asyncResponse.resume(maxTopicsPerNamespace);
+                })
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get maxTopicsPerNamespace config on a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -2410,17 +2479,23 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{tenant}/{namespace}/resourcegroup")
-    @ApiOperation(value = "Get the resourcegroup attached to the namespace")
+    @ApiOperation(value = "Get the resource group attached to the namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public String getNamespaceResourceGroup(@PathParam("tenant") String tenant,
-                                      @PathParam("namespace") String namespace) {
+    public void getNamespaceResourceGroup(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.RESOURCEGROUP,
-                PolicyOperation.READ);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return policies.resource_group_name;
+        validateNamespacePolicyOperationAsync(NamespaceName.get(tenant, namespace), PolicyName.RESOURCEGROUP,
+                PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.resource_group_name))
+                .exceptionally(ex -> {
+                    log.error("Failed to get the resource group attached to the namespace {}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 1a4372985ea..0685c9f11ab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1037,8 +1037,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             ownership.set(pulsar.getNamespaceService(), MockOwnershipCache);
             RetentionPolicies retention = new RetentionPolicies(10, 10);
             namespaces.setRetention(this.testTenant, this.testLocalCluster, bundledNsLocal, retention);
-            RetentionPolicies retention2 = namespaces.getRetention(this.testTenant, this.testLocalCluster,
-                    bundledNsLocal);
+            AsyncResponse response = mock(AsyncResponse.class);
+            namespaces.getRetention(response, this.testTenant, this.testLocalCluster, bundledNsLocal);
+            ArgumentCaptor<RetentionPolicies> captor = ArgumentCaptor.forClass(RetentionPolicies.class);
+            verify(response, timeout(5000).times(1)).resume(captor.capture());
+            RetentionPolicies retention2 = captor.getValue();
             assertEquals(retention, retention2);
         } catch (RestException e) {
             fail("ValidateNamespaceOwnershipWithBundles failed");