You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/13 09:33:58 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]Make resetCursor async (#16355)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ea417fb494e [improve][broker][PIP-149]Make resetCursor async (#16355)
ea417fb494e is described below

commit ea417fb494e23637beaaa796f515b12b4982959d
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Jul 13 17:33:51 2022 +0800

    [improve][broker][PIP-149]Make resetCursor async (#16355)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 177 ++++++++-------------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  27 +++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  27 +++-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   1 +
 4 files changed, 102 insertions(+), 130 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 529294ef437..3782cf1ab43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2036,29 +2036,31 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
-    protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp,
+    protected CompletableFuture<Void> internalResetCursorAsync(String subName, long timestamp,
             boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}",
-                        clientAppId(), topicName,
-                        subName, timestamp, e.getMessage());
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-                return;
-            }
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
         }
+        return future
+            .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                // If the topic name is a partition name, no need to get partition topic metadata again
+                if (topicName.isPartitioned()) {
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
+                } else {
+                    return internalResetCursorForPartitionedTopic(subName, timestamp, authoritative);
+                }
+            });
+    }
 
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-        // If the topic name is a partition name, no need to get partition topic metadata again
-        if (topicName.isPartitioned()) {
-            internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
-        } else {
-            getPartitionedTopicMetadataAsync(topicName,
-                    authoritative, false).thenAccept(partitionMetadata -> {
+    private CompletableFuture<Void> internalResetCursorForPartitionedTopic(String subName, long timestamp,
+                                                                           boolean authoritative) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+            .thenCompose(partitionMetadata -> {
                 final int numPartitions = partitionMetadata.partitions;
                 if (numPartitions > 0) {
                     final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -2070,126 +2072,73 @@ public class PersistentTopicsBase extends AdminResource {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
-                        if (ex != null) {
-                            if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
-                            } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
-                            }
-                        }
-
+                    return future.whenComplete((r, ex) -> {
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
-                }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
-                Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
-                        subName, timestamp, t);
-                if (t instanceof SubscriptionInvalidCursorPosition) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Unable to find position for timestamp specified: " + t.getMessage()));
-                } else if (t instanceof SubscriptionBusyException) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Failed for Subscription Busy: " + t.getMessage()));
-                } else {
-                    resumeAsyncResponseExceptionally(asyncResponse, t);
+                return getTopicReferenceAsync(topicName);
+            })
+            .thenCompose(topic -> {
+                Subscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    throw new RestException(Status.NOT_FOUND,
+                        getSubNotFoundErrorMessage(topicName.toString(), subName));
                 }
-                return null;
-            });
-        } catch (Exception e) {
-            log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}",
-                    clientAppId(), topicName, subName, timestamp, e);
-            if (e instanceof NotAllowedException) {
-                asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));
-            } else {
-                resumeAsyncResponseExceptionally(asyncResponse, e);
-            }
-        }
+                return sub.resetCursor(timestamp);
+            })
+            .thenRun(() ->
+                log.info("[{}][{}] Reset cursor on subscription {} to time {}",
+                    clientAppId(), topicName, subName, timestamp));
     }
 
     protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 5e5c4ab0d5f..51a0cb1598d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -696,14 +696,25 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @PathParam("timestamp") long timestamp,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
-        try {
-            validateTopicName(property, cluster, namespace, encodedTopic);
-            internalResetCursor(asyncResponse, decode(encodedSubName), timestamp, authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateTopicName(property, cluster, namespace, encodedTopic);
+        internalResetCursorAsync(decode(encodedSubName), timestamp, authoritative)
+            .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                Throwable t = FutureUtil.unwrapCompletionException(ex);
+                if (!isRedirectException(t)) {
+                    log.error("[{}][{}] Failed to reset cursor on subscription {} to time {}",
+                        clientAppId(), topicName, encodedSubName, timestamp, t);
+                }
+                if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
+                    t = new RestException(Response.Status.PRECONDITION_FAILED,
+                        "Unable to find position for timestamp specified: " + t.getMessage());
+                } else if (t instanceof BrokerServiceException.SubscriptionBusyException) {
+                    t = new RestException(Response.Status.PRECONDITION_FAILED,
+                        "Failed for Subscription Busy: " + t.getMessage());
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, t);
+                return null;
+            });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index edf49d410ad..28dc8015ff8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1589,14 +1589,25 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("timestamp") long timestamp,
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
-        try {
-            validateTopicName(tenant, namespace, encodedTopic);
-            internalResetCursor(asyncResponse, decode(encodedSubName), timestamp, authoritative);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalResetCursorAsync(decode(encodedSubName), timestamp, authoritative)
+            .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                Throwable t = FutureUtil.unwrapCompletionException(ex);
+                if (!isRedirectException(t)) {
+                    log.error("[{}][{}] Failed to reset cursor on subscription {} to time {}",
+                        clientAppId(), topicName, encodedSubName, timestamp, t);
+                }
+                if (t instanceof BrokerServiceException.SubscriptionInvalidCursorPosition) {
+                    t = new RestException(Response.Status.PRECONDITION_FAILED,
+                        "Unable to find position for timestamp specified: " + t.getMessage());
+                } else if (t instanceof BrokerServiceException.SubscriptionBusyException) {
+                    t = new RestException(Response.Status.PRECONDITION_FAILED,
+                        "Failed for Subscription Busy: " + t.getMessage());
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, t);
+                return null;
+            });
     }
 
     @PUT
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fdda0bc5745..9fe28655594 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1445,6 +1445,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(brokerService).when(pulsar).getBrokerService();
         CompletableFuture<Optional<Topic>> completableFuture = new CompletableFuture<>();
         doReturn(completableFuture).when(brokerService).getTopicIfExists(topic);
+        completableFuture.completeExceptionally(new RuntimeException("TimeoutException"));
         try {
             admin.topics().resetCursor(topic, "my-sub", System.currentTimeMillis());
             Assert.fail();