You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/06 00:43:49 UTC
[pulsar] branch master updated: Add support for remove offload
policy in the namespace level (#8446)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 24cd557 Add support for remove offload policy in the namespace level (#8446)
24cd557 is described below
commit 24cd557fd77a8084d6cf775afaa9d393d3359972
Author: Renkai <ga...@gmail.com>
AuthorDate: Fri Nov 6 08:43:32 2020 +0800
Add support for remove offload policy in the namespace level (#8446)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 41 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 28 ++++++++++++---
.../pulsar/broker/admin/AdminApiOffloadTest.java | 4 +++
.../org/apache/pulsar/client/admin/Namespaces.java | 28 +++++++++++++++
.../client/admin/internal/NamespacesImpl.java | 22 ++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 3 ++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 15 ++++++++
7 files changed, 137 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 754a4f4..b752599 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -48,6 +48,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -3000,6 +3001,46 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected void internalRemoveOffloadPolicies(AsyncResponse asyncResponse) {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
+
+ try {
+ Stat nodeStat = new Stat();
+ final String path = path(POLICIES, namespaceName.toString());
+ byte[] content = globalZk().getData(path, null, nodeStat);
+ Policies policies = jsonMapper().readValue(content, Policies.class);
+
+ policies.offload_policies = null;
+ globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion(),
+ (rc, path1, ctx, stat) -> {
+ if (rc == KeeperException.Code.OK.intValue()) {
+ policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+ log.info("[{}] Successfully remove offload configuration: namespace={}", clientAppId(),
+ namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ String errorMsg = String.format(
+ "[%s] Failed to remove offload configuration for namespace %s",
+ clientAppId(), namespaceName);
+ if (rc == KeeperException.Code.NONODE.intValue()) {
+ log.warn("{} : does not exist", errorMsg);
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+ log.warn("{} : concurrent modification", errorMsg);
+ asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+ } else {
+ asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+ }
+ }
+ }, null);
+ } catch (Exception e) {
+ log.error("[{}] Failed to remove offload configuration for namespace {}", clientAppId(), namespaceName,
+ e);
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
private void validateOffloadPolicies(OffloadPolicies offloadPolicies) {
if (offloadPolicies == null) {
log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 59e896c..1c9119f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1322,8 +1322,8 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "OffloadPolicies is empty or driver is not supported or bucket is not valid") })
public void setOffloadPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @ApiParam(value = "Offload policies for the specified namespace", required = true) OffloadPolicies offload,
- @Suspended final AsyncResponse asyncResponse) {
+ @ApiParam(value = "Offload policies for the specified namespace", required = true) OffloadPolicies offload,
+ @Suspended final AsyncResponse asyncResponse) {
try {
validateNamespaceName(tenant, namespace);
internalSetOffloadPolicies(asyncResponse, offload);
@@ -1334,14 +1334,34 @@ public class Namespaces extends NamespacesBase {
}
}
+ @DELETE
+ @Path("/{tenant}/{namespace}/removeOffloadPolicies")
+ @ApiOperation(value = " Set offload configuration on a namespace.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 412, message = "OffloadPolicies is empty or driver is not supported or bucket is not valid")})
+ public void removeOffloadPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ @Suspended final AsyncResponse asyncResponse) {
+ try {
+ validateNamespaceName(tenant, namespace);
+ internalRemoveOffloadPolicies(asyncResponse);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
@GET
@Path("/{tenant}/{namespace}/offloadPolicies")
@ApiOperation(value = "Get offload configuration on a namespace.")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 404, message = "Namespace does not exist") })
+ @ApiResponse(code = 404, message = "Namespace does not exist")})
public OffloadPolicies getOffloadPolicies(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetOffloadPolicies();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 4a8b914..9591f7a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -172,6 +172,10 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
assertEquals(offload1, offload2);
+
+ admin.namespaces().removeOffloadPolicies(namespaceName);
+ OffloadPolicies offload3 = admin.namespaces().getOffloadPolicies(namespaceName);
+ assertNull(offload3);
}
@Test(timeOut = 20000)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 129d43e..4acc74d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -3293,6 +3293,20 @@ public interface Namespaces {
void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies) throws PulsarAdminException;
/**
+ * Remove the offload configuration for a namespace.
+ * <p/>
+ * Remove the offload configuration in a namespace. This operation requires pulsar tenant access.
+ * <p/>
+ *
+ * @param namespace Namespace name
+ * @throws NotAuthorizedException Don't have admin permission
+ * @throws NotFoundException Namespace does not exist
+ * @throws ConflictException Concurrent modification
+ * @throws PulsarAdminException Unexpected error
+ */
+ void removeOffloadPolicies(String namespace) throws PulsarAdminException;
+
+ /**
* Set the offload configuration for all the topics in a namespace asynchronously.
* <p/>
* Set the offload configuration in a namespace. This operation requires pulsar tenant access.
@@ -3320,6 +3334,20 @@ public interface Namespaces {
CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies);
/**
+ * Remove the offload configuration for a namespace asynchronously.
+ * <p/>
+ * Remove the offload configuration in a namespace. This operation requires pulsar tenant access.
+ * <p/>
+ *
+ * @param namespace Namespace name
+ * @throws NotAuthorizedException Don't have admin permission
+ * @throws NotFoundException Namespace does not exist
+ * @throws ConflictException Concurrent modification
+ * @throws PulsarAdminException Unexpected error
+ */
+ CompletableFuture<Void> removeOffloadPoliciesAsync(String namespace);
+
+ /**
* Get the offload configuration for a namespace.
* <p/>
* Get the offload configuration for a namespace.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 35434cb..81945ef 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2676,6 +2676,21 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public void removeOffloadPolicies(String namespace) throws PulsarAdminException {
+ try {
+ removeOffloadPoliciesAsync(namespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
public CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "offloadPolicies");
@@ -2683,6 +2698,13 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public CompletableFuture<Void> removeOffloadPoliciesAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "removeOffloadPolicies");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException {
try {
return getOffloadPoliciesAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index cf35d0b..d4cff2b 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -546,6 +546,9 @@ public class PulsarAdminToolTest {
"http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024,
10L * 1024 * 1024, 10000L));
+ namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
+ verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");
+
namespaces.run(split("get-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).getOffloadPolicies("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 6a8747a..459e450 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1704,6 +1704,20 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Remove the offload policies for a namespace")
+ private class RemoveOffloadPolicies extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+
+ admin.namespaces().removeOffloadPolicies(namespace);
+ }
+ }
+
+
@Parameters(commandDescription = "Get the offload policies for a namespace")
private class GetOffloadPolicies extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
@@ -1840,6 +1854,7 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());
jcommander.addCommand("set-offload-policies", new SetOffloadPolicies());
+ jcommander.addCommand("remove-offload-policies", new RemoveOffloadPolicies());
jcommander.addCommand("get-offload-policies", new GetOffloadPolicies());
}
}