Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/07 11:33:31 UTC

[GitHub] [pulsar] gaozhangmin opened a new pull request #10853: [Issue #9488][add admin api getBacklogSize]

gaozhangmin opened a new pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853


   Fixes #9488 
   
   ### Motivation
   
   Add the ability to calculate the backlog size from a given message ID.
   ### Modifications
   
   Add a `getBacklogSizeByMessageId` option to the admin REST API and the CLI.
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change added tests and can be verified as follows:
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no) no
     - The public API: (yes / no) yes (added an option to get a topic's backlog size from a given message ID)
     - The schema: (yes / no / don't know) no
     - The default values of configurations: (yes / no) no
     - The wire protocol: (yes / no) no 
     - The rest endpoints: (yes / no) no
     - The admin cli options: (yes / no) yes
     - Anything that affects deployment: (yes / no / don't know) no
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no) yes
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-862128713


   @MarvinCai Could you please take a look?





[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-859264423


   @cckellogg  PTAL





[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r648836317



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
##########
@@ -1578,6 +1578,41 @@ public void failed(Throwable throwable) {
         }
     }
 
+    @Override
+    public Long getBacklogSizeByMessageId(String topic, String messageId, boolean checkLedgerExists)

Review comment:
       Can I use a PUT method here if a MessageId object is used?







[GitHub] [pulsar] MarvinCai commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-878904111


   My apologies, I didn't see all the @ mentions. I'll take a look tonight.





[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r648836062



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1458,6 +1458,33 @@ public PersistentOfflineTopicStats getBacklog(
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/messageId/{messageId}/checkLedgerExists/{checkLedgerExists}/backlogSize")

Review comment:
       For a single partition, we should return an error if the ledger does not exist on it. For a multi-partition topic, however, we return an error only when the ledger does not exist on any of its partitions. When calculating the backlog size of one partition as part of a multi-partition topic, we shouldn't throw an error if the ledger does not exist on that partition. So we need the checkLedgerExists flag to distinguish these situations.
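
The rule described in this comment can be sketched roughly as follows. This is a minimal, ledger-granular illustration under stated assumptions, not the PR's implementation: each partition is modelled as a map from ledger ID to ledger size in bytes, entry offsets inside a ledger are ignored, and the class and method names (`CheckLedgerExistsSketch`, `partitionBacklog`, `multiPartitionBacklog`) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: each partition is a map of ledger ID -> ledger size in bytes.
final class CheckLedgerExistsSketch {

    // Backlog of one partition, counted from the given ledger onward.
    static long partitionBacklog(NavigableMap<Long, Long> ledgers, long ledgerId,
                                 boolean checkLedgerExists) {
        boolean exists = ledgers.containsKey(ledgerId);
        if (!exists && checkLedgerExists) {
            // Direct query against a single partition: a missing ledger is an error.
            throw new IllegalArgumentException("Ledger " + ledgerId + " does not exist");
        }
        // If the ledger exists, include it; otherwise count only the later ledgers.
        return ledgers.tailMap(ledgerId, exists).values().stream()
                .mapToLong(Long::longValue).sum();
    }

    // Backlog across all partitions: fail only if no partition holds the ledger.
    static long multiPartitionBacklog(List<NavigableMap<Long, Long>> partitions, long ledgerId) {
        boolean existsSomewhere = partitions.stream().anyMatch(p -> p.containsKey(ledgerId));
        if (!existsSomewhere) {
            throw new IllegalArgumentException("Ledger " + ledgerId + " exists on no partition");
        }
        long total = 0;
        for (NavigableMap<Long, Long> partition : partitions) {
            // Per-partition sub-queries must tolerate a missing ledger.
            total += partitionBacklog(partition, ledgerId, false);
        }
        return total;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> p0 = new TreeMap<>();
        p0.put(10L, 100L);
        p0.put(12L, 250L);
        NavigableMap<Long, Long> p1 = new TreeMap<>();
        p1.put(11L, 30L);
        p1.put(13L, 70L);
        // Ledger 12 exists only on p0; p1 contributes its later ledger 13.
        System.out.println(multiPartitionBacklog(Arrays.asList(p0, p1), 12L));
    }
}
```

In this sketch the flag is `true` only for a direct single-partition query and `false` for the per-partition calls made on behalf of the multi-partition topic.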







[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-879536980


   > I don't think this command make sense to partitioned topic.
   
   I agree with you





[GitHub] [pulsar] Anonymitaet commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r647306197



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -662,6 +662,35 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}/checkLedgerExists/{checkLedgerExists}/backlogSize")
+    @ApiOperation(value = "Calculate backlog size given a messageId.")

Review comment:
       ```suggestion
       @ApiOperation(value = "Calculate backlog size by a message ID (in bytes).")
   ```
   
   Do you mean this?

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1596,6 +1596,36 @@ void createSubscription(String topic, String subscriptionName, MessageId message
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
             throws PulsarAdminException;
 
+    /**
+     * Get backlog size by its messageId.
+     * @param topic
+     *            Topic name
+     * @param ledgerId
+     *            Ledger id

Review comment:
       ```suggestion
        *            Ledger ID
   ```

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1596,6 +1596,36 @@ void createSubscription(String topic, String subscriptionName, MessageId message
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
             throws PulsarAdminException;
 
+    /**
+     * Get backlog size by its messageId.
+     * @param topic
+     *            Topic name
+     * @param ledgerId
+     *            Ledger id
+     * @param entryId
+     *            Entry id

Review comment:
       ```suggestion
        *            Entry ID
   ```

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1596,6 +1596,36 @@ void createSubscription(String topic, String subscriptionName, MessageId message
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
             throws PulsarAdminException;
 
+    /**
+     * Get backlog size by its messageId.

Review comment:
       same

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -2279,4 +2279,27 @@ void run() throws PulsarAdminException {
     private Topics getTopics() {
         return getAdmin().topics();
     }
+
+    @Parameters(commandDescription = "Calculate backlog size given a messageId.")

Review comment:
       same

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1458,6 +1458,37 @@ public PersistentOfflineTopicStats getBacklog(
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}/checkLedgerExists/{checkLedgerExists}/backlogSize")
+    @ApiOperation(value = "Calculate backlog size given a messageId.")

Review comment:
       same

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1596,6 +1596,36 @@ void createSubscription(String topic, String subscriptionName, MessageId message
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
             throws PulsarAdminException;
 
+    /**
+     * Get backlog size by its messageId.
+     * @param topic
+     *            Topic name
+     * @param ledgerId
+     *            Ledger id
+     * @param entryId
+     *            Entry id
+     * @param checkLedgerExists
+     *            checkLedgerExists
+     * @return the backlog size from
+     * @throws PulsarAdminException
+     *            Unexpected error
+     */
+    Long getBacklogSizeByMessageId(String topic, long ledgerId, long entryId, boolean checkLedgerExists) throws PulsarAdminException;
+
+    /**
+     * Get backlog size by its messageId Async.

Review comment:
       ```suggestion
        * Get backlog size by a message ID asynchronously.
   ```

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1596,6 +1596,36 @@ void createSubscription(String topic, String subscriptionName, MessageId message
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
             throws PulsarAdminException;
 
+    /**
+     * Get backlog size by its messageId.
+     * @param topic
+     *            Topic name
+     * @param ledgerId
+     *            Ledger id
+     * @param entryId
+     *            Entry id
+     * @param checkLedgerExists
+     *            checkLedgerExists
+     * @return the backlog size from
+     * @throws PulsarAdminException
+     *            Unexpected error
+     */
+    Long getBacklogSizeByMessageId(String topic, long ledgerId, long entryId, boolean checkLedgerExists) throws PulsarAdminException;
+
+    /**
+     * Get backlog size by its messageId Async.
+     * @param topic
+     *            Topic name
+     * @param ledgerId
+     *            Ledger id
+     * @param entryId
+     *            Entry id

Review comment:
       same

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -2279,4 +2279,27 @@ void run() throws PulsarAdminException {
     private Topics getTopics() {
         return getAdmin().topics();
     }
+
+    @Parameters(commandDescription = "Calculate backlog size given a messageId.")
+    private class GetBacklogSizeByMessageId extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-l", "--ledgerId" },
+                description = "ledger id pointing to the desired ledger",

Review comment:
       ID

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -2279,4 +2279,27 @@ void run() throws PulsarAdminException {
     private Topics getTopics() {
         return getAdmin().topics();
     }
+
+    @Parameters(commandDescription = "Calculate backlog size given a messageId.")
+    private class GetBacklogSizeByMessageId extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-l", "--ledgerId" },
+                description = "ledger id pointing to the desired ledger",
+                required = true)
+        private long ledgerId;
+
+        @Parameter(names = { "-e", "--entryId" },
+                description = "entry id pointing to the desired entry",

Review comment:
       ID







[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r648834526



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1458,6 +1458,33 @@ public PersistentOfflineTopicStats getBacklog(
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/messageId/{messageId}/checkLedgerExists/{checkLedgerExists}/backlogSize")

Review comment:
       For a multi-partition topic, we cannot return an error if the message ID does not exist on one partition. Instead, we should sum the sizes of the ledgers whose IDs are larger than the message ID's ledger ID.
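
As a rough illustration of the fallback described here, the sketch below sums the sizes of all ledgers whose ID is strictly greater than a ledger ID that is absent from the partition. It assumes the partition's ledgers are available as a sorted map from ledger ID to size in bytes. The class and method names are hypothetical and not part of the PR.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: ledgers of one partition as ledger ID -> size in bytes.
final class MissingLedgerBacklogSketch {

    // Sums the sizes of all ledgers whose ID is strictly greater than ledgerId.
    static long sizeOfLedgersAfter(NavigableMap<Long, Long> ledgerSizes, long ledgerId) {
        return ledgerSizes.tailMap(ledgerId, false).values().stream()
                .mapToLong(Long::longValue)
                .sum();
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> ledgerSizes = new TreeMap<>();
        ledgerSizes.put(10L, 100L);
        ledgerSizes.put(12L, 250L);
        ledgerSizes.put(15L, 40L);
        // Ledger 11 does not exist on this partition; ledgers 12 and 15 come after it.
        System.out.println(sizeOfLedgersAfter(ledgerSizes, 11L));
    }
}
```

Using a sorted map keeps the "larger than" comparison to a single `tailMap` call instead of filtering every ledger.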







[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r647178512



##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1596,6 +1596,36 @@ void createSubscription(String topic, String subscriptionName, MessageId message
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
             throws PulsarAdminException;
 
+    /**
+     * Get backlog size by its messageId.

Review comment:
       Yes, it's the backlog size in bytes.







[GitHub] [pulsar] gaozhangmin removed a comment on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin removed a comment on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-879630968


   @MarvinCai Could you please review it again? Thanks.





[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-926670909


   /pulsarbot run-failure-checks








[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-901244616


   @codelipenghui @cckellogg @MarvinCai This PR has been open for a long time. PTAL. Thanks.





[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r648836062



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1458,6 +1458,33 @@ public PersistentOfflineTopicStats getBacklog(
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/messageId/{messageId}/checkLedgerExists/{checkLedgerExists}/backlogSize")

Review comment:
       For a single partition, we should return an error if the ledger does not exist on it. For a multi-partition topic, however, we return an error only when the ledger does not exist on any of its partitions. We should include the partitions on which the ledger does not exist in the calculation.







[GitHub] [pulsar] codelipenghui merged pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853


   





[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r648836062



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1458,6 +1458,33 @@ public PersistentOfflineTopicStats getBacklog(
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/messageId/{messageId}/checkLedgerExists/{checkLedgerExists}/backlogSize")

Review comment:
       For a single partition, we should return an error if the ledger does not exist on it. For a multi-partition topic, however, we return an error when the ledger does not exist on any of its partitions. We should include the partitions on which the ledger does not exist in the calculation.







[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-878744802


   Can anyone review this?





[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-879544212


   I will change this API to support only a non-partitioned topic or a specific topic partition.





[GitHub] [pulsar] sijie commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-863780618


   @MarvinCai Can you help review this PR?





[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-880601705


   @MarvinCai Please review this PR again. I have changed it to support only a non-partitioned topic or a specific topic partition.





[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r669241047



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                .mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {
+                            sizeBeforePosLedger += managedLedger.getCurrentLedgerSize();
+                        }
+                        asyncResponse.resume(managedLedger.getTotalSize() - sizeBeforePosLedger);
+                    } else {
+                        asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }
+
+            }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get backlog size for non-partitioned topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get backlog size for non-partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    private boolean isLedgerExists(MessageIdImpl messageId){
+        try {
+            String internalInfo = pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+            boolean ledgerExists = false;
+            if(messageId.getLedgerId() == -1) {
+                ledgerExists = true;
+            } else {
+                if (topicName.isPartitioned()) {
+                    ManagedLedgerInfo managedLedgerInfo = jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+                    ledgerExists = managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> messageId.getLedgerId() == ledgerInfo.ledgerId);
+                } else {
+                    JsonNode partitionedInternalInfos = jsonMapper().readTree(internalInfo).get("partitions");

Review comment:
       No, ```topicName.isPartitioned()``` means the topic name refers to a specific partition, like 'topicname-partition-0'.
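
To illustrate the naming convention in this comment, here is a small stand-alone sketch that checks whether a topic name ends in `-partition-<number>`. The suffix is taken from the 'topicname-partition-0' example above. The class and method names are hypothetical, and this is not the implementation behind `topicName.isPartitioned()`.

```java
// Hypothetical sketch of the "-partition-N" naming check, based only on the example above.
final class PartitionNameSketch {
    private static final String PARTITION_SUFFIX = "-partition-";

    // Returns true if the name ends with "-partition-" followed by one or more digits.
    static boolean namesSpecificPartition(String topic) {
        int idx = topic.lastIndexOf(PARTITION_SUFFIX);
        if (idx < 0) {
            return false;
        }
        String index = topic.substring(idx + PARTITION_SUFFIX.length());
        return !index.isEmpty() && index.chars().allMatch(Character::isDigit);
    }

    public static void main(String[] args) {
        System.out.println(namesSpecificPartition("persistent://tenant/ns/topicname-partition-0"));
        System.out.println(namesSpecificPartition("persistent://tenant/ns/topicname"));
    }
}
```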







[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-861242673


   @cckellogg  Please take a look, thanks





[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-926635209


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r669244681



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                .mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {
+                            sizeBeforePosLedger += managedLedger.getCurrentLedgerSize();
+                        }
+                        asyncResponse.resume(managedLedger.getTotalSize() - sizeBeforePosLedger);
+                    } else {
+                        asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }
+
+            }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get backlog size for non-partitioned topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get backlog size for non-partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    private boolean isLedgerExists(MessageIdImpl messageId){
+        try {
+            String internalInfo = pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+            boolean ledgerExists = false;
+            if(messageId.getLedgerId() == -1) {
+                ledgerExists = true;
+            } else {
+                if (topicName.isPartitioned()) {
+                    ManagedLedgerInfo managedLedgerInfo = jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+                    ledgerExists = managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> messageId.getLedgerId() == ledgerInfo.ledgerId);
+                } else {
+                    JsonNode partitionedInternalInfos = jsonMapper().readTree(internalInfo).get("partitions");
+                    for (Iterator<Map.Entry<String, JsonNode>> it = partitionedInternalInfos.fields(); it.hasNext(); ) {
+                        String partitionedInternalInfo = it.next().getValue().toString();
+                        ManagedLedgerInfo managedLedgerInfo = jsonMapper().readValue(partitionedInternalInfo, ManagedLedgerInfo.class);
+                        ledgerExists = ledgerExists || managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> messageId.getLedgerId() == ledgerInfo.ledgerId);
+                    }
+                }
+            }
+            return ledgerExists;
+        }  catch (Exception e) {
+            log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative, boolean checkLedgerExists) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get backlog size for {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+
+        if(checkLedgerExists) {
+            boolean ledgerExists = isLedgerExists(messageId);
+            if (!ledgerExists) {
+                log.warn("[{}] ledger {} not exists on topic {}", messageId.getLedgerId(), clientAppId(), topicName);
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, String.format("ledger %s not exists", messageId.getLedgerId())));
+                return;
+            }
+        }
+
+        if (topicName.isPartitioned()) {

Review comment:
       Maybe the method name is confusing; what is meant here is a single partition.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-879630968


   @MarvinCai Could you please review it again? Thanks.





[GitHub] [pulsar] gaozhangmin commented on pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#issuecomment-901239396


    @MarvinCai Please review this PR again. I have changed it to support only a non-partitioned topic or a specific topic partition.





[GitHub] [pulsar] cckellogg commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r648504885



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
##########
@@ -1578,6 +1578,41 @@ public void failed(Throwable throwable) {
         }
     }
 
+    @Override
+    public Long getBacklogSizeByMessageId(String topic, String messageId, boolean checkLedgerExists)

Review comment:
       There is a `MessageId` object; we should use that instead of a string.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1458,6 +1458,33 @@ public PersistentOfflineTopicStats getBacklog(
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/messageId/{messageId}/checkLedgerExists/{checkLedgerExists}/backlogSize")

Review comment:
       This API is confusing. Why is a message ID required? If it's not provided, can it default to the latest?
   
   This `checkLedgerExists/{checkLedgerExists}` should not be part of the path in my opinion. If you need this `checkLedgerExists` it should be passed as a query parameter. 
   
   I still don't understand why `checkLedgerExists` is needed. Can't the response return an error if it does not exist?
   
   

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -662,6 +662,32 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{topic}/messageId/{messageId}/checkLedgerExists/{checkLedgerExists}/backlogSize")

Review comment:
       Do we need to support V1 for new apis like this?







[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r647187187



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -662,6 +662,35 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}/checkLedgerExists/{checkLedgerExists}/backlogSize")

Review comment:
       When calculating the backlog size across partitions, we first check whether the target ledgerId exists on any partition of the topic. We should not check again later whether the ledger exists on each individual partition, because the target ledger exists on only one partition.
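
For illustration, the up-front check described in this comment could be sketched roughly as follows. This is a minimal, self-contained sketch and not the PR's code: `LedgerLookupSketch`, `existsOnAnyPartition`, and the per-partition sets of ledger ids are hypothetical stand-ins for whatever the broker reads from each partition's internal info.

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Pulsar: models "does this ledger exist on
// any partition?" with plain collections instead of broker internals.
class LedgerLookupSketch {

    // ledgerIdsPerPartition: one set of ledger ids per partition (assumed input shape).
    static boolean existsOnAnyPartition(List<Set<Long>> ledgerIdsPerPartition, long ledgerId) {
        // The diff treats ledger id -1 as "always exists"; the sketch mirrors that.
        if (ledgerId == -1L) {
            return true;
        }
        // A ledger belongs to at most one partition, so a single match is enough.
        return ledgerIdsPerPartition.stream().anyMatch(ids -> ids.contains(ledgerId));
    }
}
```

Because the ledger can match on only one partition, repeating the same check per partition would reject every other partition, which is the situation the comment is trying to avoid.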







[GitHub] [pulsar] codelipenghui merged pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853


   





[GitHub] [pulsar] cckellogg commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r646956591



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -662,6 +662,35 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}/checkLedgerExists/{checkLedgerExists}/backlogSize")
+    @ApiOperation(value = "Calculate backlog size given a messageId.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),

Review comment:
       The 401 message should say "not authenticated".

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -662,6 +662,35 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}/checkLedgerExists/{checkLedgerExists}/backlogSize")

Review comment:
       To me this is a very complicated API. Where is a user supposed to get the ledger ID and entry ID? Can just a message ID work? What is the purpose of this `checkLedgerExists`?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -26,6 +26,9 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;

Review comment:
       There is already a JSON mapper, `jsonMapper()`; there is no need to add this dependency.

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1596,6 +1596,36 @@ void createSubscription(String topic, String subscriptionName, MessageId message
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
             throws PulsarAdminException;
 
+    /**
+     * Get backlog size by its messageId.
+     * @param topic
+     *            Topic name
+     * @param ledgerId
+     *            Ledger id
+     * @param entryId
+     *            Entry id
+     * @param checkLedgerExists
+     *            checkLedgerExists
+     * @return the backlog size from
+     * @throws PulsarAdminException
+     *            Unexpected error
+     */
+    Long getBacklogSizeByMessageId(String topic, long ledgerId, long entryId, boolean checkLedgerExists) throws PulsarAdminException;

Review comment:
       Why not pass a messageID? Same for below.
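
As a rough illustration of the point, a single typed value can carry both ids and validate them once, so the API signature does not need separate `ledgerId` and `entryId` arguments. The `PositionSketch` class and the `ledgerId:entryId` text form below are assumptions made for this example only; this is not Pulsar's `MessageId` type.

```java
// Hypothetical value type, for illustration only (not Pulsar's MessageId).
final class PositionSketch {
    final long ledgerId;
    final long entryId;

    PositionSketch(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    // Parses the assumed "ledgerId:entryId" form, e.g. "12:34".
    static PositionSketch parse(String text) {
        String[] parts = text.split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected ledgerId:entryId but got: " + text);
        }
        // Long.parseLong throws NumberFormatException (an IllegalArgumentException)
        // when a part is not a valid number.
        return new PositionSketch(Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim()));
    }
}
```

With a type like this, a method would take one position argument and callers could not accidentally swap the two numbers.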

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1596,6 +1596,36 @@ void createSubscription(String topic, String subscriptionName, MessageId message
     Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
             throws PulsarAdminException;
 
+    /**
+     * Get backlog size by its messageId.

Review comment:
       What is backlog here? The number of messages or the size in bytes?









[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r647939062



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -662,6 +662,35 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}/checkLedgerExists/{checkLedgerExists}/backlogSize")
+    @ApiOperation(value = "Calculate backlog size given a messageId.")

Review comment:
       yes







[GitHub] [pulsar] MarvinCai commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r668760864



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1,35 +1,20 @@
-/**

Review comment:
       we need to keep the Apache license header

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -1229,7 +1214,7 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
     }
 
     protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
-            boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
+                                               boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {

Review comment:
       please remove unnecessary whitespace changes.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                .mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {

Review comment:
       can simply return 0 in this case.
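
A minimal sketch of the suggested shortcut, under stated assumptions: ledger sizes are modelled as a plain `NavigableMap` from ledger id to size in bytes, and `BacklogSizeSketch` and `backlogSizeFrom` are hypothetical names. It ignores the entry offset inside a ledger, so it is coarser than the `getEstimatedBacklogSize(pos)` call in the diff.

```java
import java.util.NavigableMap;

// Hypothetical helper, not Pulsar code: ledger id -> ledger size in bytes.
class BacklogSizeSketch {

    // Returns the total size of all ledgers at or after ledgerId.
    static long backlogSizeFrom(NavigableMap<Long, Long> ledgerSizes, long ledgerId) {
        // If the requested ledger is past the last known ledger, nothing is
        // left to consume, so the backlog is simply 0.
        if (ledgerSizes.isEmpty() || ledgerId > ledgerSizes.lastKey()) {
            return 0L;
        }
        long total = 0L;
        // tailMap(ledgerId, true) also covers a ledger id that falls in a gap
        // between known ledgers: only the ledgers after it are counted.
        for (long size : ledgerSizes.tailMap(ledgerId, true).values()) {
            total += size;
        }
        return total;
    }
}
```

Summing the tail directly also avoids the subtraction from the total size, so the "ledger beyond the last one" case needs no special adjustment.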

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                .mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {
+                            sizeBeforePosLedger += managedLedger.getCurrentLedgerSize();
+                        }
+                        asyncResponse.resume(managedLedger.getTotalSize() - sizeBeforePosLedger);
+                    } else {
+                        asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }
+
+            }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get backlog size for non-partitioned topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get backlog size for non-partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    private boolean isLedgerExists(MessageIdImpl messageId){
+        try {
+            String internalInfo = pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+            boolean ledgerExists = false;
+            if(messageId.getLedgerId() == -1) {
+                ledgerExists = true;
+            } else {
+                if (topicName.isPartitioned()) {
+                    ManagedLedgerInfo managedLedgerInfo = jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+                    ledgerExists = managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> messageId.getLedgerId() == ledgerInfo.ledgerId);
+                } else {
+                    JsonNode partitionedInternalInfos = jsonMapper().readTree(internalInfo).get("partitions");

Review comment:
       It seems we're processing a partitioned topic here; should the `if` clause be `!topicName.isPartitioned()`?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+            synchronized (this) {

Review comment:
       What we process here is just the ledger info in the managed ledger, so synchronizing on a PersistentTopicsBase instance won't work in this case. Also, what data is this synchronized block trying to guard?
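
To make the concern concrete, here is a small sketch under stated assumptions: `LedgerStoreSketch` and `AdminHandlerSketch` are hypothetical stand-ins for the component that owns the ledger map and for the REST handler. It does not describe how `ManagedLedgerImpl` actually guards its state. A lock only helps if readers and writers use the same monitor, so the owner exposes a snapshot taken under its own lock.

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical owner of the ledger map (ledger id -> size in bytes).
class LedgerStoreSketch {
    private final NavigableMap<Long, Long> ledgerSizes = new TreeMap<>();

    // Writers lock on the store itself.
    synchronized void putLedger(long ledgerId, long sizeBytes) {
        ledgerSizes.put(ledgerId, sizeBytes);
    }

    // Readers take a copy under the same lock, so the copy is consistent.
    synchronized NavigableMap<Long, Long> snapshot() {
        return Collections.unmodifiableNavigableMap(new TreeMap<>(ledgerSizes));
    }
}

// Hypothetical handler: one instance per request, like a REST resource.
class AdminHandlerSketch {
    private final LedgerStoreSketch store;

    AdminHandlerSketch(LedgerStoreSketch store) {
        this.store = store;
    }

    long totalSize() {
        // No synchronized (this) here: writers never hold the handler's
        // monitor, so locking on the handler would guard nothing.
        long total = 0L;
        for (long size : store.snapshot().values()) {
            total += size;
        }
        return total;
    }
}
```

The handler works on the snapshot it was given, so later writes to the store do not change the numbers it is summing.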

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                .mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {
+                            sizeBeforePosLedger += managedLedger.getCurrentLedgerSize();
+                        }
+                        asyncResponse.resume(managedLedger.getTotalSize() - sizeBeforePosLedger);
+                    } else {
+                        asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }
+
+            }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get backlog size for non-partitioned topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get backlog size for non-partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    private boolean isLedgerExists(MessageIdImpl messageId){
+        try {
+            String internalInfo = pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+            boolean ledgerExists = false;
+            if(messageId.getLedgerId() == -1) {
+                ledgerExists = true;
+            } else {
+                if (topicName.isPartitioned()) {
+                    ManagedLedgerInfo managedLedgerInfo = jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+                    ledgerExists = managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> messageId.getLedgerId() == ledgerInfo.ledgerId);
+                } else {
+                    JsonNode partitionedInternalInfos = jsonMapper().readTree(internalInfo).get("partitions");
+                    for (Iterator<Map.Entry<String, JsonNode>> it = partitionedInternalInfos.fields(); it.hasNext(); ) {
+                        String partitionedInternalInfo = it.next().getValue().toString();
+                        ManagedLedgerInfo managedLedgerInfo = jsonMapper().readValue(partitionedInternalInfo, ManagedLedgerInfo.class);
+                        ledgerExists = ledgerExists || managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> messageId.getLedgerId() == ledgerInfo.ledgerId);
+                    }
+                }
+            }
+            return ledgerExists;
+        }  catch (Exception e) {
+            log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative, boolean checkLedgerExists) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get backlog size for {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+
+        if(checkLedgerExists) {
+            boolean ledgerExists = isLedgerExists(messageId);
+            if (!ledgerExists) {
+                log.warn("[{}] ledger {} not exists on topic {}", messageId.getLedgerId(), clientAppId(), topicName);
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, String.format("ledger %s not exists", messageId.getLedgerId())));
+                return;
+            }
+        }
+
+        if (topicName.isPartitioned()) {
+            internalGetBacklogSizeForNonPartitionedTopic(asyncResponse, messageId, authoritative);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Long>> futures = Lists.newArrayList();
+                    List<Long> backlogSizeAccrossPartitions = new ArrayList<>();
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics()
+                                    .getBacklogSizeByMessageIdAsync(topicNamePartition.toString(), messageId, false).whenComplete((backlogSize, throwable) -> {

Review comment:
       We shouldn't hardcode `false` for `checkLedgerExists`.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2615,8 +2600,147 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)
         return quotaMap;
     }
 
+    private void internalGetBacklogSizeForNonPartitionedTopic(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) {
+        // will redirect if the topic not owned by current broker
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_BACKLOG_SIZE);
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        PositionImpl pos = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
+
+        try {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+            synchronized (this) {
+                if(messageId.getLedgerId() == -1) {
+                    asyncResponse.resume(managedLedger.getTotalSize());
+                } else {
+                    NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = managedLedger.getLedgersInfo();
+                    MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
+                    if (ledgerInfo == null) {
+                        long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
+                                .mapToLong(MLDataFormats.ManagedLedgerInfo.LedgerInfo::getSize).sum();
+                        long currentLedgerId = ledgers.lastKey();
+                        if (currentLedgerId < pos.getLedgerId()) {
+                            sizeBeforePosLedger += managedLedger.getCurrentLedgerSize();
+                        }
+                        asyncResponse.resume(managedLedger.getTotalSize() - sizeBeforePosLedger);
+                    } else {
+                        asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos));
+                    }
+                }
+
+            }
+        } catch (WebApplicationException wae) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to get backlog size for non-partitioned topic {}, redirecting to other brokers.",
+                        clientAppId(), topicName, wae);
+            }
+            resumeAsyncResponseExceptionally(asyncResponse, wae);
+        } catch (Exception e) {
+            log.error("[{}] Failed to get backlog size for non-partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+
+    private boolean isLedgerExists(MessageIdImpl messageId){
+        try {
+            String internalInfo = pulsar().getAdminClient().topics().getInternalInfo(topicName.toString());
+            boolean ledgerExists = false;
+            if(messageId.getLedgerId() == -1) {
+                ledgerExists = true;
+            } else {
+                if (topicName.isPartitioned()) {
+                    ManagedLedgerInfo managedLedgerInfo = jsonMapper().readValue(internalInfo, ManagedLedgerInfo.class);
+                    ledgerExists = managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> messageId.getLedgerId() == ledgerInfo.ledgerId);
+                } else {
+                    JsonNode partitionedInternalInfos = jsonMapper().readTree(internalInfo).get("partitions");
+                    for (Iterator<Map.Entry<String, JsonNode>> it = partitionedInternalInfos.fields(); it.hasNext(); ) {
+                        String partitionedInternalInfo = it.next().getValue().toString();
+                        ManagedLedgerInfo managedLedgerInfo = jsonMapper().readValue(partitionedInternalInfo, ManagedLedgerInfo.class);
+                        ledgerExists = ledgerExists || managedLedgerInfo.ledgers.stream().anyMatch(ledgerInfo -> messageId.getLedgerId() == ledgerInfo.ledgerId);
+                    }
+                }
+            }
+            return ledgerExists;
+        }  catch (Exception e) {
+            log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative, boolean checkLedgerExists) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get backlog size for {}", clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+
+        if(checkLedgerExists) {
+            boolean ledgerExists = isLedgerExists(messageId);
+            if (!ledgerExists) {
+                log.warn("[{}] Ledger {} does not exist on topic {}", clientAppId(), messageId.getLedgerId(), topicName);
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, String.format("Ledger %s does not exist", messageId.getLedgerId())));
+                return;
+            }
+        }
+
+        if (topicName.isPartitioned()) {

Review comment:
       Same here: why is the method for non-partitioned topics invoked when `topicName.isPartitioned()` is `true`?
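
       For illustration only, a minimal sketch of the branching this comment seems to ask for, assuming the partitioned case should walk every partition's ledger list and the non-partitioned case should check the topic's single ledger list. The class `LedgerExistsSketch`, the method `ledgerExists`, and its parameters are hypothetical stand-ins and not part of the Pulsar API; plain collections replace the JSON parsing of the internal info.

```java
import java.util.List;
import java.util.Map;

public class LedgerExistsSketch {

    /**
     * Hypothetical helper: reports whether ledgerId is known to the topic.
     *
     * @param ledgerId         ledger id taken from the message id
     * @param partitioned      result of a check such as topicName.isPartitioned()
     * @param topicLedgers     ledger ids of a non-partitioned topic
     * @param partitionLedgers ledger ids keyed by partition name
     */
    static boolean ledgerExists(long ledgerId,
                                boolean partitioned,
                                List<Long> topicLedgers,
                                Map<String, List<Long>> partitionLedgers) {
        // The PR treats ledger id -1 as "no specific ledger", so nothing needs checking.
        if (ledgerId == -1) {
            return true;
        }
        if (partitioned) {
            // Partitioned topic: the ledger may belong to any partition.
            return partitionLedgers.values().stream()
                    .anyMatch(ids -> ids.contains(ledgerId));
        }
        // Non-partitioned topic: a single ledger list to check.
        return topicLedgers.contains(ledgerId);
    }
}
```

       With the branches in this order, each kind of topic is handled by the code path written for it.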







[GitHub] [pulsar] gaozhangmin commented on a change in pull request #10853: [Issue #9488][add admin api getBacklogSize]

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on a change in pull request #10853:
URL: https://github.com/apache/pulsar/pull/10853#discussion_r648836139



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -662,6 +662,32 @@ public PersistentOfflineTopicStats getBacklog(@PathParam("property") String prop
         return internalGetBacklog(authoritative);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{topic}/messageId/{messageId}/checkLedgerExists/{checkLedgerExists}/backlogSize")

Review comment:
       delete V1



