You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/12 03:57:35 UTC
[pulsar] branch master updated: Check for null arguments in Namespaces Rest API (#7247)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 134a8a2 Check for null arguments in Namespaces Rest API (#7247)
134a8a2 is described below
commit 134a8a2e2064a895e9a70bab2ba4c01510fb349e
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Jun 11 20:57:20 2020 -0700
Check for null arguments in Namespaces Rest API (#7247)
* Check for null arguments
* Fix test
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 +++++-
.../pulsar/broker/admin/impl/NamespacesBase.java | 57 ++++++++++++++--------
.../broker/admin/impl/PersistentTopicsBase.java | 18 +++----
.../pulsar/broker/admin/v1/V1_AdminApiTest2.java | 6 +--
4 files changed, 62 insertions(+), 34 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1d765f5..7d4b6d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin;
-import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.Codec.decode;
@@ -397,7 +396,7 @@ public abstract class AdminResource extends PulsarWebResource {
if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
- checkArgument(parts.length == 2, "Invalid broker url %s", broker);
+ checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
String host = parts[0];
int port = Integer.parseInt(parts[1]);
@@ -844,4 +843,16 @@ public abstract class AdminResource extends PulsarWebResource {
asyncResponse.resume(new RestException(throwable));
}
}
+
+ protected void checkNotNull(Object o, String errorMessage) {
+ if (o == null) {
+ throw new RestException(Status.BAD_REQUEST, errorMessage);
+ }
+ }
+
+ protected void checkArgument(boolean b, String errorMessage) {
+ if (!b) {
+ throw new RestException(Status.BAD_REQUEST, errorMessage);
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 42fd278..b117c9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
@@ -110,6 +108,7 @@ public abstract class NamespacesBase extends AdminResource {
private static final long MAX_BUNDLES = ((long) 1) << 32;
protected List<String> internalGetTenantNamespaces(String tenant) {
+ checkNotNull(tenant, "Tenant should not be null");
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
try {
@@ -380,6 +379,8 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
+ checkNotNull(role, "Role should not be null");
+ checkNotNull(actions, "Actions should not be null");
try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -411,6 +412,8 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalGrantPermissionOnSubscription(String subscription, Set<String> roles) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
+ checkNotNull(subscription, "Subscription should not be null");
+ checkNotNull(roles, "Roles should not be null");
try {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -442,6 +445,7 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalRevokePermissionsOnNamespace(String role) {
validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
+ checkNotNull(role, "Role should not be null");
try {
Stat nodeStat = new Stat();
@@ -472,6 +476,8 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalRevokePermissionsOnSubscription(String subscriptionName, String role) {
validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
validatePoliciesReadOnlyAccess();
+ checkNotNull(subscriptionName, "SubscriptionName should not be null");
+ checkNotNull(role, "Role should not be null");
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
@@ -497,6 +503,7 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalSetNamespaceReplicationClusters(List<String> clusterIds) {
validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
+ checkNotNull(clusterIds, "ClusterIds should not be null");
Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
if (!namespaceName.isGlobal()) {
@@ -1021,6 +1028,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritative) {
validateSuperUserAccess();
+ checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
Policies policies = getNamespacePolicies(namespaceName);
@@ -1069,6 +1077,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
validateSuperUserAccess();
+ checkNotNull(bundleRange, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
Policies policies = getNamespacePolicies(namespaceName);
@@ -1581,6 +1590,7 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
+ checkNotNull(bundleRange, "BundleRange should not be null");
Policies policies = getNamespacePolicies(namespaceName);
@@ -1602,6 +1612,7 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
+ checkNotNull(subscription, "Subscription should not be null");
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
@@ -1645,6 +1656,8 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
+ checkNotNull(subscription, "Subscription should not be null");
+ checkNotNull(bundleRange, "BundleRange should not be null");
Policies policies = getNamespacePolicies(namespaceName);
@@ -1666,6 +1679,7 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
+ checkNotNull(subscription, "Subscription should not be null");
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
@@ -1708,6 +1722,8 @@ public abstract class NamespacesBase extends AdminResource {
@SuppressWarnings("deprecation")
protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
+ checkNotNull(subscription, "Subscription should not be null");
+ checkNotNull(bundleRange, "BundleRange should not be null");
Policies policies = getNamespacePolicies(namespaceName);
@@ -1845,6 +1861,7 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
+ checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
validatePoliciesReadOnlyAccess();
log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName);
@@ -1922,6 +1939,9 @@ public abstract class NamespacesBase extends AdminResource {
protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
String tenant) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
+ checkNotNull(cluster, "Cluster should not be null");
+ checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
+ checkNotNull(tenant, "Tenant should not be null");
log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster);
@@ -1952,24 +1972,21 @@ public abstract class NamespacesBase extends AdminResource {
}
private void validatePersistencePolicies(PersistencePolicies persistence) {
- try {
- checkNotNull(persistence);
- final ServiceConfiguration config = pulsar().getConfiguration();
- checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
- "Bookkeeper-Ensemble must be <= %s", config.getManagedLedgerMaxEnsembleSize());
- checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
- "Bookkeeper-WriteQuorum must be <= %s", config.getManagedLedgerMaxWriteQuorum());
- checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
- "Bookkeeper-AckQuorum must be <= %s", config.getManagedLedgerMaxAckQuorum());
- checkArgument(
- (persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
- && (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
- "Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
- persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
- persistence.getBookkeeperAckQuorum());
- } catch (NullPointerException | IllegalArgumentException e) {
- throw new RestException(Status.PRECONDITION_FAILED, e.getMessage());
- }
+ checkNotNull(persistence, "persistence policies should not be null");
+ final ServiceConfiguration config = pulsar().getConfiguration();
+ checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
+ "Bookkeeper-Ensemble must be <= " + config.getManagedLedgerMaxEnsembleSize());
+ checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
+ "Bookkeeper-WriteQuorum must be <= " + config.getManagedLedgerMaxWriteQuorum());
+ checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
+ "Bookkeeper-AckQuorum must be <= " + config.getManagedLedgerMaxAckQuorum());
+ checkArgument(
+ (persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
+ && (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
+ String.format("Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
+ persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
+ persistence.getBookkeeperAckQuorum()));
+
}
protected RetentionPolicies internalGetRetention() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index afa4fe7..3f350d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,13 +18,13 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.Codec.decode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.zafarkhaja.semver.Version;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -1404,11 +1404,11 @@ public class PersistentTopicsBase extends AdminResource {
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
- checkNotNull(repl);
+ Preconditions.checkNotNull(repl);
repl.skipMessages(numMessages).get();
} else {
PersistentSubscription sub = topic.getSubscription(subName);
- checkNotNull(sub);
+ Preconditions.checkNotNull(sub);
sub.skipMessages(numMessages).get();
}
log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, topicName, subName);
@@ -1798,7 +1798,7 @@ public class PersistentTopicsBase extends AdminResource {
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
- checkNotNull(sub);
+ Preconditions.checkNotNull(sub);
sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
@@ -1907,7 +1907,7 @@ public class PersistentTopicsBase extends AdminResource {
}
private Response generateResponseWithEntry(Entry entry) throws IOException {
- checkNotNull(entry);
+ Preconditions.checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();
@@ -2151,11 +2151,11 @@ public class PersistentTopicsBase extends AdminResource {
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
- checkNotNull(repl);
+ Preconditions.checkNotNull(repl);
repl.expireMessages(expireTimeInSeconds);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
- checkNotNull(sub);
+ Preconditions.checkNotNull(sub);
sub.expireMessages(expireTimeInSeconds);
}
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, topicName,
@@ -2374,7 +2374,7 @@ public class PersistentTopicsBase extends AdminResource {
private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
try {
Subscription sub = topic.getSubscription(subName);
- return checkNotNull(sub);
+ return Preconditions.checkNotNull(sub);
} catch (Exception e) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
}
@@ -2387,7 +2387,7 @@ public class PersistentTopicsBase extends AdminResource {
try {
String remoteCluster = PersistentReplicator.getRemoteCluster(replName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
- return checkNotNull(repl);
+ return Preconditions.checkNotNull(repl);
} catch (Exception e) {
throw new RestException(Status.NOT_FOUND, "Replicator not found");
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index 4c8289b..360346f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -301,19 +301,19 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 4, 3, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
- assertEquals(e.getStatusCode(), 412);
+ assertEquals(e.getStatusCode(), 400);
}
try {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 4, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
- assertEquals(e.getStatusCode(), 412);
+ assertEquals(e.getStatusCode(), 400);
}
try {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(6, 3, 1, 10.0));
fail("should have failed");
} catch (PulsarAdminException e) {
- assertEquals(e.getStatusCode(), 412);
+ assertEquals(e.getStatusCode(), 400);
}
// make sure policies has not been changed