You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/14 01:27:43 UTC
[pulsar] branch master updated: [improve][cli] Add some checks for topic-level `setOffloadPolicies` (#20943)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 980dfc81a51 [improve][cli] Add some checks for topic-level `setOffloadPolicies` (#20943)
980dfc81a51 is described below
commit 980dfc81a5165147615232692e7b5a71b61c71a7
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Aug 14 09:27:36 2023 +0800
[improve][cli] Add some checks for topic-level `setOffloadPolicies` (#20943)
---
.../apache/pulsar/broker/admin/AdminResource.java | 23 ++++++++++++
.../pulsar/broker/admin/impl/NamespacesBase.java | 22 -----------
.../broker/admin/impl/PersistentTopicsBase.java | 4 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 1 +
.../pulsar/broker/admin/AdminApiOffloadTest.java | 17 +++++----
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 16 +-------
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 43 +++++++++++++++++++---
.../apache/pulsar/admin/cli/utils/CmdUtils.java | 14 +++++++
8 files changed, 89 insertions(+), 51 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 828b18c2df9..e9beab90b5f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
@@ -852,4 +853,26 @@ public abstract class AdminResource extends PulsarWebResource {
protected AuthorizationService getAuthorizationService() {
return pulsar().getBrokerService().getAuthorizationService();
}
+
+ protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
+ if (offloadPolicies == null) {
+ log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "The offloadPolicies must be specified for namespace offload.");
+ }
+ if (!offloadPolicies.driverSupported()) {
+ log.warn("[{}] Failed to update offload configuration for namespace {}: "
+ + "driver is not supported, support value: {}",
+ clientAppId(), namespaceName, OffloadPoliciesImpl.getSupportedDriverNames());
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "The driver is not supported, support value: " + OffloadPoliciesImpl.getSupportedDriverNames());
+ }
+ if (!offloadPolicies.bucketValid()) {
+ log.warn("[{}] Failed to update offload configuration for namespace {}: bucket must be specified",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "The bucket must be specified for namespace offload.");
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 789a8e6dbdc..ae36c5c30cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2316,28 +2316,6 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- private void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
- if (offloadPolicies == null) {
- log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
- clientAppId(), namespaceName);
- throw new RestException(Status.PRECONDITION_FAILED,
- "The offloadPolicies must be specified for namespace offload.");
- }
- if (!offloadPolicies.driverSupported()) {
- log.warn("[{}] Failed to update offload configuration for namespace {}: "
- + "driver is not supported, support value: {}",
- clientAppId(), namespaceName, OffloadPoliciesImpl.getSupportedDriverNames());
- throw new RestException(Status.PRECONDITION_FAILED,
- "The driver is not supported, support value: " + OffloadPoliciesImpl.getSupportedDriverNames());
- }
- if (!offloadPolicies.bucketValid()) {
- log.warn("[{}] Failed to update offload configuration for namespace {}: bucket must be specified",
- clientAppId(), namespaceName);
- throw new RestException(Status.PRECONDITION_FAILED,
- "The bucket must be specified for namespace offload.");
- }
- }
-
protected void internalRemoveMaxTopicsPerNamespace() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
internalSetMaxTopicsPerNamespace(null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 682eb7b12e9..56598f5cc45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -940,8 +940,8 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- protected CompletableFuture<Void> internalSetOffloadPolicies
- (OffloadPoliciesImpl offloadPolicies, boolean isGlobal) {
+ protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
+ boolean isGlobal) {
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1927d4b244a..df50c2721b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -380,6 +380,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
+ .thenAccept(__ -> validateOffloadPolicies(offloadPolicies))
.thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index c3265897b87..95b0d48c69a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -213,6 +213,8 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl) admin.topics().getOffloadPolicies(topicName);
assertNull(offloadPolicies);
OffloadPoliciesImpl offload = new OffloadPoliciesImpl();
+ offload.setManagedLedgerOffloadDriver("S3");
+ offload.setManagedLedgerOffloadBucket("bucket");
String path = "fileSystemPath";
offload.setFileSystemProfilePath(path);
admin.topics().setOffloadPolicies(topicName, offload);
@@ -404,12 +406,13 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
//3 construct a topic level offloadPolicies
OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
offloadPolicies.setOffloadersDirectory(".");
- offloadPolicies.setManagedLedgerOffloadDriver("mock");
+ offloadPolicies.setManagedLedgerOffloadDriver("S3");
+ offloadPolicies.setManagedLedgerOffloadBucket("bucket");
offloadPolicies.setManagedLedgerOffloadPrefetchRounds(10);
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(1024L);
LedgerOffloader topicOffloader = mock(LedgerOffloader.class);
- when(topicOffloader.getOffloadDriverName()).thenReturn("mock");
+ when(topicOffloader.getOffloadDriverName()).thenReturn("S3");
doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any());
//4 set topic level offload policies
@@ -423,18 +426,18 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
.getTopic(TopicName.get(topicName).getPartition(i).toString(), false).get().get();
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
- , "mock");
+ , "S3");
}
} else {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
.getTopic(topicName, false).get().get();
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
- , "mock");
+ , "S3");
}
//6 remove topic level offload policy, offloader should become namespaceOffloader
LedgerOffloader namespaceOffloader = mock(LedgerOffloader.class);
- when(namespaceOffloader.getOffloadDriverName()).thenReturn("s3");
+ when(namespaceOffloader.getOffloadDriverName()).thenReturn("S3");
Map<NamespaceName, LedgerOffloader> map = new HashMap<>();
map.put(TopicName.get(topicName).getNamespaceObject(), namespaceOffloader);
doReturn(map).when(pulsar).getLedgerOffloaderMap();
@@ -450,14 +453,14 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
.getTopicIfExists(TopicName.get(topicName).getPartition(i).toString()).get().get();
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
- , "s3");
+ , "S3");
}
} else {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
.getTopic(topicName, false).get().get();
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
- , "s3");
+ , "S3");
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 5901d7c177e..33277fdb608 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.admin.cli;
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
@@ -2319,20 +2321,6 @@ public class CmdNamespaces extends CmdBase {
return driver.equalsIgnoreCase(driverNames.get(0)) || driver.equalsIgnoreCase(driverNames.get(1));
}
- public boolean positiveCheck(String paramName, long value) {
- if (value <= 0) {
- throw new ParameterException(paramName + " cannot be less than or equal to 0!");
- }
- return true;
- }
-
- public boolean maxValueCheck(String paramName, long value, long maxValue) {
- if (value > maxValue) {
- throw new ParameterException(paramName + " cannot be greater than " + maxValue + "!");
- }
- return true;
- }
-
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index d567d0b3671..2914a5e8a08 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -18,9 +18,12 @@
*/
package org.apache.pulsar.admin.cli;
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
+import com.google.common.base.Strings;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -1764,24 +1767,24 @@ public class CmdTopicPolicies extends CmdBase {
@Parameter(names = {"-m", "--maxBlockSizeInBytes"},
description = "ManagedLedger offload max block Size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
- private int maxBlockSizeInBytes;
+ private int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
@Parameter(names = {"-rb", "--readBufferSizeInBytes"},
description = "ManagedLedger offload read buffer size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
- private int readBufferSizeInBytes;
+ private int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
@Parameter(names = {"-t", "--offloadThresholdInBytes"}
- , description = "ManagedLedger offload threshold in bytes", required = true)
- private long offloadThresholdInBytes;
+ , description = "ManagedLedger offload threshold in bytes")
+ private Long offloadThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
@Parameter(names = {"-ts", "--offloadThresholdInSeconds"}
, description = "ManagedLedger offload threshold in seconds")
- private Long offloadThresholdInSeconds;
+ private Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
, description = "ManagedLedger offload deletion lag in bytes")
- private Long offloadDeletionLagInMillis;
+ private Long offloadDeletionLagInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
@Parameter(
names = {"--offloadedReadPriority", "-orp"},
@@ -1798,10 +1801,38 @@ public class CmdTopicPolicies extends CmdBase {
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
+ public final List<String> driverNames = OffloadPoliciesImpl.DRIVER_NAMES;
+
+ public boolean driverSupported(String driver) {
+ return driverNames.stream().anyMatch(d -> d.equalsIgnoreCase(driver));
+ }
+
+ public boolean isS3Driver(String driver) {
+ if (StringUtils.isEmpty(driver)) {
+ return false;
+ }
+ return driver.equalsIgnoreCase(driverNames.get(0)) || driver.equalsIgnoreCase(driverNames.get(1));
+ }
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
+ if (!driverSupported(driver)) {
+ throw new ParameterException("The driver " + driver + " is not supported, "
+ + "(Possible values: " + String.join(",", driverNames) + ").");
+ }
+
+ if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
+ throw new ParameterException(
+ "Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set"
+ + " if s3 offload enabled");
+ }
+ positiveCheck("maxBlockSizeInBytes", maxBlockSizeInBytes);
+ maxValueCheck("maxBlockSizeInBytes", maxBlockSizeInBytes, Integer.MAX_VALUE);
+ positiveCheck("readBufferSizeInBytes", readBufferSizeInBytes);
+ maxValueCheck("readBufferSizeInBytes", readBufferSizeInBytes, Integer.MAX_VALUE);
+
OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
if (this.offloadReadPriorityStr != null) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
index a8659e066e4..a4db39f9cc9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
@@ -56,4 +56,18 @@ public class CmdUtils {
}
}
}
+
+ public static boolean positiveCheck(String paramName, long value) {
+ if (value <= 0) {
+ throw new ParameterException(paramName + " cannot be less than or equal to 0!");
+ }
+ return true;
+ }
+
+ public static boolean maxValueCheck(String paramName, long value, long maxValue) {
+ if (value > maxValue) {
+ throw new ParameterException(paramName + " cannot be greater than " + maxValue + "!");
+ }
+ return true;
+ }
}