You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/27 11:47:29 UTC
[pulsar] 03/05: fix non-persistent topic get partitioned metadata error on discovery (#10806)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2a1078a5cbd4ff2e48c89ecea9e55b6a8f8229b1
Author: Aloys <lo...@gmail.com>
AuthorDate: Fri Jun 18 14:09:06 2021 +0800
fix non-persistent topic get partitioned metadata error on discovery (#10806)
Fixes #10443
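Fix the error when getting partitioned metadata for a non-persistent topic through the discovery service: the partitioned-topic znode path was built with a hard-coded "persistent" domain; it now uses the topic's own domain.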
(cherry picked from commit 859922942759aaa539fe7b0951a614bb75c71ea8)
---
.../discovery/service/BrokerDiscoveryProvider.java | 2 +-
.../discovery/service/BaseDiscoveryTestSetup.java | 8 ++++++++
.../discovery/service/DiscoveryServiceTest.java | 21 +++++++++++++++++++++
3 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 414d2ce..41374f2 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -108,7 +108,7 @@ public class BrokerDiscoveryProvider implements Closeable {
try {
checkAuthorization(service, topicName, role, authenticationData);
final String path = path(PARTITIONED_TOPIC_PATH_ZNODE,
- topicName.getNamespaceObject().toString(), "persistent", topicName.getEncodedLocalName());
+ topicName.getNamespaceObject().toString(), topicName.getDomain().value(), topicName.getEncodedLocalName());
// gets the number of partitions from the zk cache
globalZkCache
.getDataAsync(path,
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
index 3c7b3ff..074c8fb 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
@@ -32,9 +32,11 @@ import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
public class BaseDiscoveryTestSetup {
@@ -85,4 +87,10 @@ public class BaseDiscoveryTestSetup {
}
};
+ protected void simulateStoreErrorForNonPersistentTopic(String string, Code sessionexpired) {
+ mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+ return op == MockZooKeeper.Op.GET
+ && path.equals("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2");
+ });
+ }
}
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index b30c66d..6498ee8 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -122,6 +122,27 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
}
}
+ @Test
+ public void testGetPartitionsMetadataForNonPersistentTopic() throws Exception {
+ TopicName topic1 = TopicName.get("non-persistent://test/local/ns/my-topic-1");
+
+ PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null)
+ .get();
+ assertEquals(m.partitions, 0);
+
+ // Simulate ZK error
+ simulateStoreErrorForNonPersistentTopic("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2", Code.SESSIONEXPIRED);
+ TopicName topic2 = TopicName.get("non-persistent://test/local/ns/my-topic-2");
+ CompletableFuture<PartitionedTopicMetadata> future = service.getDiscoveryProvider()
+ .getPartitionedTopicMetadata(service, topic2, "role", null);
+ try {
+ future.get();
+ fail("Partition metadata lookup should have failed");
+ } catch (ExecutionException e) {
+ assertEquals(e.getCause().getClass(), MetadataStoreException.class);
+ }
+ }
+
/**
* It verifies: client connects to Discovery-service and receives discovery response successfully.
*