You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/06 08:45:04 UTC

[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16355: [improve][broker][PIP-149]Make resetCursor async

Technoboy- commented on code in PR #16355:
URL: https://github.com/apache/pulsar/pull/16355#discussion_r914580254


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2070,126 +2072,94 @@ protected void internalResetCursor(AsyncResponse asyncResponse, String subName,
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
                             pulsar().getAdminClient().topics()
-                                    .resetCursorAsync(topicNamePartition.toString(),
-                                            subName, timestamp).handle((r, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof PreconditionFailedException) {
-                                        // throw the last exception if all partitions get this error
-                                        // any other exception on partition is reported back to user
-                                        failureCount.incrementAndGet();
-                                        partitionException.set(ex);
-                                    } else {
-                                        log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
+                                .resetCursorAsync(topicNamePartition.toString(),
+                                    subName, timestamp).handle((r, ex) -> {
+                                    if (ex != null) {
+                                        if (ex instanceof PreconditionFailedException) {
+                                            // throw the last exception if all partitions get this error
+                                            // any other exception on partition is reported back to user
+                                            failureCount.incrementAndGet();
+                                            partitionException.set(ex);
+                                        } else {
+                                            log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
                                                 clientAppId(), topicNamePartition, subName, timestamp, ex);
-                                        future.completeExceptionally(ex);
-                                        return null;
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        }
                                     }
-                                }
 
-                                if (count.decrementAndGet() == 0) {
-                                    future.complete(null);
-                                }
+                                    if (count.decrementAndGet() == 0) {
+                                        future.complete(null);
+                                    }
 
-                                return null;
-                            });
+                                    return null;
+                                });
                         } catch (Exception e) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(),
-                                    topicNamePartition, subName, timestamp, e);
+                                topicNamePartition, subName, timestamp, e);
                             future.completeExceptionally(e);
                         }
                     }
 
-                    future.whenComplete((r, ex) -> {
+                    return future.whenComplete((r, ex) -> {
                         if (ex != null) {
                             if (ex instanceof PulsarAdminException) {
-                                asyncResponse.resume(new RestException((PulsarAdminException) ex));
-                                return;
+                                throw new RestException((PulsarAdminException) ex);
                             } else {
-                                asyncResponse.resume(new RestException(ex));
-                                return;
+                                throw new RestException(ex);
                             }
                         }
 
                         // report an error to user if unable to reset for all partitions
                         if (failureCount.get() == numPartitions) {
                             log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName,
-                                    subName, timestamp, partitionException.get());
-                            asyncResponse.resume(
-                                    new RestException(Status.PRECONDITION_FAILED,
-                                            partitionException.get().getMessage()));
-                            return;
+                                clientAppId(), topicName,
+                                subName, timestamp, partitionException.get());
+                            throw new RestException(Status.PRECONDITION_FAILED, partitionException.get().getMessage());
                         } else if (failureCount.get() > 0) {
                             log.warn("[{}] [{}] Partial errors for reset cursor on subscription {} to time {}",
-                                    clientAppId(), topicName, subName, timestamp, partitionException.get());
+                                clientAppId(), topicName, subName, timestamp, partitionException.get());
                         }
-
-                        asyncResponse.resume(Response.noContent().build());
                     });
                 } else {
-                    internalResetCursorForNonPartitionedTopic(asyncResponse, subName, timestamp, authoritative);
-                }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to expire messages for all subscription on topic {}",
-                            clientAppId(), topicName, ex);
+                    return internalResetCursorForNonPartitionedTopic(subName, timestamp, authoritative);
                 }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
-        }
     }
 
-    private void internalResetCursorForNonPartitionedTopic(AsyncResponse asyncResponse, String subName, long timestamp,
+    private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String subName, long timestamp,
                                        boolean authoritative) {
-        try {
-            validateTopicOwnership(topicName, authoritative);
-            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, subName);
-
-            log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.RESET_CURSOR, subName))
+            .thenCompose(__ -> {
+                log.info("[{}] [{}] Received reset cursor on subscription {} to time {}",
                     clientAppId(), topicName, subName, timestamp);
-
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getTopicNotFoundErrorMessage(topicName.toString())));
-                return;
-            }
-            PersistentSubscription sub = topic.getSubscription(subName);
-            if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                return;
-            }
-            sub.resetCursor(timestamp).thenRun(() -> {
-                log.info("[{}][{}] Reset cursor on subscription {} to time {}", clientAppId(), topicName, subName,
-                        timestamp);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
+                return getTopicReferenceAsync(topicName);
+            })

Review Comment:
   Missing `if (topic == null) {`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org