You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/12 03:57:35 UTC

[pulsar] branch master updated: Check for null arguments in Namespaces Rest API (#7247)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 134a8a2  Check for null arguments in Namespaces Rest API (#7247)
134a8a2 is described below

commit 134a8a2e2064a895e9a70bab2ba4c01510fb349e
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Jun 11 20:57:20 2020 -0700

    Check for null arguments in Namespaces Rest API (#7247)
    
    * Check for null arguments
    
    * Fix test
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 15 +++++-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 57 ++++++++++++++--------
 .../broker/admin/impl/PersistentTopicsBase.java    | 18 +++----
 .../pulsar/broker/admin/v1/V1_AdminApiTest2.java   |  6 +--
 4 files changed, 62 insertions(+), 34 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1d765f5..7d4b6d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.common.util.Codec.decode;
 
@@ -397,7 +396,7 @@ public abstract class AdminResource extends PulsarWebResource {
         if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
                 && !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
             String[] parts = broker.split(":");
-            checkArgument(parts.length == 2, "Invalid broker url %s", broker);
+            checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
             String host = parts[0];
             int port = Integer.parseInt(parts[1]);
 
@@ -844,4 +843,16 @@ public abstract class AdminResource extends PulsarWebResource {
             asyncResponse.resume(new RestException(throwable));
         }
     }
+
+    protected void checkNotNull(Object o, String errorMessage) {
+        if (o == null) {
+            throw new RestException(Status.BAD_REQUEST, errorMessage);
+        }
+    }
+
+    protected void checkArgument(boolean b, String errorMessage) {
+        if (!b) {
+            throw new RestException(Status.BAD_REQUEST, errorMessage);
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 42fd278..b117c9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
@@ -110,6 +108,7 @@ public abstract class NamespacesBase extends AdminResource {
     private static final long MAX_BUNDLES = ((long) 1) << 32;
 
     protected List<String> internalGetTenantNamespaces(String tenant) {
+        checkNotNull(tenant, "Tenant should not be null");
         validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
 
         try {
@@ -380,6 +379,8 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
+        checkNotNull(role, "Role should not be null");
+        checkNotNull(actions, "Actions should not be null");
 
         try {
             AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -411,6 +412,8 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected void internalGrantPermissionOnSubscription(String subscription, Set<String> roles) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
+        checkNotNull(subscription, "Subscription should not be null");
+        checkNotNull(roles, "Roles should not be null");
 
         try {
             AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
@@ -442,6 +445,7 @@ public abstract class NamespacesBase extends AdminResource {
     protected void internalRevokePermissionsOnNamespace(String role) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
         validatePoliciesReadOnlyAccess();
+        checkNotNull(role, "Role should not be null");
 
         try {
             Stat nodeStat = new Stat();
@@ -472,6 +476,8 @@ public abstract class NamespacesBase extends AdminResource {
     protected void internalRevokePermissionsOnSubscription(String subscriptionName, String role) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
         validatePoliciesReadOnlyAccess();
+        checkNotNull(subscriptionName, "SubscriptionName should not be null");
+        checkNotNull(role, "Role should not be null");
 
         AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
         if (null != authService) {
@@ -497,6 +503,7 @@ public abstract class NamespacesBase extends AdminResource {
     protected void internalSetNamespaceReplicationClusters(List<String> clusterIds) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
+        checkNotNull(clusterIds, "ClusterIds should not be null");
 
         Set<String> replicationClusterSet = Sets.newHashSet(clusterIds);
         if (!namespaceName.isGlobal()) {
@@ -1021,6 +1028,7 @@ public abstract class NamespacesBase extends AdminResource {
     @SuppressWarnings("deprecation")
     public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritative) {
         validateSuperUserAccess();
+        checkNotNull(bundleRange, "BundleRange should not be null");
         log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
 
         Policies policies = getNamespacePolicies(namespaceName);
@@ -1069,6 +1077,7 @@ public abstract class NamespacesBase extends AdminResource {
     @SuppressWarnings("deprecation")
     protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
         validateSuperUserAccess();
+        checkNotNull(bundleRange, "BundleRange should not be null");
         log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
 
         Policies policies = getNamespacePolicies(namespaceName);
@@ -1581,6 +1590,7 @@ public abstract class NamespacesBase extends AdminResource {
     @SuppressWarnings("deprecation")
     protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
+        checkNotNull(bundleRange, "BundleRange should not be null");
 
         Policies policies = getNamespacePolicies(namespaceName);
 
@@ -1602,6 +1612,7 @@ public abstract class NamespacesBase extends AdminResource {
     protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
             boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
+        checkNotNull(subscription, "Subscription should not be null");
 
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         try {
@@ -1645,6 +1656,8 @@ public abstract class NamespacesBase extends AdminResource {
     protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
             boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
+        checkNotNull(subscription, "Subscription should not be null");
+        checkNotNull(bundleRange, "BundleRange should not be null");
 
         Policies policies = getNamespacePolicies(namespaceName);
 
@@ -1666,6 +1679,7 @@ public abstract class NamespacesBase extends AdminResource {
     protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
             boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
+        checkNotNull(subscription, "Subscription should not be null");
 
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         try {
@@ -1708,6 +1722,8 @@ public abstract class NamespacesBase extends AdminResource {
     @SuppressWarnings("deprecation")
     protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
+        checkNotNull(subscription, "Subscription should not be null");
+        checkNotNull(bundleRange, "BundleRange should not be null");
 
         Policies policies = getNamespacePolicies(namespaceName);
 
@@ -1845,6 +1861,7 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
+        checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
         validatePoliciesReadOnlyAccess();
 
         log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName);
@@ -1922,6 +1939,9 @@ public abstract class NamespacesBase extends AdminResource {
     protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
             String tenant) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.READ);
+        checkNotNull(cluster, "Cluster should not be null");
+        checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
+        checkNotNull(tenant, "Tenant should not be null");
 
         log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), tenant, antiAffinityGroup, cluster);
 
@@ -1952,24 +1972,21 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     private void validatePersistencePolicies(PersistencePolicies persistence) {
-        try {
-            checkNotNull(persistence);
-            final ServiceConfiguration config = pulsar().getConfiguration();
-            checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
-                    "Bookkeeper-Ensemble must be <= %s", config.getManagedLedgerMaxEnsembleSize());
-            checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
-                    "Bookkeeper-WriteQuorum must be <= %s", config.getManagedLedgerMaxWriteQuorum());
-            checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
-                    "Bookkeeper-AckQuorum must be <= %s", config.getManagedLedgerMaxAckQuorum());
-            checkArgument(
-                    (persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
-                            && (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
-                    "Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
-                    persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
-                    persistence.getBookkeeperAckQuorum());
-        } catch (NullPointerException | IllegalArgumentException e) {
-            throw new RestException(Status.PRECONDITION_FAILED, e.getMessage());
-        }
+        checkNotNull(persistence, "persistence policies should not be null");
+        final ServiceConfiguration config = pulsar().getConfiguration();
+        checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
+                "Bookkeeper-Ensemble must be <= " + config.getManagedLedgerMaxEnsembleSize());
+        checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
+                "Bookkeeper-WriteQuorum must be <= " + config.getManagedLedgerMaxWriteQuorum());
+        checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
+                "Bookkeeper-AckQuorum must be <= " + config.getManagedLedgerMaxAckQuorum());
+        checkArgument(
+                (persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
+                        && (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
+                String.format("Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
+                persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
+                persistence.getBookkeeperAckQuorum()));
+
     }
 
     protected RetentionPolicies internalGetRetention() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index afa4fe7..3f350d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,13 +18,13 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import static org.apache.pulsar.common.util.Codec.decode;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.zafarkhaja.semver.Version;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1404,11 +1404,11 @@ public class PersistentTopicsBase extends AdminResource {
             if (subName.startsWith(topic.getReplicatorPrefix())) {
                 String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                 PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
-                checkNotNull(repl);
+                Preconditions.checkNotNull(repl);
                 repl.skipMessages(numMessages).get();
             } else {
                 PersistentSubscription sub = topic.getSubscription(subName);
-                checkNotNull(sub);
+                Preconditions.checkNotNull(sub);
                 sub.skipMessages(numMessages).get();
             }
             log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, topicName, subName);
@@ -1798,7 +1798,7 @@ public class PersistentTopicsBase extends AdminResource {
             }
             try {
                 PersistentSubscription sub = topic.getSubscription(subName);
-                checkNotNull(sub);
+                Preconditions.checkNotNull(sub);
                 sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
                 log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(),
                         topicName, subName, messageId);
@@ -1907,7 +1907,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     private Response generateResponseWithEntry(Entry entry) throws IOException {
-        checkNotNull(entry);
+        Preconditions.checkNotNull(entry);
         PositionImpl pos = (PositionImpl) entry.getPosition();
         ByteBuf metadataAndPayload = entry.getDataBuffer();
 
@@ -2151,11 +2151,11 @@ public class PersistentTopicsBase extends AdminResource {
             if (subName.startsWith(topic.getReplicatorPrefix())) {
                 String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
                 PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
-                checkNotNull(repl);
+                Preconditions.checkNotNull(repl);
                 repl.expireMessages(expireTimeInSeconds);
             } else {
                 PersistentSubscription sub = topic.getSubscription(subName);
-                checkNotNull(sub);
+                Preconditions.checkNotNull(sub);
                 sub.expireMessages(expireTimeInSeconds);
             }
             log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, topicName,
@@ -2374,7 +2374,7 @@ public class PersistentTopicsBase extends AdminResource {
     private Subscription getSubscriptionReference(String subName, PersistentTopic topic) {
         try {
             Subscription sub = topic.getSubscription(subName);
-            return checkNotNull(sub);
+            return Preconditions.checkNotNull(sub);
         } catch (Exception e) {
             throw new RestException(Status.NOT_FOUND, "Subscription not found");
         }
@@ -2387,7 +2387,7 @@ public class PersistentTopicsBase extends AdminResource {
         try {
             String remoteCluster = PersistentReplicator.getRemoteCluster(replName);
             PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
-            return checkNotNull(repl);
+            return Preconditions.checkNotNull(repl);
         } catch (Exception e) {
             throw new RestException(Status.NOT_FOUND, "Replicator not found");
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index 4c8289b..360346f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -301,19 +301,19 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
             admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 4, 3, 10.0));
             fail("should have failed");
         } catch (PulsarAdminException e) {
-            assertEquals(e.getStatusCode(), 412);
+            assertEquals(e.getStatusCode(), 400);
         }
         try {
             admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 4, 10.0));
             fail("should have failed");
         } catch (PulsarAdminException e) {
-            assertEquals(e.getStatusCode(), 412);
+            assertEquals(e.getStatusCode(), 400);
         }
         try {
             admin.namespaces().setPersistence(namespace, new PersistencePolicies(6, 3, 1, 10.0));
             fail("should have failed");
         } catch (PulsarAdminException e) {
-            assertEquals(e.getStatusCode(), 412);
+            assertEquals(e.getStatusCode(), 400);
         }
 
         // make sure policies has not been changed