You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/06 00:43:49 UTC

[pulsar] branch master updated: Add support for remove offload policy in the namespace level (#8446)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 24cd557  Add support for remove offload policy in the namespace level (#8446)
24cd557 is described below

commit 24cd557fd77a8084d6cf775afaa9d393d3359972
Author: Renkai <ga...@gmail.com>
AuthorDate: Fri Nov 6 08:43:32 2020 +0800

    Add support for remove offload policy in the namespace level (#8446)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 41 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 28 ++++++++++++---
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |  4 +++
 .../org/apache/pulsar/client/admin/Namespaces.java | 28 +++++++++++++++
 .../client/admin/internal/NamespacesImpl.java      | 22 ++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  3 ++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 15 ++++++++
 7 files changed, 137 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 754a4f4..b752599 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -48,6 +48,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -3000,6 +3001,46 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    protected void internalRemoveOffloadPolicies(AsyncResponse asyncResponse) {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+
+            policies.offload_policies = null;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion(),
+                    (rc, path1, ctx, stat) -> {
+                        if (rc == KeeperException.Code.OK.intValue()) {
+                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                            log.info("[{}] Successfully remove offload configuration: namespace={}", clientAppId(),
+                                    namespaceName);
+                            asyncResponse.resume(Response.noContent().build());
+                        } else {
+                            String errorMsg = String.format(
+                                    "[%s] Failed to remove offload configuration for namespace %s",
+                                    clientAppId(), namespaceName);
+                            if (rc == KeeperException.Code.NONODE.intValue()) {
+                                log.warn("{} : does not exist", errorMsg);
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                            } else if (rc == KeeperException.Code.BADVERSION.intValue()) {
+                                log.warn("{} : concurrent modification", errorMsg);
+                                asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                            } else {
+                                asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
+                            }
+                        }
+                    }, null);
+        } catch (Exception e) {
+            log.error("[{}] Failed to remove offload configuration for namespace {}", clientAppId(), namespaceName,
+                    e);
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
     private void validateOffloadPolicies(OffloadPolicies offloadPolicies) {
         if (offloadPolicies == null) {
             log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 59e896c..1c9119f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1322,8 +1322,8 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 409, message = "Concurrent modification"),
             @ApiResponse(code = 412, message = "OffloadPolicies is empty or driver is not supported or bucket is not valid") })
     public void setOffloadPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @ApiParam(value = "Offload policies for the specified namespace", required = true) OffloadPolicies offload,
-            @Suspended final AsyncResponse asyncResponse) {
+                                   @ApiParam(value = "Offload policies for the specified namespace", required = true) OffloadPolicies offload,
+                                   @Suspended final AsyncResponse asyncResponse) {
         try {
             validateNamespaceName(tenant, namespace);
             internalSetOffloadPolicies(asyncResponse, offload);
@@ -1334,14 +1334,34 @@ public class Namespaces extends NamespacesBase {
         }
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/removeOffloadPolicies")
+    @ApiOperation(value = " Set offload configuration on a namespace.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "OffloadPolicies is empty or driver is not supported or bucket is not valid")})
+    public void removeOffloadPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+                                      @Suspended final AsyncResponse asyncResponse) {
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalRemoveOffloadPolicies(asyncResponse);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/offloadPolicies")
     @ApiOperation(value = "Get offload configuration on a namespace.")
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
-            @ApiResponse(code = 404, message = "Namespace does not exist") })
+            @ApiResponse(code = 404, message = "Namespace does not exist")})
     public OffloadPolicies getOffloadPolicies(@PathParam("tenant") String tenant,
-                                        @PathParam("namespace") String namespace) {
+                                              @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
         return internalGetOffloadPolicies();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 4a8b914..9591f7a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -172,6 +172,10 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().setOffloadPolicies(namespaceName, offload1);
         OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
         assertEquals(offload1, offload2);
+
+        admin.namespaces().removeOffloadPolicies(namespaceName);
+        OffloadPolicies offload3 = admin.namespaces().getOffloadPolicies(namespaceName);
+        assertNull(offload3);
     }
 
     @Test(timeOut = 20000)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 129d43e..4acc74d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -3293,6 +3293,20 @@ public interface Namespaces {
     void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies) throws PulsarAdminException;
 
     /**
+     * Remove the offload configuration for a namespace.
+     * <p/>
+     * Remove the offload configuration in a namespace. This operation requires pulsar tenant access.
+     * <p/>
+     *
+     * @param namespace Namespace name
+     * @throws NotAuthorizedException Don't have admin permission
+     * @throws NotFoundException      Namespace does not exist
+     * @throws ConflictException      Concurrent modification
+     * @throws PulsarAdminException   Unexpected error
+     */
+    void removeOffloadPolicies(String namespace) throws PulsarAdminException;
+
+    /**
      * Set the offload configuration for all the topics in a namespace asynchronously.
      * <p/>
      * Set the offload configuration in a namespace. This operation requires pulsar tenant access.
@@ -3320,6 +3334,20 @@ public interface Namespaces {
     CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies);
 
     /**
+     * Remove the offload configuration for a namespace asynchronously.
+     * <p/>
+     * Remove the offload configuration in a namespace. This operation requires pulsar tenant access.
+     * <p/>
+     *
+     * @param namespace Namespace name
+     * @throws NotAuthorizedException Don't have admin permission
+     * @throws NotFoundException      Namespace does not exist
+     * @throws ConflictException      Concurrent modification
+     * @throws PulsarAdminException   Unexpected error
+     */
+    CompletableFuture<Void> removeOffloadPoliciesAsync(String namespace);
+
+    /**
      * Get the offload configuration for a namespace.
      * <p/>
      * Get the offload configuration for a namespace.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 35434cb..81945ef 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2676,6 +2676,21 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void removeOffloadPolicies(String namespace) throws PulsarAdminException {
+        try {
+            removeOffloadPoliciesAsync(namespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns, "offloadPolicies");
@@ -2683,6 +2698,13 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public CompletableFuture<Void> removeOffloadPoliciesAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "removeOffloadPolicies");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException {
         try {
             return getOffloadPoliciesAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index cf35d0b..d4cff2b 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -546,6 +546,9 @@ public class PulsarAdminToolTest {
                         "http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024,
                         10L * 1024 * 1024, 10000L));
 
+        namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
+        verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");
+
         namespaces.run(split("get-offload-policies myprop/clust/ns1"));
         verify(mockNamespaces).getOffloadPolicies("myprop/clust/ns1");
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 6a8747a..459e450 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1704,6 +1704,20 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Remove the offload policies for a namespace")
+    private class RemoveOffloadPolicies extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+
+            admin.namespaces().removeOffloadPolicies(namespace);
+        }
+    }
+
+
     @Parameters(commandDescription = "Get the offload policies for a namespace")
     private class GetOffloadPolicies extends CliCommand {
         @Parameter(description = "tenant/namespace\n", required = true)
@@ -1840,6 +1854,7 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());
 
         jcommander.addCommand("set-offload-policies", new SetOffloadPolicies());
+        jcommander.addCommand("remove-offload-policies", new RemoveOffloadPolicies());
         jcommander.addCommand("get-offload-policies", new GetOffloadPolicies());
     }
 }