You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/03 06:32:23 UTC

[GitHub] [pulsar] AnonHxy opened a new pull request, #16355: [improve][broker][PIP-149]Make resetCursor async

AnonHxy opened a new pull request, #16355:
URL: https://github.com/apache/pulsar/pull/16355

   Master Issue: https://github.com/apache/pulsar/issues/14365
   
   ### Motivation
   
   
   * See https://github.com/apache/pulsar/issues/14365
   
   ### Modifications
   
   * Make resetCursor async
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
     
   - [x] `doc-not-needed` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r912606550


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2036,29 +2036,31 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
         });
     }
 
-    protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp,
+    protected CompletableFuture<Void> internalResetCursorAsync(String subName, long timestamp,
             boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}",
-                        clientAppId(), topicName,
-                        subName, timestamp, e.getMessage());

Review Comment:
   Fixed. PTAL~ @codelipenghui 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r916513586


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,86 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
-                        if (ex != null) {
-                            if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
-                            } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
-                            }
-                        }
-
+                    return future.whenComplete((r, ex) -> {
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
-                }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
+                return getTopicReferenceAsync(topicName);
+            })
+            .thenCompose(topic -> {
+                Subscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    throw new RestException(Status.NOT_FOUND,
+                        getSubNotFoundErrorMessage(topicName.toString(), subName));
+                }
+                return sub.resetCursor(timestamp);
+            })
+            .thenRun(() -> log.info("[{}][{}] Reset cursor on subscription {} to time {}",
+                clientAppId(), topicName, subName, timestamp))
+            .exceptionally(ex -> {
                 Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
                 log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
-                        subName, timestamp, t);
+                    subName, timestamp, t);
                 if (t instanceof SubscriptionInvalidCursorPosition) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Unable to find position for timestamp specified: " + t.getMessage()));
+                    throw new RestException(Status.PRECONDITION_FAILED,
+                        "Unable to find position for timestamp specified: " + t.getMessage());
                 } else if (t instanceof SubscriptionBusyException) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Failed for Subscription Busy: " + t.getMessage()));
+                    throw new RestException(Status.PRECONDITION_FAILED,
+                        "Failed for Subscription Busy: " + t.getMessage());
                 } else {
-                    resumeAsyncResponseExceptionally(asyncResponse, t);
+                    throw new RestException(t);
                 }

Review Comment:
   Make sense. Moving RestException to REST layer looks a little better.  @Technoboy- @mattisonchao 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r914580254


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }
 
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
-                }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
+                return getTopicReferenceAsync(topicName);
+            })

Review Comment:
   Missing `if (topic == null) {`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#issuecomment-1173366838

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Jason918 commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r913437091


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }
 
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
-                }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
+                return getTopicReferenceAsync(topicName);
+            })
+            .thenCompose(topic -> {
+                Subscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    throw new RestException(Status.NOT_FOUND,
+                        getSubNotFoundErrorMessage(topicName.toString(), subName));
+                }
+                return sub.resetCursor(timestamp);
+            })
+            .thenRun(() -> log.info("[{}][{}] Reset cursor on subscription {} to time {}",
+                clientAppId(), topicName, subName, timestamp))
+            .exceptionally(ex -> {
                 Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
                 log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
-                        subName, timestamp, t);
+                    subName, timestamp, t);
                 if (t instanceof SubscriptionInvalidCursorPosition) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Unable to find position for timestamp specified: " + t.getMessage()));
+                    throw new RestException(Status.PRECONDITION_FAILED,
+                        "Unable to find position for timestamp specified: " + t.getMessage());
                 } else if (t instanceof SubscriptionBusyException) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Failed for Subscription Busy: " + t.getMessage()));
+                    throw new RestException(Status.PRECONDITION_FAILED,
+                        "Failed for Subscription Busy: " + t.getMessage());
                 } else {
-                    resumeAsyncResponseExceptionally(asyncResponse, t);
+                    throw new RestException(t);
                 }
-                return null;
             });
-        } catch (Exception e) {
-            log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}",
-                    clientAppId(), topicName, subName, timestamp, e);
-            if (e instanceof NotAllowedException) {
-                asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));

Review Comment:
   This exception handling seems missing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r914587851


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }

Review Comment:
   The above `if` block seems no need anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r914650812


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }

Review Comment:
   Good catch! Updated, PTAL again @Technoboy- 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#issuecomment-1176152097

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r916411680


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }
 
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
-                }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
+                return getTopicReferenceAsync(topicName);
+            })
+            .thenCompose(topic -> {
+                Subscription sub = topic.getSubscription(subName);

Review Comment:
   Make sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r912599441


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2036,29 +2036,31 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
         });
     }
 
-    protected void internalResetCursor(AsyncResponse asyncResponse, String subName, long timestamp,
+    protected CompletableFuture<Void> internalResetCursorAsync(String subName, long timestamp,
             boolean authoritative) {
+        CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
-            try {
-                validateGlobalNamespaceOwnership(namespaceName);
-            } catch (Exception e) {
-                log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}: {}",
-                        clientAppId(), topicName,
-                        subName, timestamp, e.getMessage());

Review Comment:
   Please don't missed the warning logs in the new change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r914589966


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }
 
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
-                }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
+                return getTopicReferenceAsync(topicName);
+            })
+            .thenCompose(topic -> {
+                Subscription sub = topic.getSubscription(subName);

Review Comment:
   `topic` can not be null, so I remove the `topic  == null` check. Because `getTopicReferenceAsync` will complete exceptionally if `topic` is null, L4249:
   https://github.com/apache/pulsar/blob/fd60c3e9bbc9299f9a8f0c682baeb4ca9a490b58/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L4245-L4250  @Technoboy- 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r914580786


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }
 
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
-                }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
+                return getTopicReferenceAsync(topicName);
+            })
+            .thenCompose(topic -> {
+                Subscription sub = topic.getSubscription(subName);

Review Comment:
   Missing `if (topic == null) {`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#issuecomment-1173107909

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r916413142


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,86 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
-                        if (ex != null) {
-                            if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
-                            } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
-                            }
-                        }
-
+                    return future.whenComplete((r, ex) -> {
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
-                }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
+                return getTopicReferenceAsync(topicName);
+            })
+            .thenCompose(topic -> {
+                Subscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    throw new RestException(Status.NOT_FOUND,
+                        getSubNotFoundErrorMessage(topicName.toString(), subName));
+                }
+                return sub.resetCursor(timestamp);
+            })
+            .thenRun(() -> log.info("[{}][{}] Reset cursor on subscription {} to time {}",
+                clientAppId(), topicName, subName, timestamp))
+            .exceptionally(ex -> {
                 Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
                 log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
-                        subName, timestamp, t);
+                    subName, timestamp, t);
                 if (t instanceof SubscriptionInvalidCursorPosition) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Unable to find position for timestamp specified: " + t.getMessage()));
+                    throw new RestException(Status.PRECONDITION_FAILED,
+                        "Unable to find position for timestamp specified: " + t.getMessage());
                 } else if (t instanceof SubscriptionBusyException) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Failed for Subscription Busy: " + t.getMessage()));
+                    throw new RestException(Status.PRECONDITION_FAILED,
+                        "Failed for Subscription Busy: " + t.getMessage());
                 } else {
-                    resumeAsyncResponseExceptionally(asyncResponse, t);
+                    throw new RestException(t);
                 }

Review Comment:
   I think we should move line 2142~2152 to Rest Layer. @mattisonchao What do you think ?
   @AnonHxy 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#issuecomment-1193552122

   @nodece We haven't cherry-picked any PRs of PIP-149 since there are lots of conflicts that might introduce new problems to the release branch. If the issue can be reproduced by pulsarctl, we'd better push a hot fix to branch-2.10 directly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nodece commented on pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#issuecomment-1193572454

   @codelipenghui Thanks for your explanation! There has a waiting thread, you can see this log:
   
   ```
       reset_cursor_test.go:64: code: 500 reason: 
            --- An unexpected error occurred in the server ---
           Message: null
           Stacktrace:
           java.util.concurrent.TimeoutException
           	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
           	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
           	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.getTopicReference(PersistentTopicsBase.java:4085)
           	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalResetCursorForNonPartitionedTopic(PersistentTopicsBase.java:2135)
           	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalResetCursor$156(PersistentTopicsBase.java:2112)
           	at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
           	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
           	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
           	at org.apache.pulsar.metadata.impl.ZKMetadataStore.handleGetResult(ZKMetadataStore.java:233)
           	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$6(ZKMetadataStore.java:183)
           	at org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:490)
           	at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:722)
           	at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:563)
   ```
   
   I'll make a PR to branch-2.10 for fix this issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- merged pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #16355:
URL: https://github.com/apache/pulsar/pull/16355


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#issuecomment-1173123285

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r914650812


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#issuecomment-1176160993

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r913473696


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }
 
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
-                }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
+                return getTopicReferenceAsync(topicName);
+            })
+            .thenCompose(topic -> {
+                Subscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    throw new RestException(Status.NOT_FOUND,
+                        getSubNotFoundErrorMessage(topicName.toString(), subName));
+                }
+                return sub.resetCursor(timestamp);
+            })
+            .thenRun(() -> log.info("[{}][{}] Reset cursor on subscription {} to time {}",
+                clientAppId(), topicName, subName, timestamp))
+            .exceptionally(ex -> {
                 Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
                 log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), topicName,
-                        subName, timestamp, t);
+                    subName, timestamp, t);
                 if (t instanceof SubscriptionInvalidCursorPosition) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Unable to find position for timestamp specified: " + t.getMessage()));
+                    throw new RestException(Status.PRECONDITION_FAILED,
+                        "Unable to find position for timestamp specified: " + t.getMessage());
                 } else if (t instanceof SubscriptionBusyException) {
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Failed for Subscription Busy: " + t.getMessage()));
+                    throw new RestException(Status.PRECONDITION_FAILED,
+                        "Failed for Subscription Busy: " + t.getMessage());
                 } else {
-                    resumeAsyncResponseExceptionally(asyncResponse, t);
+                    throw new RestException(t);
                 }
-                return null;
             });
-        } catch (Exception e) {
-            log.warn("[{}][{}] Failed to reset cursor on subscription {} to time {}",
-                    clientAppId(), topicName, subName, timestamp, e);
-            if (e instanceof NotAllowedException) {
-                asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage()));

Review Comment:
   `resumeAsyncResponseExceptionally` has already handled the `NotAllowedException` exception, which invoked at 
   `v1/PersistentTopics` and `v2/PersistentTopics`. @Jason918 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org