You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/13 09:33:58 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]Make resetCursor async (#16355)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ea417fb494e [improve][broker][PIP-149]Make resetCursor async (#16355)
ea417fb494e is described below
commit ea417fb494e23637beaaa796f515b12b4982959d
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Jul 13 17:33:51 2022 +0800
[improve][broker][PIP-149]Make resetCursor async (#16355)
---
.../broker/admin/impl/PersistentTopicsBase.java | 177 ++++++++-------------
.../pulsar/broker/admin/v1/PersistentTopics.java | 27 +++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 27 +++-
.../pulsar/broker/admin/PersistentTopicsTest.java | 1 +
4 files changed, 102 insertions(+), 130 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 529294ef437..3782cf1ab43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2036,29 +2036,31 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp,
+ protected CompletableFuture<Void> internalResetCursorAsync(String subName, long timestamp,
boolean authoritative) {
+ CompletableFuture<Void> future;
if (topicName.isGlobal()) {
- try {
- validateGlobalNamespaceOwnership(namespaceName);
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}",
- clientAppId(), topicName,
- subName, timestamp, e.getMessage());
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
}
+ return future
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(__ -> {
+ // If the topic name is a partition name, no need to get partition topic metadata again
+ if (topicName.isPartitioned()) {
+ return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
+ } else {
+ return internalResetCursorForPartitionedTopic(subName, timestamp, authoritative);
+ }
+ });
+ }
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
- // If the topic name is a partition name, no need to get partition topic metadata again
- if (topicName.isPartitioned()) {
- internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
- } else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAccept(partitionMetadata -> {
+ private CompletableFuture<Void> internalResetCursorForPartitionedTopic(String subName, long timestamp,
+ boolean authoritative) {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenCompose(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -2070,126 +2072,73 @@ public class PersistentTopicsBase extends AdminResource {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
- .resetCursorAsync(topicNamePartition.toString(),
- subName, timestamp).handle((r, ex) -> {
- if (ex != null) {
- if (ex instanceof PreconditionFailedException) {
- // throw the last exception if all partitions get this error
- // any other exception on partition is reported back to user
- failureCount.incrementAndGet();
- partitionException.set(ex);
- } else {
- log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+ .resetCursorAsync(topicNamePartition.toString(),
+ subName, timestamp).handle((r, ex) -> {
+ if (ex != null) {
+ if (ex instanceof PreconditionFailedException) {
+ // throw the last exception if all partitions get this error
+ // any other exception on partition is reported back to user
+ failureCount.incrementAndGet();
+ partitionException.set(ex);
+ } else {
+ log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
clientAppId(), topicNamePartition, subName, timestamp, ex);
- future.completeExceptionally(ex);
- return null;
+ future.completeExceptionally(ex);
+ return null;
+ }
}
- }
- if (count.decrementAndGet() == 0) {
- future.complete(null);
- }
+ if (count.decrementAndGet() == 0) {
+ future.complete(null);
+ }
- return null;
- });
+ return null;
+ });
} catch (Exception e) {
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
- topicNamePartition, subName, timestamp, e);
+ topicNamePartition, subName, timestamp, e);
future.completeExceptionally(e);
}
}
- future.whenComplete((r, ex) -> {
- if (ex != null) {
- if (ex instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException((PulsarAdminException) ex));
- return;
- } else {
- asyncResponse.resume(new RestException(ex));
- return;
- }
- }
-
+ return future.whenComplete((r, ex) -> {
// report an error to user if unable to reset for all partitions
if (failureCount.get() == numPartitions) {
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
- clientAppId(), topicName,
- subName, timestamp, partitionException.get());
- asyncResponse.resume(
- new RestException(Status.PRECONDITION_FAILED,
- partitionException.get().getMessage()));
- return;
+ clientAppId(), topicName,
+ subName, timestamp, partitionException.get());
+ throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
} else if (failureCount.get() > 0) {
log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
- clientAppId(), topicName, subName, timestamp, partitionException.get());
+ clientAppId(), topicName, subName, timestamp, partitionException.get());
}
-
- asyncResponse.resume(Response.noContent().build());
});
} else {
- internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
+ return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
}
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to expire messages for all subscription on topic {}",
- clientAppId(), topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
});
- }
}
- private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+ private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
boolean authoritative) {
- try {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
- log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+ .thenCompose(__ -> {
+ log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
clientAppId(), topicName, subName, timestamp);
-
- PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
- if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- getTopicNotFoundErrorMessage(topicName.toString())));
- return;
- }
- PersistentSubscription sub = topic.getSubscription(subName);
- if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- getSubNotFoundErrorMessage(topicName.toString(), subName)));
- return;
- }
- sub.resetCursor(timestamp).thenRun(() -> {
- log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
- timestamp);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
- log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
- subName, timestamp, t);
- if (t instanceof SubscriptionInvalidCursorPosition) {
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Unable to find position for timestamp specified: " + t.getMessage()));
- } else if (t instanceof SubscriptionBusyException) {
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Failed for Subscription Busy: " + t.getMessage()));
- } else {
- resumeAsyncResponseExceptionally(asyncResponse, t);
+ return getTopicReferenceAsync(topicName);
+ })
+ .thenCompose(topic -> {
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName));
}
- return null;
- });
- } catch (Exception e) {
- log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}",
- clientAppId(), topicName, subName, timestamp, e);
- if (e instanceof NotAllowedException) {
- asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
- } else {
- resumeAsyncResponseExceptionally(asyncResponse, e);
- }
- }
+ return sub.resetCursor(timestamp);
+ })
+ .thenRun(() ->
+ log.info("[{}][{}] Reset cursor on subscription {} to time {}",
+ clientAppId(), topicName, subName, timestamp));
}
protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 5e5c4ab0d5f..51a0cb1598d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -696,14 +696,25 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@PathParam("timestamp") long timestamp,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- try {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalResetCursor(asyncResponse, decode(encodedSubName), timestamp, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalResetCursorAsync(decode(encodedSubName), timestamp, authoritative)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (!isRedirectException(t)) {
+ log.error("[{}][{}] Failed to reset cursor on subscription {} to time {}",
+ clientAppId(), topicName, encodedSubName, timestamp, t);
+ }
+ if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Unable to find position for timestamp specified: " + t.getMessage());
+ } else if (t instanceof BrokerServiceException.SubscriptionBusyException) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " + t.getMessage());
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index edf49d410ad..28dc8015ff8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1589,14 +1589,25 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("timestamp") long timestamp,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- try {
- validateTopicName(tenant, namespace, encodedTopic);
- internalResetCursor(asyncResponse, decode(encodedSubName), timestamp, authoritative);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalResetCursorAsync(decode(encodedSubName), timestamp, authoritative)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (!isRedirectException(t)) {
+ log.error("[{}][{}] Failed to reset cursor on subscription {} to time {}",
+ clientAppId(), topicName, encodedSubName, timestamp, t);
+ }
+ if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Unable to find position for timestamp specified: " + t.getMessage());
+ } else if (t instanceof BrokerServiceException.SubscriptionBusyException) {
+ t = new RestException(Response.Status.PRECONDITION_FAILED,
+ "Failed for Subscription Busy: " + t.getMessage());
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ return null;
+ });
}
@PUT
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fdda0bc5745..9fe28655594 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1445,6 +1445,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
doReturn(brokerService).when(pulsar).getBrokerService();
CompletableFuture<Optional<Topic>> completableFuture = new CompletableFuture<>();
doReturn(completableFuture).when(brokerService).getTopicIfExists(topic);
+ completableFuture.completeExceptionally(new RuntimeException("TimeoutException"));
try {
admin.topics().resetCursor(topic, "my-sub", System.currentTimeMillis());
Assert.fail();