You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/27 11:47:29 UTC

[pulsar] 03/05: fix non-persistent topic get partitioned metadata error on discovery (#10806)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2a1078a5cbd4ff2e48c89ecea9e55b6a8f8229b1
Author: Aloys <lo...@gmail.com>
AuthorDate: Fri Jun 18 14:09:06 2021 +0800

    fix non-persistent topic get partitioned metadata error on discovery (#10806)
    
    Fixes #10443
    
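    Fix the error when getting partitioned-topic metadata for a non-persistent topic
    through the discovery service: the metadata path is now built from the topic's
    own domain instead of the hard-coded "persistent".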
    
    (cherry picked from commit 859922942759aaa539fe7b0951a614bb75c71ea8)
---
 .../discovery/service/BrokerDiscoveryProvider.java  |  2 +-
 .../discovery/service/BaseDiscoveryTestSetup.java   |  8 ++++++++
 .../discovery/service/DiscoveryServiceTest.java     | 21 +++++++++++++++++++++
 3 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 414d2ce..41374f2 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -108,7 +108,7 @@ public class BrokerDiscoveryProvider implements Closeable {
         try {
             checkAuthorization(service, topicName, role, authenticationData);
             final String path = path(PARTITIONED_TOPIC_PATH_ZNODE,
-                    topicName.getNamespaceObject().toString(), "persistent", topicName.getEncodedLocalName());
+                    topicName.getNamespaceObject().toString(), topicName.getDomain().value(), topicName.getEncodedLocalName());
             // gets the number of partitions from the zk cache
             globalZkCache
                     .getDataAsync(path,
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
index 3c7b3ff..074c8fb 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
@@ -32,9 +32,11 @@ import org.apache.pulsar.discovery.service.server.ServiceConfig;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
 
 
 public class BaseDiscoveryTestSetup {
@@ -85,4 +87,10 @@ public class BaseDiscoveryTestSetup {
         }
     };
 
+    /** Calls failConditional on the mock ZooKeeper so that GET operations on the znode
+     *  path {@code string} are matched and associated with error code {@code sessionexpired}. */
+    protected void simulateStoreErrorForNonPersistentTopic(String string, Code sessionexpired) {
+        mockZooKeeper.failConditional(sessionexpired,
+                (op, path) -> op == MockZooKeeper.Op.GET && path.equals(string));
+    }
 }
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index b30c66d..6498ee8 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -122,6 +122,27 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
         }
     }
 
+    @Test
+    public void testGetPartitionsMetadataForNonPersistentTopic() throws Exception {
+        TopicName topic1 = TopicName.get("non-persistent://test/local/ns/my-topic-1");
+
+        PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null)
+                .get();
+        assertEquals(m.partitions, 0);
+
+        // Simulate a ZK error (SESSIONEXPIRED) on GETs of my-topic-2's partitioned-topic znode; the lookup below must fail
+        simulateStoreErrorForNonPersistentTopic("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2", Code.SESSIONEXPIRED);
+        TopicName topic2 = TopicName.get("non-persistent://test/local/ns/my-topic-2");
+        CompletableFuture<PartitionedTopicMetadata> future = service.getDiscoveryProvider()
+                .getPartitionedTopicMetadata(service, topic2, "role", null);
+        try {
+            future.get();
+            fail("Partition metadata lookup should have failed");
+        } catch (ExecutionException e) {
+            assertEquals(e.getCause().getClass(), MetadataStoreException.class);
+        }
+    }
+
     /**
      * It verifies: client connects to Discovery-service and receives discovery response successfully.
      *