Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/02 07:07:00 UTC

[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15682: [improve][txn] Add a admin tool to check message pending ack stats

congbobo184 commented on code in PR #15682:
URL: https://github.com/apache/pulsar/pull/15682#discussion_r887626615


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -339,4 +340,47 @@ public void scaleTransactionCoordinators(@Suspended final AsyncResponse asyncRes
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    @Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+    @ApiOperation(value = "Get position stats in pending ack.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+                    + "or subscription name doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
+            @ApiResponse(code = 405, message = "Pending ack handle don't use managedLedger!"),
+            @ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getPositionStatsInPendingAck(@Suspended final AsyncResponse asyncResponse,
+                                               @QueryParam("authoritative")
+                                               @DefaultValue("false") boolean authoritative,
+                                               @PathParam("tenant") String tenant,
+                                               @PathParam("namespace") String namespace,
+                                               @PathParam("topic") @Encoded String encodedTopic,
+                                               @PathParam("subName") String subName,
+                                               @QueryParam("position") String position) {
+        try {
+            checkArgument(position != null, "Message position should not be null.");
+            checkTransactionCoordinatorEnabled();
+            validateTopicName(tenant, namespace, encodedTopic);
+            PositionImpl positionImpl = PositionImpl.parsePositionFromString(position);
+            internalGetPositionStatsPendingAckStats(authoritative, subName, positionImpl)
+                    .thenAccept(positionInPendingAckStats -> asyncResponse.resume(positionInPendingAckStats))
+                    .exceptionally(ex -> {
+                        log.warn("{} Failed to check position [{}] stats for topic [{}], subscription [{}]",
+                                clientAppId(), position, topicName, subName, ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception ex) {
+            log.warn("Failed to get position stats in pending ack");
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+        }
+
+
+

Review Comment:
   Please delete these extra blank lines.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -339,4 +340,47 @@ public void scaleTransactionCoordinators(@Suspended final AsyncResponse asyncRes
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    @Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+    @ApiOperation(value = "Get position stats in pending ack.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+                    + "or subscription name doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
+            @ApiResponse(code = 405, message = "Pending ack handle don't use managedLedger!"),
+            @ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getPositionStatsInPendingAck(@Suspended final AsyncResponse asyncResponse,

Review Comment:
   Consider renaming this method to `checkPositionInPendingAckState`.



##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java:
##########
@@ -317,10 +318,10 @@ public CompletableFuture<TransactionPendingAckInternalStats> getPendingAckIntern
         path = path.queryParam("metadata", metadata);
         final CompletableFuture<TransactionPendingAckInternalStats> future = new CompletableFuture<>();
         asyncGetRequest(path,
-                new InvocationCallback<TransactionPendingAckInternalStats>() {
+                new InvocationCallback<PositionInPendingAckStats>() {

Review Comment:
   Why was this changed?


