You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2023/02/16 10:30:57 UTC
[pulsar] branch branch-2.10 updated: [fix][broker][branch-2.10] Replace sync method call in async call chain to prevent ZK event thread deadlock (#19539)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new b970f029d91 [fix][broker][branch-2.10] Replace sync method call in async call chain to prevent ZK event thread deadlock (#19539)
b970f029d91 is described below
commit b970f029d9184c4b57a6604f304b9c5f32744394
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Feb 16 12:30:50 2023 +0200
[fix][broker][branch-2.10] Replace sync method call in async call chain to prevent ZK event thread deadlock (#19539)
---
.../broker/admin/impl/PersistentTopicsBase.java | 132 +++++++++++----------
.../apache/pulsar/broker/admin/AdminApi2Test.java | 16 +++
2 files changed, 84 insertions(+), 64 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1ed27d4a757..b287f9f629b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2414,78 +2414,82 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
CompletableFuture<Void> ret;
- if (topicName.isGlobal()) {
- ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ // If the topic name is a partition name, no need to get partition topic metadata again
+ if (!topicName.isPartitioned()) {
+ ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenCompose(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ log.warn("[{}] Not supported operation on partitioned-topic {} {}",
+ clientAppId(), topicName, subName);
+ throw new CompletionException(new RestException(Status.METHOD_NOT_ALLOWED,
+ "Reset-cursor at position is not allowed for partitioned-topic"));
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
} else {
ret = CompletableFuture.completedFuture(null);
}
+ if (topicName.isGlobal()) {
+ ret = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));
+ }
ret.thenAccept(__ -> {
log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName,
subName, messageId);
- // If the topic name is a partition name, no need to get partition topic metadata again
- if (!topicName.isPartitioned()
- && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
- log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName,
- subName);
- asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
- "Reset-cursor at position is not allowed for partitioned-topic"));
- return;
- } else {
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(ignore ->
- validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
- .thenCompose(ignore -> getTopicReferenceAsync(topicName))
- .thenAccept(topic -> {
- if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
- return;
- }
- PersistentSubscription sub = ((PersistentTopic) topic).getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
- return;
- }
- CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
- getEntryBatchSize(batchSizeFuture, (PersistentTopic) topic, messageId, batchIndex);
- batchSizeFuture.thenAccept(bi -> {
- PositionImpl seekPosition = calculatePositionAckSet(isExcluded, bi, batchIndex,
- messageId);
- sub.resetCursor(seekPosition).thenRun(() -> {
- log.info("[{}][{}] successfully reset cursor on subscription {}"
- + " to position {}", clientAppId(),
- topicName, subName, messageId);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
- log.warn("[{}][{}] Failed to reset cursor on subscription {}"
- + " to position {}", clientAppId(),
- topicName, subName, messageId, t);
- if (t instanceof SubscriptionInvalidCursorPosition) {
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Unable to find position for position specified: "
- + t.getMessage()));
- } else if (t instanceof SubscriptionBusyException) {
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Failed for Subscription Busy: " + t.getMessage()));
- } else {
- resumeAsyncResponseExceptionally(asyncResponse, t);
- }
- return null;
- });
- }).exceptionally(e -> {
- asyncResponse.resume(e);
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(ignore ->
+ validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(ignore -> getTopicReferenceAsync(topicName))
+ .thenAccept(topic -> {
+ if (topic == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ return;
+ }
+ PersistentSubscription sub = ((PersistentTopic) topic).getSubscription(subName);
+ if (sub == null) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return;
+ }
+ CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
+ getEntryBatchSize(batchSizeFuture, (PersistentTopic) topic, messageId, batchIndex);
+ batchSizeFuture.thenAccept(bi -> {
+ PositionImpl seekPosition = calculatePositionAckSet(isExcluded, bi, batchIndex,
+ messageId);
+ sub.resetCursor(seekPosition).thenRun(() -> {
+ log.info("[{}][{}] successfully reset cursor on subscription {}"
+ + " to position {}", clientAppId(),
+ topicName, subName, messageId);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
+ log.warn("[{}][{}] Failed to reset cursor on subscription {}"
+ + " to position {}", clientAppId(),
+ topicName, subName, messageId, t);
+ if (t instanceof SubscriptionInvalidCursorPosition) {
+ asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+ "Unable to find position for position specified: "
+ + t.getMessage()));
+ } else if (t instanceof SubscriptionBusyException) {
+ asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " + t.getMessage()));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ }
return null;
});
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}",
- clientAppId(), topicName, subName, messageId, ex.getCause());
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
- return null;
- });
- }
+ }).exceptionally(e -> {
+ asyncResponse.resume(e);
+ return null;
+ });
+ }).exceptionally(ex -> {
+ // If the exception is not redirect exception we need to log it.
+ if (!isRedirectException(ex)) {
+ log.warn("[{}][{}] Failed to reset cursor on subscription {} to position {}",
+ clientAppId(), topicName, subName, messageId, ex.getCause());
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+ return null;
+ });
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 7b6e2176bed..fcc2f144907 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -609,6 +609,22 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
consumer.close();
}
+ @Test
+ public void shouldNotSupportResetOnPartitionedTopic() throws PulsarAdminException, PulsarClientException {
+ final String partitionedTopicName = "persistent://prop-xyz/ns1/" + BrokerTestUtil.newUniqueName("parttopic");
+ admin.topics().createPartitionedTopic(partitionedTopicName, 4);
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub")
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+ try {
+ admin.topics().resetCursor(partitionedTopicName, "my-sub", MessageId.earliest);
+ fail();
+ } catch (PulsarAdminException.NotAllowedException e) {
+ assertTrue(e.getMessage().contains("Reset-cursor at position is not allowed for partitioned-topic"),
+ "Condition doesn't match. Actual message:" + e.getMessage());
+ }
+ }
+
private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)