You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 06:23:26 UTC

[pulsar] branch master updated: [Issues 5709]remove the namespace checking (#5716)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5011816  [Issues 5709]remove the namespace checking (#5716)
5011816 is described below

commit 50118168e225a4c6f1803af1b7bb13bde211b619
Author: huangdx0726 <lo...@foxmail.com>
AuthorDate: Thu Jun 4 14:23:12 2020 +0800

    [Issues 5709]remove the namespace checking (#5716)
    
    [Issue 5709] Remove the namespace check to support multi-topic subscriptions across multiple namespaces.
    
    Fixes #5709
    
    ### Motivation
    
    
    Support subscribing to multiple topics that span multiple namespaces.
    
    ### Modifications
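    Remove the same-namespace check from MultiTopicsConsumerImpl; the namespaceName field is now declared in PatternMultiTopicsConsumerImpl instead.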
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 98 +++++++++++++++++-----
 .../client/impl/MultiTopicsConsumerImpl.java       | 78 ++++++-----------
 .../impl/PatternMultiTopicsConsumerImpl.java       |  5 +-
 .../client/impl/MultiTopicsConsumerImplTest.java   |  1 -
 4 files changed, 102 insertions(+), 80 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 5fd2d5b..06cae44 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -18,32 +18,12 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import io.netty.util.Timeout;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
-
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -66,6 +46,26 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TopicsConsumerImplTest extends ProducerConsumerBase {
     private static final long testTimeout = 90000; // 1.5 min
@@ -102,13 +102,13 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
 
         // 2. Create consumer
         try {
-            pulsarClient.newConsumer()
+            Consumer consumer = pulsarClient.newConsumer()
                 .topics(topicNames)
                 .subscriptionName(subscriptionName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                 .subscribe();
-            fail("subscribe for topics from different namespace should fail.");
+            assertTrue(consumer instanceof MultiTopicsConsumerImpl);
         } catch (IllegalArgumentException e) {
             // expected for have different namespace
         }
@@ -1023,4 +1023,56 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         producer3.close();
     }
 
+    @Test(timeOut = testTimeout)
+    public void multiTopicsInDifferentNameSpace() throws PulsarAdminException, PulsarClientException {
+        List<String> topics = new ArrayList<>();
+        topics.add("persistent://prop/use/ns-abc/topic-1");
+        topics.add("persistent://prop/use/ns-abc/topic-2");
+        topics.add("persistent://prop/use/ns-abc1/topic-3");
+        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("prop", new TenantInfo(null, Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("prop/use/ns-abc");
+        admin.namespaces().createNamespace("prop/use/ns-abc1");
+        Consumer consumer = pulsarClient.newConsumer()
+                .topics(topics)
+                .subscriptionName("multiTopicSubscription")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscribe();
+        // create Producer
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://prop/use/ns-abc/topic-1")
+                .producerName("producer")
+                .create();
+        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://prop/use/ns-abc/topic-2")
+                .producerName("producer1")
+                .create();
+        Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://prop/use/ns-abc1/topic-3")
+                .producerName("producer2")
+                .create();
+        //send message
+        producer.send("ns-abc/topic-1-Message1");
+
+        producer1.send("ns-abc/topic-2-Message1");
+
+        producer2.send("ns-abc1/topic-3-Message1");
+
+        int messageSet = 0;
+        Message<byte[]> message = consumer.receive();
+        do {
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.info("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, 3);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer.close();
+        producer1.close();
+        producer2.close();
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f3147ce..e078d70 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -18,17 +18,32 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
-
 import com.google.common.collect.Queues;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -49,33 +64,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerStats;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Messages;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.client.util.ConsumerName;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
 public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     public static final String DUMMY_TOPIC_NAME_PREFIX = "MultiTopicsConsumer-";
 
-    // All topics should be in same namespace
-    protected NamespaceName namespaceName;
-
     // Map <topic+partition, consumer>, when get do ACK, consumer will by find by topic name
     private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
 
@@ -146,15 +141,12 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         }
 
         if (conf.getTopicNames().isEmpty()) {
-            this.namespaceName = null;
             setState(State.Ready);
             subscribeFuture().complete(MultiTopicsConsumerImpl.this);
             return;
         }
 
-        checkArgument(conf.getTopicNames().isEmpty() || topicNamesValid(conf.getTopicNames()), "Topics should have same namespace.");
-        this.namespaceName = conf.getTopicNames().stream().findFirst()
-                .flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get();
+        checkArgument(conf.getTopicNames().isEmpty() || topicNamesValid(conf.getTopicNames()), "Topics is empty or invalid.");
 
         List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
                 .collect(Collectors.toList());
@@ -179,28 +171,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     // Check topics are valid.
     // - each topic is valid,
-    // - every topic has same namespace,
     // - topic names are unique.
     private static boolean topicNamesValid(Collection<String> topics) {
         checkState(topics != null && topics.size() >= 1,
             "topics should contain more than 1 topic");
 
-        final String namespace = TopicName.get(topics.stream().findFirst().get()).getNamespace();
-
         Optional<String> result = topics.stream()
-            .filter(topic -> {
-                boolean topicInvalid = !TopicName.isValid(topic);
-                if (topicInvalid) {
-                    return true;
-                }
-
-                String newNamespace =  TopicName.get(topic).getNamespace();
-                if (!namespace.equals(newNamespace)) {
-                    return true;
-                } else {
-                    return false;
-                }
-            }).findFirst();
+                .filter(topic -> !TopicName.isValid(topic))
+                .findFirst();
 
         if (result.isPresent()) {
             log.warn("Received invalid topic name: {}", result.get());
@@ -746,11 +724,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
         checkArgument(!topics.containsKey(topicName), "Topics already contains topic:" + topicName);
 
-        if (this.namespaceName != null) {
-            checkArgument(TopicName.get(topicName).getNamespace().equals(this.namespaceName.toString()),
-                "Topic " + topicName + " not in same namespace with Topics");
-        }
-
         return true;
     }
 
@@ -913,9 +886,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 subscribeResult.complete(null);
                 log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
                     topic, subscription, topicName, numPartitions, allTopicPartitionsNumber.get());
-                if (this.namespaceName == null) {
-                    this.namespaceName = TopicName.get(topicName).getNamespaceObject();
-                }
                 return;
             })
             .exceptionally(ex -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 3475830..91b994a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -46,7 +46,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
     private final Pattern topicsPattern;
     private final TopicsChangedListener topicsChangeListener;
     private final Mode subscriptionMode;
-    private volatile Timeout recheckPatternTimeout;
+    protected NamespaceName namespaceName;
+    private volatile Timeout recheckPatternTimeout = null;
 
     public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
                                           PulsarClientImpl client,
@@ -105,7 +106,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
 
         // schedule the next re-check task
         this.recheckPatternTimeout = client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
-            Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+                Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
     }
 
     public Pattern getPattern() {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index bc7c264..dfd28ce 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -61,5 +61,4 @@ public class MultiTopicsConsumerImplTest {
 
         impl.getStats();
     }
-
 }