You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/27 15:47:10 UTC

[pulsar] branch master updated: [issue #3895] Bugfix for non partitioned topic create (#3910)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee7b6a4  [issue #3895] Bugfix for non partitioned topic create (#3910)
ee7b6a4 is described below

commit ee7b6a44b0f9866a2e66c7b2f848b74804044c07
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Wed Mar 27 12:47:02 2019 -0300

    [issue #3895] Bugfix for non partitioned topic create (#3910)
    
    **Motivation**
    
    Provide a different approach from #3902 trying to fix #3895.
    
    **Modifications**
    
      - Add unit test in order to exploit issue.
      - Do proper validations for non topic create.
      - Add missing validateTopicName() method on create non partitioned topic.
---
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java     | 14 ++++++++------
 .../apache/pulsar/broker/admin/v2/PersistentTopics.java    |  1 +
 .../apache/pulsar/broker/admin/PersistentTopicsTest.java   | 10 ++++++++++
 3 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2890b0d..ce32890 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -40,11 +40,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
@@ -68,7 +66,6 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -399,11 +396,16 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalCreateNonPartitionedTopic(boolean authoritative) {
-    	validateAdminAccessForTenant(topicName.getTenant());
+        validateAdminAccessForTenant(topicName.getTenant());
+
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
 
+        validateTopicOwnership(topicName, authoritative);
     	try {
-    		getOrCreateTopic(topicName);
-    		log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName);
+            Topic createdTopic = getOrCreateTopic(topicName);
+            log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic);
     	} catch (Exception e) {
     		log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
     		throw new RestException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a9c9369..ca9ff88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -153,6 +153,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic, 
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateGlobalNamespaceOwnership(tenant,namespace);
+        validateTopicName(tenant, namespace, encodedTopic);
         internalCreateNonPartitionedTopic(authoritative);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fdefd46..8311ec8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
@@ -130,4 +131,13 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     	persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true);
     	Assert.assertEquals(persistentTopics.getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true).partitions, 0);
     }
+
+    @Test
+    public void testCreateNonPartitionedTopic() {
+        final String topicName = "standard-topic";
+        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
+        PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
+                testTenant, testNamespace, topicName, true);
+        Assert.assertEquals(pMetadata.partitions, 0);
+    }
 }