Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/24 03:46:32 UTC

[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15682: [improve][txn] Add a admin tool to check message pending ack stats

gaoran10 commented on code in PR #15682:
URL: https://github.com/apache/pulsar/pull/15682#discussion_r905681190


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -342,4 +343,47 @@ public void scaleTransactionCoordinators(@Suspended final AsyncResponse asyncRes
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    @Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")

Review Comment:
   ```suggestion
       @Path("/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}")
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -342,4 +343,47 @@ public void scaleTransactionCoordinators(@Suspended final AsyncResponse asyncRes
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    @Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+    @ApiOperation(value = "Get position stats in pending ack.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+                    + "or subscription name doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
+            @ApiResponse(code = 405, message = "Pending ack handle don't use managedLedger!"),
+            @ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getPositionStatsInPendingAck(@Suspended final AsyncResponse asyncResponse,
+                                             @QueryParam("authoritative")
+                                             @DefaultValue("false") boolean authoritative,
+                                             @PathParam("tenant") String tenant,
+                                             @PathParam("namespace") String namespace,
+                                             @PathParam("topic") @Encoded String encodedTopic,
+                                             @PathParam("subName") String subName,
+                                             @QueryParam("ledgerId") Long ledgerId,

Review Comment:
   Maybe we could move the `ledgerId` and `entryId` params into the path. That is more REST-style, and we would not need to validate them ourselves.
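
To illustrate the point about validation, here is a minimal standalone sketch in plain Java. It is not the Pulsar or JAX-RS implementation. The route shape `/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}`, the class `PendingAckPathSketch`, and the helper `match` are all hypothetical names chosen for this example. The idea it shows: when the two ids are path segments, a request that omits them or passes non-numeric values simply fails to match the route, so the handler never sees a null or malformed id.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: models path-based routing with a regex, not a real JAX-RS resource.
class PendingAckPathSketch {

    // Assumed route: /pendingAckStats/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}
    // The last two segments must be digits, so absent or non-numeric ids never match.
    private static final Pattern ROUTE = Pattern.compile(
            "^/pendingAckStats/([^/]+)/([^/]+)/([^/]+)/([^/]+)/(\\d+)/(\\d+)$");

    // Returns {ledgerId, entryId} when the path matches the route, otherwise empty.
    static Optional<long[]> match(String path) {
        Matcher m = ROUTE.matcher(path);
        if (!m.matches()) {
            return Optional.empty();
        }
        try {
            long ledgerId = Long.parseLong(m.group(5));
            long entryId = Long.parseLong(m.group(6));
            return Optional.of(new long[] {ledgerId, entryId});
        } catch (NumberFormatException e) {
            // All digits, but too large for a long: treat as "no such route".
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Both ids present and numeric: the route matches.
        System.out.println(match("/pendingAckStats/public/default/test/sub/12/0").isPresent());
        // Ids missing: no match, so no null check is needed in the handler.
        System.out.println(match("/pendingAckStats/public/default/test/sub").isPresent());
        // Non-numeric ledgerId: no match.
        System.out.println(match("/pendingAckStats/public/default/test/sub/abc/0").isPresent());
    }
}
```

With query params declared as `Long`, the handler has to handle the case where either value is absent. With path segments, that case is rejected before the handler runs.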



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -613,6 +619,117 @@ public void testUpdateTransactionCoordinatorNumber() throws Exception {
         }
     }
 
+
+    @Test
+    public void testCheckPositionInPendingAckState() throws Exception {
+         String topic = "persistent://public/default/test";
+         String subName = "sub";
+         initTransaction(1);
+         Transaction transaction = pulsarClient.newTransaction()
+                 .withTransactionTimeout(5, TimeUnit.SECONDS)
+                 .build()
+                 .get();
+
+         @Cleanup
+         Producer<byte[]> producer = pulsarClient.newProducer()
+                 .sendTimeout(5, TimeUnit.SECONDS)
+                 .enableBatching(false)
+                 .topic(topic)
+                 .create();
+         @Cleanup
+         Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                 .topic(topic)
+                 .subscriptionName(subName)
+                 .subscribe();
+
+         producer.newMessage().send();
+
+         Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+         MessageIdImpl messageId = (MessageIdImpl) message.getMessageId();
+
+         PositionInPendingAckStats result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+                 messageId.getLedgerId(), messageId.getEntryId(), null);
+        assertEquals(result.state, PositionInPendingAckStats.State.PendingAckNotReady);

Review Comment:
   ```suggestion
            assertEquals(result.state, PositionInPendingAckStats.State.PendingAckNotReady);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org