You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/02/16 10:30:57 UTC

[pulsar] branch branch-2.10 updated: [fix][broker][branch-2.10] Replace sync method call in async call chain to prevent ZK event thread deadlock (#19539)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new b970f029d91 [fix][broker][branch-2.10] Replace sync method call in async call chain to prevent ZK event thread deadlock (#19539)
b970f029d91 is described below

commit b970f029d9184c4b57a6604f304b9c5f32744394
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Feb 16 12:30:50 2023 +0200

    [fix][broker][branch-2.10] Replace sync method call in async call chain to prevent ZK event thread deadlock (#19539)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 132 +++++++++++----------
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  16 +++
 2 files changed, 84 insertions(+), 64 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1ed27d4a757..b287f9f629b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2414,78 +2414,82 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
             MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
         CompletableFuture<Void> ret;
-        if (topicName.isGlobal()) {
-            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            log.warn("[{}] Not supported operation on partitioned-topic {} {}",
+                                    clientAppId(), topicName, subName);
+                            throw new CompletionException(new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Reset-cursor at position is not allowed for partitioned-topic"));
+                        } else {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                    });
         } else {
             ret = CompletableFuture.completedFuture(null);
         }
+        if (topicName.isGlobal()) {
+            ret = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));
+        }
         ret.thenAccept(__ -> {
             log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName,
                     subName, messageId);
-            // If the topic name is a partition name, no need to get partition topic metadata again
-            if (!topicName.isPartitioned()
-                    && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
-                log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName,
-                        subName);
-                asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
-                        "Reset-cursor at position is not allowed for partitioned-topic"));
-                return;
-            } else {
-                validateTopicOwnershipAsync(topicName, authoritative)
-                        .thenCompose(ignore ->
-                                validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
-                        .thenCompose(ignore -> getTopicReferenceAsync(topicName))
-                        .thenAccept(topic -> {
-                                if (topic == null) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
-                                    return;
-                                }
-                                PersistentSubscription sub = ((PersistentTopic) topic).getSubscription(subName);
-                                if (sub == null) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
-                                    return;
-                                }
-                                CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
-                                getEntryBatchSize(batchSizeFuture, (PersistentTopic) topic, messageId, batchIndex);
-                                batchSizeFuture.thenAccept(bi -> {
-                                    PositionImpl seekPosition = calculatePositionAckSet(isExcluded, bi, batchIndex,
-                                            messageId);
-                                    sub.resetCursor(seekPosition).thenRun(() -> {
-                                        log.info("[{}][{}] successfully reset cursor on subscription {}"
-                                                        + " to position {}", clientAppId(),
-                                                topicName, subName, messageId);
-                                        asyncResponse.resume(Response.noContent().build());
-                                    }).exceptionally(ex -> {
-                                        Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
-                                        log.warn("[{}][{}] Failed to reset cursor on subscription {}"
-                                                        + " to position {}", clientAppId(),
-                                                        topicName, subName, messageId, t);
-                                        if (t instanceof SubscriptionInvalidCursorPosition) {
-                                            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                                                    "Unable to find position for position specified: "
-                                                            + t.getMessage()));
-                                        } else if (t instanceof SubscriptionBusyException) {
-                                            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                                                    "Failed for Subscription Busy: " + t.getMessage()));
-                                        } else {
-                                            resumeAsyncResponseExceptionally(asyncResponse, t);
-                                        }
-                                        return null;
-                                    });
-                                }).exceptionally(e -> {
-                                    asyncResponse.resume(e);
+            validateTopicOwnershipAsync(topicName, authoritative)
+                    .thenCompose(ignore ->
+                            validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+                    .thenCompose(ignore -> getTopicReferenceAsync(topicName))
+                    .thenAccept(topic -> {
+                            if (topic == null) {
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                                return;
+                            }
+                            PersistentSubscription sub = ((PersistentTopic) topic).getSubscription(subName);
+                            if (sub == null) {
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                return;
+                            }
+                            CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
+                            getEntryBatchSize(batchSizeFuture, (PersistentTopic) topic, messageId, batchIndex);
+                            batchSizeFuture.thenAccept(bi -> {
+                                PositionImpl seekPosition = calculatePositionAckSet(isExcluded, bi, batchIndex,
+                                        messageId);
+                                sub.resetCursor(seekPosition).thenRun(() -> {
+                                    log.info("[{}][{}] successfully reset cursor on subscription {}"
+                                                    + " to position {}", clientAppId(),
+                                            topicName, subName, messageId);
+                                    asyncResponse.resume(Response.noContent().build());
+                                }).exceptionally(ex -> {
+                                    Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
+                                    log.warn("[{}][{}] Failed to reset cursor on subscription {}"
+                                                    + " to position {}", clientAppId(),
+                                                    topicName, subName, messageId, t);
+                                    if (t instanceof SubscriptionInvalidCursorPosition) {
+                                        asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                                "Unable to find position for position specified: "
+                                                        + t.getMessage()));
+                                    } else if (t instanceof SubscriptionBusyException) {
+                                        asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                                "Failed for Subscription Busy: " + t.getMessage()));
+                                    } else {
+                                        resumeAsyncResponseExceptionally(asyncResponse, t);
+                                    }
                                     return null;
                                 });
-                        }).exceptionally(ex -> {
-                            // If the exception is not redirect exception we need to log it.
-                            if (!isRedirectException(ex)) {
-                                log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}",
-                                        clientAppId(), topicName, subName, messageId, ex.getCause());
-                            }
-                            resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
-                            return null;
-                        });
-                }
+                            }).exceptionally(e -> {
+                                asyncResponse.resume(e);
+                                return null;
+                            });
+                    }).exceptionally(ex -> {
+                        // If the exception is not redirect exception we need to log it.
+                        if (!isRedirectException(ex)) {
+                            log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}",
+                                    clientAppId(), topicName, subName, messageId, ex.getCause());
+                        }
+                        resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+                        return null;
+                    });
         }).exceptionally(ex -> {
             // If the exception is not redirect exception we need to log it.
             if (!isRedirectException(ex)) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 7b6e2176bed..fcc2f144907 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -609,6 +609,22 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         consumer.close();
     }
 
+    @Test
+    public void shouldNotSupportResetOnPartitionedTopic() throws PulsarAdminException, PulsarClientException {
+        final String partitionedTopicName = "persistent://prop-xyz/ns1/" + BrokerTestUtil.newUniqueName("parttopic");
+        admin.topics().createPartitionedTopic(partitionedTopicName, 4);
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+        try {
+            admin.topics().resetCursor(partitionedTopicName, "my-sub", MessageId.earliest);
+            fail();
+        } catch (PulsarAdminException.NotAllowedException e) {
+            assertTrue(e.getMessage().contains("Reset-cursor at position is not allowed for partitioned-topic"),
+                    "Condition doesn't match. Actual message:" + e.getMessage());
+        }
+    }
+
     private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
         Producer<byte[]> producer = pulsarClient.newProducer()
             .topic(topicName)