You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/25 13:04:50 UTC
[pulsar] branch master updated: [improve][broker] make offload police methods async in Namespaces (#16760)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ec5efa429a7 [improve][broker] make offload police methods async in Namespaces (#16760)
ec5efa429a7 is described below
commit ec5efa429a7f0b4f57550a4f62dc2df25e1cb3b4
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jul 25 21:04:42 2022 +0800
[improve][broker] make offload police methods async in Namespaces (#16760)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 32 ---------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 41 +++++++++---
.../apache/pulsar/broker/admin/v2/Namespaces.java | 75 ++++++++++++++++++----
3 files changed, 96 insertions(+), 52 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5b069edf75b..5505afc4cf8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2338,11 +2338,6 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected Long internalGetCompactionThreshold() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ);
- return getNamespacePolicies(namespaceName).compaction_threshold;
- }
-
protected void internalSetCompactionThreshold(Long newThreshold) {
validateNamespacePolicyOperation(namespaceName, PolicyName.COMPACTION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
@@ -2368,16 +2363,6 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected long internalGetOffloadThreshold() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
- Policies policies = getNamespacePolicies(namespaceName);
- if (policies.offload_policies == null) {
- return policies.offload_threshold;
- } else {
- return policies.offload_policies.getManagedLedgerOffloadThresholdInBytes();
- }
- }
-
protected void internalSetOffloadThreshold(long newThreshold) {
validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
@@ -2403,16 +2388,6 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected Long internalGetOffloadDeletionLag() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
- Policies policies = getNamespacePolicies(namespaceName);
- if (policies.offload_policies == null) {
- return policies.offload_deletion_lag_ms;
- } else {
- return policies.offload_policies.getManagedLedgerOffloadDeletionLagInMillis();
- }
- }
-
protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
@@ -2660,13 +2635,6 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected OffloadPoliciesImpl internalGetOffloadPolicies() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ);
-
- Policies policies = getNamespacePolicies(namespaceName);
- return (OffloadPoliciesImpl) policies.offload_policies;
- }
-
protected int internalGetMaxTopicsPerNamespace() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_topics_per_namespace != null
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index d3f1fb9a9e4..1c2fb282e10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1522,11 +1522,21 @@ public class Namespaces extends NamespacesBase {
+ "A threshold of 0 disabled automatic compaction")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
- public Long getCompactionThreshold(@PathParam("property") String property,
- @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
+ public void getCompactionThreshold(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetCompactionThreshold();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.compaction_threshold))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get compaction threshold on namespace {}", clientAppId(), namespaceName,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -1553,11 +1563,26 @@ public class Namespaces extends NamespacesBase {
notes = "A negative value disables automatic offloading")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
- public long getOffloadThreshold(@PathParam("property") String property,
- @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace) {
+ public void getOffloadThreshold(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetOffloadThreshold();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> {
+ if (policies.offload_policies == null) {
+ asyncResponse.resume(policies.offload_threshold);
+ } else {
+ asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInBytes());
+ }
+ })
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get offload threshold on namespace {}", clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 6d8fe5bc1d1..5417f2e19c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1859,10 +1859,20 @@ public class Namespaces extends NamespacesBase {
+ "A threshold of 0 disabled automatic compaction")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
- public Long getCompactionThreshold(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void getCompactionThreshold(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetCompactionThreshold();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.COMPACTION, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.compaction_threshold))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get compaction threshold on namespace {}", clientAppId(), namespaceName,
+ ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -1904,10 +1914,25 @@ public class Namespaces extends NamespacesBase {
notes = "A negative value disables automatic offloading")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
- public long getOffloadThreshold(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void getOffloadThreshold(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetOffloadThreshold();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> {
+ if (policies.offload_policies == null) {
+ asyncResponse.resume(policies.offload_threshold);
+ } else {
+ asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInBytes());
+ }
+ })
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get offload threshold on namespace {}", clientAppId(), namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -1939,10 +1964,26 @@ public class Namespaces extends NamespacesBase {
+ " broker default for deletion lag.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
- public Long getOffloadDeletionLag(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void getOffloadDeletionLag(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetOffloadDeletionLag();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> {
+ if (policies.offload_policies == null) {
+ asyncResponse.resume(policies.offload_deletion_lag_ms);
+ } else {
+ asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadDeletionLagInMillis());
+ }
+ })
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get offload deletion lag milliseconds on namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
@@ -2194,10 +2235,20 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist")})
- public OffloadPoliciesImpl getOffloadPolicies(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ public void getOffloadPolicies(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetOffloadPolicies();
+ validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenAccept(policies -> asyncResponse.resume(policies.offload_policies))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to get offload policies on a namespace {}", clientAppId(),
+ namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET