You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/14 16:08:40 UTC

[pulsar] branch master updated: Fix TopicListWatcher creation (#16576)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bdf4948dd7 Fix TopicListWatcher creation (#16576)
9bdf4948dd7 is described below

commit 9bdf4948dd733e699d52e01549ac935929058915
Author: Andras Beni <an...@streamnative.io>
AuthorDate: Thu Jul 14 18:08:33 2022 +0200

    Fix TopicListWatcher creation (#16576)
    
    Motivation
    
    TopicListWatchers cannot work when the lookup URL scheme is HTTP.
    
    Changes
    
    - Do not create a TopicListWatcher when an HTTP URL is specified
    - Create test to avoid future regressions in this area
---
 .../client/api/PatternMultiTopicsConsumerTest.java | 98 ++++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  6 ++
 .../pulsar/client/impl/PulsarClientImpl.java       |  4 +
 .../pulsar/client/impl/TopicListWatcher.java       |  2 +-
 4 files changed, 109 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
new file mode 100644
index 00000000000..a78b804b9b1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternMultiTopicsConsumerTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Checks that a consumer subscribed with a topics pattern receives every message published to
+ * matching topics whose producers are created only after the consumer has subscribed.
+ */
+@Test(groups = "broker")
+public class PatternMultiTopicsConsumerTest extends ProducerConsumerBase {
+
+    private static final Logger log = LoggerFactory.getLogger(PatternMultiTopicsConsumerTest.class);
+
+    /** Number of topics each test publishes to; all of them match the subscribed pattern. */
+    private static final int TOPIC_COUNT = 10;
+
+    /** Number of messages published to each topic. */
+    private static final int MESSAGES_PER_TOPIC = 10;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // NOTE(review): presumably makes the base class hand out a binary-protocol client rather
+        // than an HTTP-lookup one - confirm against the base class.
+        isTcpLookup = true;
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 5000)
+    public void testSimple() throws Exception {
+        // Make sure topics are discovered before test times out
+        testWithConsumer(subscribeToPattern(2, TimeUnit.SECONDS));
+    }
+
+    @Test(timeOut = 5000)
+    public void testNotifications() throws Exception {
+        // Set auto-discovery period high so that only notifications can inform us about new topics
+        testWithConsumer(subscribeToPattern(1, TimeUnit.MINUTES));
+    }
+
+    /**
+     * Subscribes a pattern consumer to {@code persistent://my-property/my-ns/topic.*}, starting
+     * from the earliest position.
+     *
+     * @param discoveryPeriod value passed to {@code patternAutoDiscoveryPeriod}
+     * @param unit unit of {@code discoveryPeriod}
+     * @return the subscribed consumer; the caller is responsible for closing it
+     */
+    private Consumer<byte[]> subscribeToPattern(int discoveryPeriod, TimeUnit unit) throws Exception {
+        return pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/topic.*")
+                .patternAutoDiscoveryPeriod(discoveryPeriod, unit)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("subscriber-name").subscribe();
+    }
+
+    /**
+     * Publishes {@link #MESSAGES_PER_TOPIC} messages to each of {@link #TOPIC_COUNT} topics, then
+     * reads the same total number of messages from {@code consumer} and asserts that, per topic,
+     * the received messages equal the sent ones in the same order.
+     *
+     * @param consumer an already subscribed pattern consumer; it is closed before this method
+     *                 returns, whether or not the assertions pass
+     */
+    private void testWithConsumer(Consumer<byte[]> consumer) throws Exception {
+        try {
+            Map<String, List<String>> sentMessages = Maps.newHashMap();
+            for (int p = 0; p < TOPIC_COUNT; ++p) {
+                String name = "persistent://my-property/my-ns/topic-" + p;
+                Producer<byte[]> producer = pulsarClient.newProducer().topic(name).create();
+                for (int i = 0; i < MESSAGES_PER_TOPIC; i++) {
+                    String message = "message-" + p + i;
+                    producer.send(message.getBytes());
+                    sentMessages.computeIfAbsent(name, topic -> new LinkedList<>()).add(message);
+                }
+            }
+
+            // Derived from the send loops so the two sides cannot drift apart.
+            int expectedTotal = TOPIC_COUNT * MESSAGES_PER_TOPIC;
+            Map<String, List<String>> receivedMessages = Maps.newHashMap();
+            for (int i = 0; i < expectedTotal; i++) {
+                Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+                // A timed receive yields null when nothing arrives in time; fail with a readable
+                // message instead of a NullPointerException on the next line.
+                Assert.assertNotNull(msg,
+                        "Timed out waiting for message " + (i + 1) + " of " + expectedTotal);
+                String receivedMessage = new String(msg.getData());
+                log.info("Received message: [{}]", receivedMessage);
+                receivedMessages.computeIfAbsent(msg.getTopicName(), topic -> new LinkedList<>())
+                        .add(receivedMessage);
+            }
+
+            Assert.assertEquals(receivedMessages, sentMessages);
+        } finally {
+            // Close on the failure path as well, not only after a successful comparison.
+            consumer.close();
+        }
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 37464dacfd4..06105adbbb5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1064,6 +1064,12 @@ public class ClientCnx extends PulsarHandler {
                 RequestType.Command, true);
     }
 
+    /**
+     * Issues a watch-topic-list close request.
+     *
+     * <p>The command is serialized with {@code Commands.serializeWithSize} and handed to
+     * {@code sendRequestAndHandleTimeout} as a {@code RequestType.Command} request.
+     * NOTE(review): the meaning of the trailing {@code true} argument is not visible in this
+     * hunk - confirm against {@code sendRequestAndHandleTimeout}.
+     *
+     * @param commandWatchTopicListClose the close command to serialize and send
+     * @param requestId request id forwarded together with the serialized command
+     * @return the future produced by {@code sendRequestAndHandleTimeout}
+     */
+    public CompletableFuture<CommandSuccess> newWatchTopicListClose(
+            BaseCommand commandWatchTopicListClose, long requestId) {
+        return sendRequestAndHandleTimeout(
+                Commands.serializeWithSize(commandWatchTopicListClose), requestId, RequestType.Command, true);
+    }
+
     protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess commandWatchTopicListSuccess) {
         checkArgument(state == State.Ready);
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index a5ee6596834..c436f129789 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -947,6 +947,10 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
+        // Only a BinaryProtoLookupService is accepted here. For any other lookup implementation
+        // the caller gets an already-failed future (InvalidServiceURL) and no host is resolved.
+        if (!(lookup instanceof BinaryProtoLookupService)) {
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL(
+                    "Can't get client connection to HTTP service URL", null));
+        }
+        // The resolved service address is passed as both arguments of getConnection.
         InetSocketAddress address = lookup.resolveHost();
         return getConnection(address, address);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 9cd6b003d7d..d2ca018aef9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -211,7 +211,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
             cleanupAtClose(closeFuture, null);
         } else {
             BaseCommand cmd = Commands.newWatchTopicListClose(watcherId, requestId);
-            cnx.sendRequestWithId(Commands.serializeWithSize(cmd), requestId).handle((v, exception) -> {
+            cnx.newWatchTopicListClose(cmd, requestId).handle((v, exception) -> {
                 final ChannelHandlerContext ctx = cnx.ctx();
                 boolean ignoreException = ctx == null || !ctx.channel().isActive();
                 if (ignoreException && exception != null) {