Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/02 16:12:14 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

codelipenghui opened a new pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590


   After enabling geo-replication, we are not able to produce messages to a partitioned non-persistent topic.
   
   Steps to reproduce:
   
   1. Start a geo-replication instance with 2 clusters (I have 3 brokers in each cluster)
   2. Create a non-persistent partitioned topic, for example with 10 partitions
   3. Use pulsar-perf to publish messages to the partitioned topic, for example `bin/pulsar-perf produce -s 2048 -r 100 -bm 1 non-persistent://public/default/test`
   4. Observe that the message produce throughput is 0
   
   The root cause is that two places call a synchronous method inside an asynchronous method's callback.
   The fix is:
   
   1. Make the `validatePartitionedTopicAsync` method asynchronous
   2. Avoid calling the synchronous get-cluster method when getting the replication client
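
The hazard described above can be illustrated with a minimal, self-contained sketch. It is not the Pulsar code: `getClusterAsync` here is a hypothetical stand-in for an asynchronous metadata lookup, and the single-thread executor is an assumption that makes the problem deterministic. `blocking` waits on a second lookup from inside a callback that runs on the same thread, while `composed` chains the second lookup instead of waiting for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncCallbackSketch {
    // Hypothetical stand-in for an asynchronous metadata lookup.
    static CompletableFuture<String> getClusterAsync(String cluster, ExecutorService metadataThread) {
        return CompletableFuture.supplyAsync(() -> "data-for-" + cluster, metadataThread);
    }

    // Anti-pattern: the callback runs on metadataThread and then blocks on
    // another task queued on that same thread. With a single-thread executor
    // the queued task can never start, so the returned future never completes.
    static CompletableFuture<String> blocking(ExecutorService metadataThread) {
        return getClusterAsync("a", metadataThread)
                .thenApplyAsync(a -> a + "," + getClusterAsync("b", metadataThread).join(),
                        metadataThread);
    }

    // Non-blocking variant: the second lookup is chained with thenCompose,
    // so the callback returns immediately and the thread stays free.
    static CompletableFuture<String> composed(ExecutorService metadataThread) {
        return getClusterAsync("a", metadataThread)
                .thenCompose(a -> getClusterAsync("b", metadataThread)
                        .thenApply(b -> a + "," + b));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService metadataThread = Executors.newSingleThreadExecutor();
        try {
            // Prints "data-for-a,data-for-b".
            System.out.println(composed(metadataThread).get(5, TimeUnit.SECONDS));
        } finally {
            metadataThread.shutdownNow();
        }
    }
}
```

Only `composed` is invoked in `main`; calling `blocking` with the same single-thread executor would hang, which is one way a stalled callback thread could show up as zero produce throughput.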
   
   Integration tests added.
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] `doc-required` 
     
     (If you need help on updating docs, create a doc issue)
     
   - [x] `no-need-doc` 
     
     Bug fixes
     
   - [ ] `doc` 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#issuecomment-961777418


   @eolivelli Thanks, @congbobo184 is helping on the test.





[GitHub] [pulsar] eolivelli commented on a change in pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#discussion_r743561080



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
##########
@@ -122,7 +118,7 @@ public void setPolicies(NamespaceName ns, Function<Policies, Policies> function)
     }
 
     public static boolean pathIsFromNamespace(String path) {
-        return path.startsWith(BASE_POLICIES_PATH);
+        return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");

Review comment:
       Can you explain this line better?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -669,14 +672,20 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                 return;
             }
 
-            if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
-                getNamespaceReplicatedClusters(namespaceName)
-                        .stream()
-                        .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
-                        .forEach(cluster -> createFutureList.add(
-                                ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
+            if (!replicatedClusters.isEmpty()) {
+                replicatedClusters.forEach(cluster -> {
+                    pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
+                            .thenAccept(clusterDataOp -> {
+                                ((TopicsImpl) pulsar().getBrokerService()
+                                        .getClusterPulsarAdmin(cluster, clusterDataOp).topics())
                                         .createPartitionedTopicAsync(
-                                                topicName.getPartitionedTopicName(), numPartitions, true)));
+                                                topicName.getPartitionedTopicName(), numPartitions, true);
+                            })
+                            .exceptionally(throwable -> {
+                                log.error("Failed to create partition topic in cluster {}.", cluster, throwable);

Review comment:
       Should we fail the operation?
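
For illustration only, one way the question above could be answered with "yes" is to collect the per-cluster futures and combine them, so that a failure in any remote cluster fails the whole operation instead of only being logged. The sketch below is an assumption, not the change made in the PR: `createInAllClusters` and `createInCluster` are hypothetical names standing in for "look up the cluster, then create the partitioned topic there".

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.stream.Collectors;

class ReplicatedCreateSketch {
    // Starts one creation per cluster and returns a future that fails
    // if any of them fails (CompletableFuture.allOf semantics).
    static CompletableFuture<Void> createInAllClusters(
            List<String> clusters,
            Function<String, CompletableFuture<Void>> createInCluster) {
        List<CompletableFuture<Void>> futures = clusters.stream()
                .map(createInCluster)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("cluster lookup failed"));

        // cluster-b fails, so the combined future fails as well.
        CompletableFuture<Void> result = createInAllClusters(
                Arrays.asList("cluster-a", "cluster-b"),
                cluster -> cluster.equals("cluster-b")
                        ? failed
                        : CompletableFuture.<Void>completedFuture(null));
        try {
            result.join();
            System.out.println("created everywhere");
        } catch (CompletionException e) {
            System.out.println("operation failed: " + e.getCause().getMessage());
        }
    }
}
```

Whether failing the whole request is preferable to logging and continuing is a design choice this thread leaves open.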

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1061,43 +1062,53 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
         }
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
-
-        CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
-                .thenCompose(__ -> nonPersistentTopic.checkReplication())
-                .thenApply(__ -> {
-            log.info("Created topic {}", nonPersistentTopic);
-            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
-            pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-            addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
-            return Optional.of(nonPersistentTopic);
-        });
-
-        future.exceptionally((ex) -> {
-            log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
-            nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                pulsar.getExecutor().execute(() -> topics.remove(topic, future));
+        CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
+        isOwner.thenRun(() -> {
+            nonPersistentTopic.initialize()
+                    .thenCompose(__ -> nonPersistentTopic.checkReplication())
+                    .thenRun(() -> {
+                        log.info("Created topic {}", nonPersistentTopic);
+                        long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
+                        pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                        addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
+                        topicFuture.complete(Optional.of(nonPersistentTopic));
+                    }).exceptionally(ex -> {
+                log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
+                nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                    pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+                    topicFuture.completeExceptionally(ex);

Review comment:
       Should we run this after the execution of `topics.remove`?
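
The ordering being asked about can be sketched as follows. This is a hypothetical illustration, not the PR's code: `removeThenFail` is an invented helper that runs both steps inside the same executor task, so any callback registered on the future sees the cache entry already removed. The types are simplified (`Optional<T>` in place of `Optional<Topic>`), and the caller-thread executor in `main` is only there to keep the example deterministic.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

class RemoveThenFailSketch {
    // Removes the cached future first, then completes it exceptionally,
    // both inside one task submitted to the executor.
    static <T> void removeThenFail(
            ConcurrentMap<String, CompletableFuture<Optional<T>>> topics,
            String topic,
            CompletableFuture<Optional<T>> topicFuture,
            Throwable cause,
            Executor executor) {
        executor.execute(() -> {
            topics.remove(topic, topicFuture);
            topicFuture.completeExceptionally(cause);
        });
    }

    public static void main(String[] args) {
        String topic = "non-persistent://public/default/test";
        ConcurrentMap<String, CompletableFuture<Optional<String>>> topics = new ConcurrentHashMap<>();
        CompletableFuture<Optional<String>> topicFuture = new CompletableFuture<>();
        topics.put(topic, topicFuture);

        // Prints "still cached at completion: false".
        topicFuture.whenComplete((value, error) ->
                System.out.println("still cached at completion: " + topics.containsKey(topic)));

        removeThenFail(topics, topic, topicFuture,
                new IllegalStateException("replication check failed"), Runnable::run);
    }
}
```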

##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
##########
@@ -122,7 +118,7 @@ public void setPolicies(NamespaceName ns, Function<Policies, Policies> function)
     }
 
     public static boolean pathIsFromNamespace(String path) {
-        return path.startsWith(BASE_POLICIES_PATH);
+        return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");

Review comment:
       I see. Works for me; adding a comment would probably help future readers of this code.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1061,43 +1062,53 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
         }
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
-
-        CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
-                .thenCompose(__ -> nonPersistentTopic.checkReplication())
-                .thenApply(__ -> {
-            log.info("Created topic {}", nonPersistentTopic);
-            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
-            pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-            addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
-            return Optional.of(nonPersistentTopic);
-        });
-
-        future.exceptionally((ex) -> {
-            log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
-            nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                pulsar.getExecutor().execute(() -> topics.remove(topic, future));
+        CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
+        isOwner.thenRun(() -> {
+            nonPersistentTopic.initialize()
+                    .thenCompose(__ -> nonPersistentTopic.checkReplication())
+                    .thenRun(() -> {
+                        log.info("Created topic {}", nonPersistentTopic);
+                        long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
+                        pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                        addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
+                        topicFuture.complete(Optional.of(nonPersistentTopic));
+                    }).exceptionally(ex -> {
+                log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
+                nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                    pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+                    topicFuture.completeExceptionally(ex);

Review comment:
       good, just checking




[GitHub] [pulsar] codelipenghui commented on a change in pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#discussion_r743615786



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
##########
@@ -122,7 +118,7 @@ public void setPolicies(NamespaceName ns, Function<Policies, Policies> function)
     }
 
     public static boolean pathIsFromNamespace(String path) {
-        return path.startsWith(BASE_POLICIES_PATH);
+        return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");

Review comment:
       We may receive a notification from the metastore for a path such as `/admin/policies/public`, which is not a complete namespace path.
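
A small sketch of how the patched check behaves on the two kinds of path mentioned above. The value of `BASE_POLICIES_PATH` is an assumption inferred from the example path in this comment. The method body repeats the expression from the diff.

```java
class PolicyPathSketch {
    // Assumed value; the thread only shows the example path /admin/policies/public.
    static final String BASE_POLICIES_PATH = "/admin/policies";

    // Same expression as the patched line: the part after the base path
    // must still contain a '/', i.e. look like "tenant/namespace".
    static boolean pathIsFromNamespace(String path) {
        return path.startsWith(BASE_POLICIES_PATH)
                && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
    }

    public static void main(String[] args) {
        // Tenant-level path: remainder is "public", no '/'.
        System.out.println(pathIsFromNamespace("/admin/policies/public"));          // false
        // Namespace-level path: remainder is "public/default".
        System.out.println(pathIsFromNamespace("/admin/policies/public/default"));  // true
    }
}
```

Note that `substring(BASE_POLICIES_PATH.length() + 1)` would throw for a path exactly equal to the base path; whether such a notification can occur is not stated in this thread.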







[GitHub] [pulsar] congbobo184 commented on a change in pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#discussion_r743577016



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1061,43 +1062,53 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
         }
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
-
-        CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
-                .thenCompose(__ -> nonPersistentTopic.checkReplication())
-                .thenApply(__ -> {
-            log.info("Created topic {}", nonPersistentTopic);
-            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
-            pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-            addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
-            return Optional.of(nonPersistentTopic);
-        });
-
-        future.exceptionally((ex) -> {
-            log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
-            nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                pulsar.getExecutor().execute(() -> topics.remove(topic, future));
+        CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
+        isOwner.thenRun(() -> {
+            nonPersistentTopic.initialize()
+                    .thenCompose(__ -> nonPersistentTopic.checkReplication())
+                    .thenRun(() -> {
+                        log.info("Created topic {}", nonPersistentTopic);
+                        long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
+                        pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                        addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
+                        topicFuture.complete(Optional.of(nonPersistentTopic));
+                    }).exceptionally(ex -> {
+                log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
+                nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                    pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+                    topicFuture.completeExceptionally(ex);

Review comment:
       It makes no difference whether this runs before or after.







[GitHub] [pulsar] eolivelli commented on a change in pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#discussion_r743660229



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1061,43 +1062,53 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
         }
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
-
-        CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
-                .thenCompose(__ -> nonPersistentTopic.checkReplication())
-                .thenApply(__ -> {
-            log.info("Created topic {}", nonPersistentTopic);
-            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
-            pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-            addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
-            return Optional.of(nonPersistentTopic);
-        });
-
-        future.exceptionally((ex) -> {
-            log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
-            nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                pulsar.getExecutor().execute(() -> topics.remove(topic, future));
+        CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
+        isOwner.thenRun(() -> {
+            nonPersistentTopic.initialize()
+                    .thenCompose(__ -> nonPersistentTopic.checkReplication())
+                    .thenRun(() -> {
+                        log.info("Created topic {}", nonPersistentTopic);
+                        long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
+                        pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                        addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
+                        topicFuture.complete(Optional.of(nonPersistentTopic));
+                    }).exceptionally(ex -> {
+                log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
+                nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                    pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+                    topicFuture.completeExceptionally(ex);

Review comment:
       good, just checking







[GitHub] [pulsar] github-actions[bot] commented on pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#issuecomment-957895414


   @codelipenghui: Thanks for providing doc info!





[GitHub] [pulsar] eolivelli commented on pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#issuecomment-961693972


   @codelipenghui Do you need some help here?
   This is the unblocker for the 2.9.0 release.





[GitHub] [pulsar] codelipenghui commented on pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#issuecomment-961799062


   @merlimat Could you please help review the PR?





[GitHub] [pulsar] eolivelli commented on a change in pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#discussion_r743659937



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
##########
@@ -122,7 +118,7 @@ public void setPolicies(NamespaceName ns, Function<Policies, Policies> function)
     }
 
     public static boolean pathIsFromNamespace(String path) {
-        return path.startsWith(BASE_POLICIES_PATH);
+        return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");

Review comment:
       I see.
   Works for me. Adding a comment would probably help future readers of this code.







[GitHub] [pulsar] github-actions[bot] commented on pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#issuecomment-957895837


   @codelipenghui: Thanks for your contribution. For this PR, do we need to update docs?
   (The [PR template contains info about doc](https://github.com/apache/pulsar/blob/master/.github/PULL_REQUEST_TEMPLATE.md#documentation), which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)





[GitHub] [pulsar] codelipenghui commented on a change in pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#discussion_r743615786



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
##########
@@ -122,7 +118,7 @@ public void setPolicies(NamespaceName ns, Function<Policies, Policies> function)
     }
 
     public static boolean pathIsFromNamespace(String path) {
-        return path.startsWith(BASE_POLICIES_PATH);
+        return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");

Review comment:
       We may receive a notification from the metastore for a path such as `/admin/policies/public`, which is not a complete namespace path.







[GitHub] [pulsar] eolivelli commented on a change in pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#discussion_r743561080



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
##########
@@ -122,7 +118,7 @@ public void setPolicies(NamespaceName ns, Function<Policies, Policies> function)
     }
 
     public static boolean pathIsFromNamespace(String path) {
-        return path.startsWith(BASE_POLICIES_PATH);
+        return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");

Review comment:
       Can you explain this line in more detail?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -669,14 +672,20 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                 return;
             }
 
-            if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
-                getNamespaceReplicatedClusters(namespaceName)
-                        .stream()
-                        .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
-                        .forEach(cluster -> createFutureList.add(
-                                ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
+            if (!replicatedClusters.isEmpty()) {
+                replicatedClusters.forEach(cluster -> {
+                    pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
+                            .thenAccept(clusterDataOp -> {
+                                ((TopicsImpl) pulsar().getBrokerService()
+                                        .getClusterPulsarAdmin(cluster, clusterDataOp).topics())
                                         .createPartitionedTopicAsync(
-                                                topicName.getPartitionedTopicName(), numPartitions, true)));
+                                                topicName.getPartitionedTopicName(), numPartitions, true);
+                            })
+                            .exceptionally(throwable -> {
+                                log.error("Failed to create partition topic in cluster {}.", cluster, throwable);

Review comment:
       Should we fail the operation?
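
A minimal sketch of what failing the operation could look like, in plain `CompletableFuture` terms. `lookupCluster` and `createRemote` are hypothetical stand-ins for the cluster lookup and the remote create call; they are not Pulsar APIs. The point is only that `thenCompose` keeps a lookup failure in the returned future, whereas an `exceptionally` branch that only logs turns it into a normal completion.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class FailFastSketch {
    // Hypothetical async cluster lookup; fails for the cluster named "bad".
    static CompletableFuture<String> lookupCluster(String cluster) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (cluster.equals("bad")) {
            f.completeExceptionally(new IllegalStateException("no such cluster: " + cluster));
        } else {
            f.complete("data-" + cluster);
        }
        return f;
    }

    // Hypothetical remote create call; always succeeds in this sketch.
    static CompletableFuture<Void> createRemote(String clusterData) {
        return CompletableFuture.completedFuture(null);
    }

    // Chain lookup and create per cluster, then wait for all of them.
    // Any failed lookup or create makes the combined future fail.
    static CompletableFuture<Void> createOnAll(List<String> clusters) {
        List<CompletableFuture<Void>> futures = clusters.stream()
                .map(c -> lookupCluster(c).thenCompose(FailFastSketch::createRemote))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        System.out.println(createOnAll(Arrays.asList("a", "b")).isCompletedExceptionally());   // false
        System.out.println(createOnAll(Arrays.asList("a", "bad")).isCompletedExceptionally()); // true
    }
}
```

A caller could then answer the request with an error when the combined future fails, instead of reporting success after a logged failure.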

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1061,43 +1062,53 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
         }
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
-
-        CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
-                .thenCompose(__ -> nonPersistentTopic.checkReplication())
-                .thenApply(__ -> {
-            log.info("Created topic {}", nonPersistentTopic);
-            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
-            pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-            addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
-            return Optional.of(nonPersistentTopic);
-        });
-
-        future.exceptionally((ex) -> {
-            log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
-            nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                pulsar.getExecutor().execute(() -> topics.remove(topic, future));
+        CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
+        isOwner.thenRun(() -> {
+            nonPersistentTopic.initialize()
+                    .thenCompose(__ -> nonPersistentTopic.checkReplication())
+                    .thenRun(() -> {
+                        log.info("Created topic {}", nonPersistentTopic);
+                        long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
+                        pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                        addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
+                        topicFuture.complete(Optional.of(nonPersistentTopic));
+                    }).exceptionally(ex -> {
+                log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
+                nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                    pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+                    topicFuture.completeExceptionally(ex);

Review comment:
       Should we run this after `topics.remove` has executed?
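
A minimal sketch of the ordering the question proposes, assuming the removal and the exceptional completion are placed in the same executor task. The class and method names (`RemoveThenFailSketch`, `removeThenFail`) are hypothetical and the map stands in for the broker's `topics` cache.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class RemoveThenFailSketch {
    static <T> void removeThenFail(ConcurrentMap<String, CompletableFuture<T>> topics, String topic,
                                   CompletableFuture<T> topicFuture, Executor executor, Throwable ex) {
        executor.execute(() -> {
            topics.remove(topic, topicFuture);     // drop the cached future first
            topicFuture.completeExceptionally(ex); // only then notify waiting callers
        });
    }

    public static void main(String[] args) {
        ConcurrentMap<String, CompletableFuture<String>> topics = new ConcurrentHashMap<>();
        CompletableFuture<String> topicFuture = new CompletableFuture<>();
        topics.put("t", topicFuture);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        removeThenFail(topics, "t", topicFuture, executor,
                new IllegalStateException("replication check failed"));
        // Any stage that depends on topicFuture runs after the removal.
        boolean stillCached = topicFuture.handle((v, t) -> topics.containsKey("t")).join();
        System.out.println(stillCached); // false
        executor.shutdown();
    }
}
```

With this ordering, a caller that reacts to the failed future and retries does not find the failed future still cached in the map.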







[GitHub] [pulsar] codelipenghui commented on pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#issuecomment-961777418









[GitHub] [pulsar] congbobo184 commented on a change in pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590#discussion_r743577016



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1061,43 +1062,53 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
         }
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
-
-        CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
-                .thenCompose(__ -> nonPersistentTopic.checkReplication())
-                .thenApply(__ -> {
-            log.info("Created topic {}", nonPersistentTopic);
-            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
-            pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-            addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
-            return Optional.of(nonPersistentTopic);
-        });
-
-        future.exceptionally((ex) -> {
-            log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
-            nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                pulsar.getExecutor().execute(() -> topics.remove(topic, future));
+        CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
+        isOwner.thenRun(() -> {
+            nonPersistentTopic.initialize()
+                    .thenCompose(__ -> nonPersistentTopic.checkReplication())
+                    .thenRun(() -> {
+                        log.info("Created topic {}", nonPersistentTopic);
+                        long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
+                        pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                        addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
+                        topicFuture.complete(Optional.of(nonPersistentTopic));
+                    }).exceptionally(ex -> {
+                log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
+                nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                    pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+                    topicFuture.completeExceptionally(ex);

Review comment:
       It has no effect whether this runs before or after.







[GitHub] [pulsar] codelipenghui merged pull request #12590: Fix call sync method in an async callback when enabling geo replicator.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #12590:
URL: https://github.com/apache/pulsar/pull/12590


   

