You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/25 13:04:50 UTC

[pulsar] branch master updated: [improve][broker] make offload police methods async in Namespaces (#16760)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ec5efa429a7 [improve][broker] make offload police methods async in Namespaces (#16760)
ec5efa429a7 is described below

commit ec5efa429a7f0b4f57550a4f62dc2df25e1cb3b4
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jul 25 21:04:42 2022 +0800

    [improve][broker] make offload police methods async in Namespaces (#16760)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 32 ---------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 41 +++++++++---
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 75 ++++++++++++++++++----
 3 files changed, 96 insertions(+), 52 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5b069edf75b..5505afc4cf8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2338,11 +2338,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected Long internalGetCompactionThreshold() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName).compaction_threshold;
-    }
-
     protected void internalSetCompactionThreshold(Long newThreshold) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
@@ -2368,16 +2363,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected long internalGetOffloadThreshold() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
-        Policies policies = getNamespacePolicies(namespaceName);
-        if (policies.offload_policies == null) {
-            return policies.offload_threshold;
-        } else {
-            return policies.offload_policies.getManagedLedgerOffloadThresholdInBytes();
-        }
-    }
-
     protected void internalSetOffloadThreshold(long newThreshold) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
@@ -2403,16 +2388,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected Long internalGetOffloadDeletionLag() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
-        Policies policies = getNamespacePolicies(namespaceName);
-        if (policies.offload_policies == null) {
-            return policies.offload_deletion_lag_ms;
-        } else {
-            return policies.offload_policies.getManagedLedgerOffloadDeletionLagInMillis();
-        }
-    }
-
     protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
@@ -2660,13 +2635,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected OffloadPoliciesImpl internalGetOffloadPolicies() {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        return (OffloadPoliciesImpl) policies.offload_policies;
-    }
-
     protected int internalGetMaxTopicsPerNamespace() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_topics_per_namespace != null
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index d3f1fb9a9e4..1c2fb282e10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1522,11 +1522,21 @@ public class Namespaces extends NamespacesBase {
                           + "A threshold of 0 disabled automatic compaction")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
                             @ApiResponse(code = 404, message = "Namespace doesn't exist") })
-    public Long getCompactionThreshold(@PathParam("property") String property,
-                                       @PathParam("cluster") String cluster,
-                                       @PathParam("namespace") String namespace) {
+    public void getCompactionThreshold(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetCompactionThreshold();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.compaction_threshold))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get compaction threshold on namespace {}", clientAppId(), namespaceName,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -1553,11 +1563,26 @@ public class Namespaces extends NamespacesBase {
                   notes = "A negative value disables automatic offloading")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
                             @ApiResponse(code = 404, message = "Namespace doesn't exist") })
-    public long getOffloadThreshold(@PathParam("property") String property,
-                                    @PathParam("cluster") String cluster,
-                                    @PathParam("namespace") String namespace) {
+    public void getOffloadThreshold(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(property, cluster, namespace);
-        return internalGetOffloadThreshold();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> {
+                    if (policies.offload_policies == null) {
+                        asyncResponse.resume(policies.offload_threshold);
+                    } else {
+                        asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInBytes());
+                    }
+                })
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get offload threshold on namespace {}", clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 6d8fe5bc1d1..5417f2e19c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1859,10 +1859,20 @@ public class Namespaces extends NamespacesBase {
                           + "A threshold of 0 disabled automatic compaction")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
                             @ApiResponse(code = 404, message = "Namespace doesn't exist") })
-    public Long getCompactionThreshold(@PathParam("tenant") String tenant,
-                                       @PathParam("namespace") String namespace) {
+    public void getCompactionThreshold(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetCompactionThreshold();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.compaction_threshold))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get compaction threshold on namespace {}", clientAppId(), namespaceName,
+                            ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -1904,10 +1914,25 @@ public class Namespaces extends NamespacesBase {
                   notes = "A negative value disables automatic offloading")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
                             @ApiResponse(code = 404, message = "Namespace doesn't exist") })
-    public long getOffloadThreshold(@PathParam("tenant") String tenant,
-                                       @PathParam("namespace") String namespace) {
+    public void getOffloadThreshold(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetOffloadThreshold();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> {
+                    if (policies.offload_policies == null) {
+                        asyncResponse.resume(policies.offload_threshold);
+                    } else {
+                        asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInBytes());
+                    }
+                })
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get offload threshold on namespace {}", clientAppId(), namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -1939,10 +1964,26 @@ public class Namespaces extends NamespacesBase {
                           + " broker default for deletion lag.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
                             @ApiResponse(code = 404, message = "Namespace doesn't exist") })
-    public Long getOffloadDeletionLag(@PathParam("tenant") String tenant,
-                                      @PathParam("namespace") String namespace) {
+    public void getOffloadDeletionLag(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetOffloadDeletionLag();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> {
+                    if (policies.offload_policies == null) {
+                        asyncResponse.resume(policies.offload_deletion_lag_ms);
+                    } else {
+                        asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadDeletionLagInMillis());
+                    }
+                })
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get offload deletion lag milliseconds on namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
@@ -2194,10 +2235,20 @@ public class Namespaces extends NamespacesBase {
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
-    public OffloadPoliciesImpl getOffloadPolicies(@PathParam("tenant") String tenant,
-                                                  @PathParam("namespace") String namespace) {
+    public void getOffloadPolicies(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
-        return internalGetOffloadPolicies();
+        validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(policies -> asyncResponse.resume(policies.offload_policies))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get offload policies on a namespace {}", clientAppId(),
+                            namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET