You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/14 01:27:43 UTC

[pulsar] branch master updated: [improve][cli] Add some checks for topic-level `setOffloadPolicies` (#20943)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 980dfc81a51 [improve][cli] Add some checks for topic-level `setOffloadPolicies` (#20943)
980dfc81a51 is described below

commit 980dfc81a5165147615232692e7b5a71b61c71a7
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Aug 14 09:27:36 2023 +0800

    [improve][cli] Add some checks for topic-level `setOffloadPolicies` (#20943)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 23 ++++++++++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 22 -----------
 .../broker/admin/impl/PersistentTopicsBase.java    |  4 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  1 +
 .../pulsar/broker/admin/AdminApiOffloadTest.java   | 17 +++++----
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 16 +-------
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 43 +++++++++++++++++++---
 .../apache/pulsar/admin/cli/utils/CmdUtils.java    | 14 +++++++
 8 files changed, 89 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 828b18c2df9..e9beab90b5f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PolicyName;
@@ -852,4 +853,26 @@ public abstract class AdminResource extends PulsarWebResource {
     protected AuthorizationService getAuthorizationService() {
         return pulsar().getBrokerService().getAuthorizationService();
     }
+
+    protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
+        if (offloadPolicies == null) {
+            log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "The offloadPolicies must be specified for namespace offload.");
+        }
+        if (!offloadPolicies.driverSupported()) {
+            log.warn("[{}] Failed to update offload configuration for namespace {}: "
+                            + "driver is not supported, support value: {}",
+                    clientAppId(), namespaceName, OffloadPoliciesImpl.getSupportedDriverNames());
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "The driver is not supported, support value: " + OffloadPoliciesImpl.getSupportedDriverNames());
+        }
+        if (!offloadPolicies.bucketValid()) {
+            log.warn("[{}] Failed to update offload configuration for namespace {}: bucket must be specified",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "The bucket must be specified for namespace offload.");
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 789a8e6dbdc..ae36c5c30cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2316,28 +2316,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    private void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
-        if (offloadPolicies == null) {
-            log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
-                    clientAppId(), namespaceName);
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "The offloadPolicies must be specified for namespace offload.");
-        }
-        if (!offloadPolicies.driverSupported()) {
-            log.warn("[{}] Failed to update offload configuration for namespace {}: "
-                            + "driver is not supported, support value: {}",
-                    clientAppId(), namespaceName, OffloadPoliciesImpl.getSupportedDriverNames());
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "The driver is not supported, support value: " + OffloadPoliciesImpl.getSupportedDriverNames());
-        }
-        if (!offloadPolicies.bucketValid()) {
-            log.warn("[{}] Failed to update offload configuration for namespace {}: bucket must be specified",
-                    clientAppId(), namespaceName);
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "The bucket must be specified for namespace offload.");
-        }
-    }
-
     protected void internalRemoveMaxTopicsPerNamespace() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
         internalSetMaxTopicsPerNamespace(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 682eb7b12e9..56598f5cc45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -940,8 +940,8 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
-    protected CompletableFuture<Void> internalSetOffloadPolicies
-            (OffloadPoliciesImpl offloadPolicies, boolean isGlobal) {
+    protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
+                                                                 boolean isGlobal) {
         return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1927d4b244a..df50c2721b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -380,6 +380,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
+            .thenAccept(__ -> validateOffloadPolicies(offloadPolicies))
             .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index c3265897b87..95b0d48c69a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -213,6 +213,8 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl) admin.topics().getOffloadPolicies(topicName);
         assertNull(offloadPolicies);
         OffloadPoliciesImpl offload = new OffloadPoliciesImpl();
+        offload.setManagedLedgerOffloadDriver("S3");
+        offload.setManagedLedgerOffloadBucket("bucket");
         String path = "fileSystemPath";
         offload.setFileSystemProfilePath(path);
         admin.topics().setOffloadPolicies(topicName, offload);
@@ -404,12 +406,13 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         //3 construct a topic level offloadPolicies
         OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
         offloadPolicies.setOffloadersDirectory(".");
-        offloadPolicies.setManagedLedgerOffloadDriver("mock");
+        offloadPolicies.setManagedLedgerOffloadDriver("S3");
+        offloadPolicies.setManagedLedgerOffloadBucket("bucket");
         offloadPolicies.setManagedLedgerOffloadPrefetchRounds(10);
         offloadPolicies.setManagedLedgerOffloadThresholdInBytes(1024L);
 
         LedgerOffloader topicOffloader = mock(LedgerOffloader.class);
-        when(topicOffloader.getOffloadDriverName()).thenReturn("mock");
+        when(topicOffloader.getOffloadDriverName()).thenReturn("S3");
         doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any());
 
         //4 set topic level offload policies
@@ -423,18 +426,18 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
                         .getTopic(TopicName.get(topicName).getPartition(i).toString(), false).get().get();
                 assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
                 assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
-                        , "mock");
+                        , "S3");
             }
         } else {
             PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
                     .getTopic(topicName, false).get().get();
             assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
             assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
-                    , "mock");
+                    , "S3");
         }
         //6 remove topic level offload policy, offloader should become namespaceOffloader
         LedgerOffloader namespaceOffloader = mock(LedgerOffloader.class);
-        when(namespaceOffloader.getOffloadDriverName()).thenReturn("s3");
+        when(namespaceOffloader.getOffloadDriverName()).thenReturn("S3");
         Map<NamespaceName, LedgerOffloader> map = new HashMap<>();
         map.put(TopicName.get(topicName).getNamespaceObject(), namespaceOffloader);
         doReturn(map).when(pulsar).getLedgerOffloaderMap();
@@ -450,14 +453,14 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
                         .getTopicIfExists(TopicName.get(topicName).getPartition(i).toString()).get().get();
                 assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
                 assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
-                        , "s3");
+                        , "S3");
             }
         } else {
             PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
                     .getTopic(topicName, false).get().get();
             assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
             assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
-                    , "s3");
+                    , "S3");
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 5901d7c177e..33277fdb608 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
@@ -2319,20 +2321,6 @@ public class CmdNamespaces extends CmdBase {
             return driver.equalsIgnoreCase(driverNames.get(0)) || driver.equalsIgnoreCase(driverNames.get(1));
         }
 
-        public boolean positiveCheck(String paramName, long value) {
-            if (value <= 0) {
-                throw new ParameterException(paramName + " cannot be less than or equal to 0!");
-            }
-            return true;
-        }
-
-        public boolean maxValueCheck(String paramName, long value, long maxValue) {
-            if (value > maxValue) {
-                throw new ParameterException(paramName + " cannot be greater than " + maxValue + "!");
-            }
-            return true;
-        }
-
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index d567d0b3671..2914a5e8a08 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
+import com.google.common.base.Strings;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -1764,24 +1767,24 @@ public class CmdTopicPolicies extends CmdBase {
         @Parameter(names = {"-m", "--maxBlockSizeInBytes"},
                 description = "ManagedLedger offload max block Size in bytes,"
                         + "s3 and google-cloud-storage requires this parameter")
-        private int maxBlockSizeInBytes;
+        private int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
 
         @Parameter(names = {"-rb", "--readBufferSizeInBytes"},
                 description = "ManagedLedger offload read buffer size in bytes,"
                         + "s3 and google-cloud-storage requires this parameter")
-        private int readBufferSizeInBytes;
+        private int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
 
         @Parameter(names = {"-t", "--offloadThresholdInBytes"}
-                , description = "ManagedLedger offload threshold in bytes", required = true)
-        private long offloadThresholdInBytes;
+                , description = "ManagedLedger offload threshold in bytes")
+        private Long offloadThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
 
         @Parameter(names = {"-ts", "--offloadThresholdInSeconds"}
                 , description = "ManagedLedger offload threshold in seconds")
-        private Long offloadThresholdInSeconds;
+        private Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
 
         @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
                 , description = "ManagedLedger offload deletion lag in bytes")
-        private Long offloadDeletionLagInMillis;
+        private Long offloadDeletionLagInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
 
         @Parameter(
                 names = {"--offloadedReadPriority", "-orp"},
@@ -1798,10 +1801,38 @@ public class CmdTopicPolicies extends CmdBase {
                 + "If set to true, the policy will be replicate to other clusters asynchronously")
         private boolean isGlobal = false;
 
+        public final List<String> driverNames = OffloadPoliciesImpl.DRIVER_NAMES;
+
+        public boolean driverSupported(String driver) {
+            return driverNames.stream().anyMatch(d -> d.equalsIgnoreCase(driver));
+        }
+
+        public boolean isS3Driver(String driver) {
+            if (StringUtils.isEmpty(driver)) {
+                return false;
+            }
+            return driver.equalsIgnoreCase(driverNames.get(0)) || driver.equalsIgnoreCase(driverNames.get(1));
+        }
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
 
+            if (!driverSupported(driver)) {
+                throw new ParameterException("The driver " + driver + " is not supported, "
+                        + "(Possible values: " + String.join(",", driverNames) + ").");
+            }
+
+            if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
+                throw new ParameterException(
+                        "Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set"
+                                + " if s3 offload enabled");
+            }
+            positiveCheck("maxBlockSizeInBytes", maxBlockSizeInBytes);
+            maxValueCheck("maxBlockSizeInBytes", maxBlockSizeInBytes, Integer.MAX_VALUE);
+            positiveCheck("readBufferSizeInBytes", readBufferSizeInBytes);
+            maxValueCheck("readBufferSizeInBytes", readBufferSizeInBytes, Integer.MAX_VALUE);
+
             OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
 
             if (this.offloadReadPriorityStr != null) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
index a8659e066e4..a4db39f9cc9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
@@ -56,4 +56,18 @@ public class CmdUtils {
             }
         }
     }
+
+    public static boolean positiveCheck(String paramName, long value) {
+        if (value <= 0) {
+            throw new ParameterException(paramName + " cannot be less than or equal to 0!");
+        }
+        return true;
+    }
+
+    public static boolean maxValueCheck(String paramName, long value, long maxValue) {
+        if (value > maxValue) {
+            throw new ParameterException(paramName + " cannot be greater than " + maxValue + "!");
+        }
+        return true;
+    }
 }