You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 06:23:26 UTC
[pulsar] branch master updated: [Issues 5709]remove the namespace
checking (#5716)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5011816 [Issues 5709]remove the namespace checking (#5716)
5011816 is described below
commit 50118168e225a4c6f1803af1b7bb13bde211b619
Author: huangdx0726 <lo...@foxmail.com>
AuthorDate: Thu Jun 4 14:23:12 2020 +0800
[Issues 5709]remove the namespace checking (#5716)
[Issues 5709] remove the namespace checking Support multiple topic subscriptions across multiple namespace
Fixes #5709
### Motivation
Support multiple topic subscriptions across multiple namespace
### Modifications
remove the namespace checking
---
.../pulsar/client/impl/TopicsConsumerImplTest.java | 98 +++++++++++++++++-----
.../client/impl/MultiTopicsConsumerImpl.java | 78 ++++++-----------
.../impl/PatternMultiTopicsConsumerImpl.java | 5 +-
.../client/impl/MultiTopicsConsumerImplTest.java | 1 -
4 files changed, 102 insertions(+), 80 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 5fd2d5b..06cae44 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -18,32 +18,12 @@
*/
package org.apache.pulsar.client.impl;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import io.netty.util.Timeout;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
-
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -66,6 +46,26 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TopicsConsumerImplTest extends ProducerConsumerBase {
private static final long testTimeout = 90000; // 1.5 min
@@ -102,13 +102,13 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
// 2. Create consumer
try {
- pulsarClient.newConsumer()
+ Consumer consumer = pulsarClient.newConsumer()
.topics(topicNames)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.subscribe();
- fail("subscribe for topics from different namespace should fail.");
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
} catch (IllegalArgumentException e) {
// expected for have different namespace
}
@@ -1023,4 +1023,56 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
producer3.close();
}
+ @Test(timeOut = testTimeout)
+ public void multiTopicsInDifferentNameSpace() throws PulsarAdminException, PulsarClientException {
+ List<String> topics = new ArrayList<>();
+ topics.add("persistent://prop/use/ns-abc/topic-1");
+ topics.add("persistent://prop/use/ns-abc/topic-2");
+ topics.add("persistent://prop/use/ns-abc1/topic-3");
+ admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("prop", new TenantInfo(null, Sets.newHashSet("use")));
+ admin.namespaces().createNamespace("prop/use/ns-abc");
+ admin.namespaces().createNamespace("prop/use/ns-abc1");
+ Consumer consumer = pulsarClient.newConsumer()
+ .topics(topics)
+ .subscriptionName("multiTopicSubscription")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+ // create Producer
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://prop/use/ns-abc/topic-1")
+ .producerName("producer")
+ .create();
+ Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://prop/use/ns-abc/topic-2")
+ .producerName("producer1")
+ .create();
+ Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://prop/use/ns-abc1/topic-3")
+ .producerName("producer2")
+ .create();
+ //send message
+ producer.send("ns-abc/topic-1-Message1");
+
+ producer1.send("ns-abc/topic-2-Message1");
+
+ producer2.send("ns-abc1/topic-3-Message1");
+
+ int messageSet = 0;
+ Message<byte[]> message = consumer.receive();
+ do {
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.info("Consumer acknowledged : " + new String(message.getData()));
+ message = consumer.receive(200, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, 3);
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer.close();
+ producer1.close();
+ producer2.close();
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f3147ce..e078d70 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -18,17 +18,32 @@
*/
package org.apache.pulsar.client.impl;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
-
import com.google.common.collect.Queues;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -49,33 +64,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerStats;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Messages;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.apache.pulsar.client.util.ConsumerName;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
public static final String DUMMY_TOPIC_NAME_PREFIX = "MultiTopicsConsumer-";
- // All topics should be in same namespace
- protected NamespaceName namespaceName;
-
// Map <topic+partition, consumer>, when get do ACK, consumer will by find by topic name
private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
@@ -146,15 +141,12 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
if (conf.getTopicNames().isEmpty()) {
- this.namespaceName = null;
setState(State.Ready);
subscribeFuture().complete(MultiTopicsConsumerImpl.this);
return;
}
- checkArgument(conf.getTopicNames().isEmpty() || topicNamesValid(conf.getTopicNames()), "Topics should have same namespace.");
- this.namespaceName = conf.getTopicNames().stream().findFirst()
- .flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get();
+ checkArgument(conf.getTopicNames().isEmpty() || topicNamesValid(conf.getTopicNames()), "Topics is empty or invalid.");
List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
.collect(Collectors.toList());
@@ -179,28 +171,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
// Check topics are valid.
// - each topic is valid,
- // - every topic has same namespace,
// - topic names are unique.
private static boolean topicNamesValid(Collection<String> topics) {
checkState(topics != null && topics.size() >= 1,
"topics should contain more than 1 topic");
- final String namespace = TopicName.get(topics.stream().findFirst().get()).getNamespace();
-
Optional<String> result = topics.stream()
- .filter(topic -> {
- boolean topicInvalid = !TopicName.isValid(topic);
- if (topicInvalid) {
- return true;
- }
-
- String newNamespace = TopicName.get(topic).getNamespace();
- if (!namespace.equals(newNamespace)) {
- return true;
- } else {
- return false;
- }
- }).findFirst();
+ .filter(topic -> !TopicName.isValid(topic))
+ .findFirst();
if (result.isPresent()) {
log.warn("Received invalid topic name: {}", result.get());
@@ -746,11 +724,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
checkArgument(!topics.containsKey(topicName), "Topics already contains topic:" + topicName);
- if (this.namespaceName != null) {
- checkArgument(TopicName.get(topicName).getNamespace().equals(this.namespaceName.toString()),
- "Topic " + topicName + " not in same namespace with Topics");
- }
-
return true;
}
@@ -913,9 +886,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
subscribeResult.complete(null);
log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
topic, subscription, topicName, numPartitions, allTopicPartitionsNumber.get());
- if (this.namespaceName == null) {
- this.namespaceName = TopicName.get(topicName).getNamespaceObject();
- }
return;
})
.exceptionally(ex -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 3475830..91b994a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -46,7 +46,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
private final Pattern topicsPattern;
private final TopicsChangedListener topicsChangeListener;
private final Mode subscriptionMode;
- private volatile Timeout recheckPatternTimeout;
+ protected NamespaceName namespaceName;
+ private volatile Timeout recheckPatternTimeout = null;
public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
PulsarClientImpl client,
@@ -105,7 +106,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
// schedule the next re-check task
this.recheckPatternTimeout = client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
- Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
+ Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
}
public Pattern getPattern() {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index bc7c264..dfd28ce 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -61,5 +61,4 @@ public class MultiTopicsConsumerImplTest {
impl.getStats();
}
-
}