Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/27 08:45:16 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #10725: [Transaction] Transaction admin api get buffer internal stats

congbobo184 opened a new pull request #10725:
URL: https://github.com/apache/pulsar/pull/10725


   ## Motivation
   Add the transaction admin API `getPendingAckInternalStats`.
   ## Implementation
   ```
   /**
    * Transaction pending ack internal stats.
    */
   public class TransactionPendingAckInternalStats {
   
       /** The managed ledger internal stats. */
       public ManagedLedgerInternalStats managedLedgerInternalStats;
   }
   ```
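As a rough illustration of how a caller might read the wrapper shown above, here is a minimal sketch. The nested classes are hypothetical, trimmed-down stand-ins that reuse only field names appearing in this pull request; `summarize` is an invented helper, not part of Pulsar.

```java
// Hypothetical stand-ins for the classes named in this pull request.
// Only a few fields are reproduced; the real classes carry more.
final class PendingAckStatsSketch {

    /** Trimmed-down stand-in for the managed ledger stats holder. */
    static final class ManagedLedgerInternalStats {
        long entriesAddedCounter;
        long numberOfEntries;
        long totalSize;
    }

    /** Stand-in for the wrapper introduced by this change. */
    static final class TransactionPendingAckInternalStats {
        ManagedLedgerInternalStats managedLedgerInternalStats;
    }

    /** Invented helper: renders a one-line summary, tolerating a missing inner object. */
    static String summarize(TransactionPendingAckInternalStats stats) {
        ManagedLedgerInternalStats ml = stats.managedLedgerInternalStats;
        if (ml == null) {
            return "no managed ledger stats";
        }
        return "entries=" + ml.numberOfEntries + ", bytes=" + ml.totalSize;
    }

    public static void main(String[] args) {
        TransactionPendingAckInternalStats stats = new TransactionPendingAckInternalStats();
        stats.managedLedgerInternalStats = new ManagedLedgerInternalStats();
        stats.managedLedgerInternalStats.numberOfEntries = 3;
        stats.managedLedgerInternalStats.totalSize = 42;
        System.out.println(summarize(stats));
    }
}
```

Keeping the managed ledger stats as a nested object, rather than flattening its fields into the wrapper, lets the same stats type be reused wherever a managed ledger is reported.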
   ### Verifying this change
   This change adds tests.
   
   Does this pull request potentially affect one of the following parts:
   If yes was chosen, please highlight the changes
   
   Dependencies (does it add or upgrade a dependency): (no)
   The public API: (no)
   The schema: (no)
   The default values of configurations: (no)
   The wire protocol: (no)
   The rest endpoints: (no)
   The admin cli options: (yes)
   Anything that affects deployment: (no)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #10725: [Transaction] Transaction admin api get pending ack internal stats

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10725:
URL: https://github.com/apache/pulsar/pull/10725#discussion_r640569018



##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
##########
@@ -135,9 +137,51 @@
      *
      * @return the metadata of slow transactions.
      */
-    CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactions(long timeout,
-                                                                            TimeUnit timeUnit) throws Exception;
+    Map<String, TransactionMetadata> getSlowTransactions(long timeout, TimeUnit timeUnit) throws PulsarAdminException;
 
+    /**
+     * Get transaction coordinator internal stats.
+     *
+     * @param coordinatorId the coordinator ID
+     * @param metadata is get ledger metadata

Review comment:
       Whether to get metadata.







[GitHub] [pulsar] congbobo184 commented on a change in pull request #10725: [Transaction] Transaction admin api get buffer internal stats

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10725:
URL: https://github.com/apache/pulsar/pull/10725#discussion_r640440331



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -789,4 +796,69 @@ protected void validatePersistencePolicies(PersistencePolicies persistence) {
                         persistence.getBookkeeperAckQuorum()));
 
     }
+
+    protected CompletableFuture<ManagedLedgerInternalStats> getManageLedgerInternalStats(ManagedLedger ledger,
+                                                                                       boolean includeLedgerMetadata,
+                                                                                       String topic) {
+        CompletableFuture<ManagedLedgerInternalStats> statFuture = new CompletableFuture<>();
+        ManagedLedgerInternalStats stats = new ManagedLedgerInternalStats();
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) ledger;
+        stats.entriesAddedCounter = ml.getEntriesAddedCounter();
+        stats.numberOfEntries = ml.getNumberOfEntries();
+        stats.totalSize = ml.getTotalSize();
+        stats.currentLedgerEntries = ml.getCurrentLedgerEntries();
+        stats.currentLedgerSize = ml.getCurrentLedgerSize();
+        stats.lastLedgerCreatedTimestamp = DateFormatter.format(ml.getLastLedgerCreatedTimestamp());
+        if (ml.getLastLedgerCreationFailureTimestamp() != 0) {
+            stats.lastLedgerCreationFailureTimestamp = DateFormatter.format(ml.getLastLedgerCreationFailureTimestamp());
+        }
+
+        stats.waitingCursorsCount = ml.getWaitingCursorsCount();
+        stats.pendingAddEntriesCount = ml.getPendingAddEntriesCount();
+
+        stats.lastConfirmedEntry = ml.getLastConfirmedEntry().toString();
+        stats.state = ml.getState();
+
+        stats.ledgers = Lists.newArrayList();
+        ml.getLedgersInfo().forEach((id, li) -> {
+            ManagedLedgerInternalStats.LedgerInfo info = new PersistentTopicInternalStats.LedgerInfo();
+            info.ledgerId = li.getLedgerId();
+            info.entries = li.getEntries();
+            info.size = li.getSize();
+            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+            stats.ledgers.add(info);
+            if (includeLedgerMetadata) {
+                ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                    if (ex == null) {
+                        info.metadata = lMetadata;
+                    }
+                    return null;
+                });
+            }
+
+            stats.cursors = Maps.newTreeMap();
+            ml.getCursors().forEach(c -> {
+                ManagedCursorImpl cursor = (ManagedCursorImpl) c;
+                PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
+                cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
+                cs.readPosition = cursor.getReadPosition().toString();
+                cs.waitingReadOp = cursor.hasPendingReadRequest();
+                cs.pendingReadOps = cursor.getPendingReadOpsCount();
+                cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
+                cs.cursorLedger = cursor.getCursorLedger();
+                cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
+                cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
+                cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());

Review comment:
       This method no longer seems to be used; I forgot to delete it.







[GitHub] [pulsar] eolivelli commented on a change in pull request #10725: [Transaction] Transaction admin api get buffer internal stats

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10725:
URL: https://github.com/apache/pulsar/pull/10725#discussion_r640426781



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -789,4 +796,69 @@ protected void validatePersistencePolicies(PersistencePolicies persistence) {
                         persistence.getBookkeeperAckQuorum()));
 
     }
+
+    protected CompletableFuture<ManagedLedgerInternalStats> getManageLedgerInternalStats(ManagedLedger ledger,
+                                                                                       boolean includeLedgerMetadata,
+                                                                                       String topic) {
+        CompletableFuture<ManagedLedgerInternalStats> statFuture = new CompletableFuture<>();
+        ManagedLedgerInternalStats stats = new ManagedLedgerInternalStats();
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) ledger;
+        stats.entriesAddedCounter = ml.getEntriesAddedCounter();
+        stats.numberOfEntries = ml.getNumberOfEntries();
+        stats.totalSize = ml.getTotalSize();
+        stats.currentLedgerEntries = ml.getCurrentLedgerEntries();
+        stats.currentLedgerSize = ml.getCurrentLedgerSize();
+        stats.lastLedgerCreatedTimestamp = DateFormatter.format(ml.getLastLedgerCreatedTimestamp());
+        if (ml.getLastLedgerCreationFailureTimestamp() != 0) {
+            stats.lastLedgerCreationFailureTimestamp = DateFormatter.format(ml.getLastLedgerCreationFailureTimestamp());
+        }
+
+        stats.waitingCursorsCount = ml.getWaitingCursorsCount();
+        stats.pendingAddEntriesCount = ml.getPendingAddEntriesCount();
+
+        stats.lastConfirmedEntry = ml.getLastConfirmedEntry().toString();
+        stats.state = ml.getState();
+
+        stats.ledgers = Lists.newArrayList();
+        ml.getLedgersInfo().forEach((id, li) -> {
+            ManagedLedgerInternalStats.LedgerInfo info = new PersistentTopicInternalStats.LedgerInfo();
+            info.ledgerId = li.getLedgerId();
+            info.entries = li.getEntries();
+            info.size = li.getSize();
+            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+            stats.ledgers.add(info);
+            if (includeLedgerMetadata) {
+                ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
+                    if (ex == null) {
+                        info.metadata = lMetadata;
+                    }
+                    return null;
+                });
+            }
+
+            stats.cursors = Maps.newTreeMap();
+            ml.getCursors().forEach(c -> {
+                ManagedCursorImpl cursor = (ManagedCursorImpl) c;
+                PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
+                cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
+                cs.readPosition = cursor.getReadPosition().toString();
+                cs.waitingReadOp = cursor.hasPendingReadRequest();
+                cs.pendingReadOps = cursor.getPendingReadOpsCount();
+                cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
+                cs.cursorLedger = cursor.getCursorLedger();
+                cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
+                cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
+                cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());

Review comment:
       Can `getLastLedgerSwitchTimestamp` return null?
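
On the nullability question above: assuming the getter returns a primitive `long` (an assumption, not confirmed in this thread), it cannot be null, but it may be 0 when the timestamp was never set. The hunk already guards `getLastLedgerCreationFailureTimestamp()` with `!= 0`, and the same guard could be applied here. The helper below is a hypothetical sketch built on `java.time`; it does not reproduce Pulsar's `DateFormatter`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Pulsar: formats an epoch-millis timestamp
// only when it has been set, mirroring the `!= 0` guard used in the hunk above.
final class TimestampSketch {

    private TimestampSketch() {
    }

    /** Returns an ISO-8601 string in UTC, or null when the timestamp is unset (0). */
    static String formatIfSet(long epochMillis) {
        if (epochMillis == 0) {
            return null;
        }
        return DateTimeFormatter.ISO_OFFSET_DATE_TIME
                .format(Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        System.out.println(formatIfSet(0L));     // unset timestamp
        System.out.println(formatIfSet(1_000L)); // one second after the epoch
    }
}
```

Leaving the field null for an unset timestamp avoids reporting the epoch start as if it were a real ledger switch time.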







[GitHub] [pulsar] Anonymitaet commented on a change in pull request #10725: [Transaction] Transaction admin api get buffer internal stats

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on a change in pull request #10725:
URL: https://github.com/apache/pulsar/pull/10725#discussion_r640463104



##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
##########
@@ -135,9 +137,51 @@
      *
      * @return the metadata of slow transactions.
      */
-    CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactions(long timeout,
-                                                                            TimeUnit timeUnit) throws Exception;
+    Map<String, TransactionMetadata> getSlowTransactions(long timeout, TimeUnit timeUnit) throws PulsarAdminException;
 
+    /**
+     * Get transaction coordinator internal stats.
+     *
+     * @param coordinatorId the coordinator ID
+     * @param metadata is get ledger metadata
+     *
+     * @return the future internal stats of this coordinator
+     */
+    CompletableFuture<CoordinatorInternalStats> getCoordinatorInternalStatsAsync(int coordinatorId, boolean metadata);
+
+    /**
+     * Get transaction coordinator internal stats.
+     *
+     * @param coordinatorId the coordinator id

Review comment:
       ```suggestion
        * @param coordinatorId the coordinator ID
   ```

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
##########
@@ -135,9 +137,51 @@
      *
      * @return the metadata of slow transactions.
      */
-    CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactions(long timeout,
-                                                                            TimeUnit timeUnit) throws Exception;
+    Map<String, TransactionMetadata> getSlowTransactions(long timeout, TimeUnit timeUnit) throws PulsarAdminException;
 
+    /**
+     * Get transaction coordinator internal stats.
+     *
+     * @param coordinatorId the coordinator ID
+     * @param metadata is get ledger metadata

Review comment:
       ```suggestion
        * @param metadata the ledger metadata
   ```
   
   Do you mean this?

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -606,4 +607,12 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
      * The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.
      */
     CompletableFuture<Void> asyncTruncate();
+
+    /**
+     * Get managed ledger internal stats
+     *
+     * @param includeLedgerMetadata the flag to control it include ledger metadata

Review comment:
       "control it": what does "it" refer to?

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
##########
@@ -135,9 +137,51 @@
      *
      * @return the metadata of slow transactions.
      */
-    CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactions(long timeout,
-                                                                            TimeUnit timeUnit) throws Exception;
+    Map<String, TransactionMetadata> getSlowTransactions(long timeout, TimeUnit timeUnit) throws PulsarAdminException;
 
+    /**
+     * Get transaction coordinator internal stats.
+     *
+     * @param coordinatorId the coordinator ID
+     * @param metadata is get ledger metadata
+     *
+     * @return the future internal stats of this coordinator
+     */
+    CompletableFuture<CoordinatorInternalStats> getCoordinatorInternalStatsAsync(int coordinatorId, boolean metadata);
+
+    /**
+     * Get transaction coordinator internal stats.
+     *
+     * @param coordinatorId the coordinator id
+     * @param metadata whether to obtain ledger metadata
+     *
+     * @return the internal stats of this coordinator
+     */
+    CoordinatorInternalStats getCoordinatorInternalStats(int coordinatorId,
+                                                         boolean metadata) throws PulsarAdminException;
 
+    /**
+     * Get pending ack internal stats.
+     *
+     * @param topic the topic of get pending ack internal stats
+     * @param subName the subscription name of this pending ack
+     * @param metadata whether to obtain ledger metadata
+     *
+     * @return the future internal stats of pending ack
+     */
+    CompletableFuture<TransactionPendingAckInternalStats> getPendingAckInternalStatsAsync(String topic, String subName,
+                                                                                          boolean metadata);
+
+    /**
+     * Get pending ack internal stats.
+     *
+     * @param topic the topic of get pending ack internal stats
+     * @param subName the subscription name of this pending ack
+     * @param metadata whether to obtain ledger metadata

Review comment:
       ```suggestion
        * @param metadata whether to obtain ledger metadata
   ```
   
   Double-check: should this be "whether to obtain ledger metadata", or `* @param metadata the ledger metadata`?
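
For context on the wording question: in the hunk above `metadata` is declared as a `boolean`, so it acts as a flag rather than as the metadata itself. The hunk also pairs each `...Async` method returning a `CompletableFuture` with a blocking variant that throws `PulsarAdminException`. The sketch below shows one way such a pair could relate. Every name in it is a hypothetical stand-in, and it is not Pulsar's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// All names here are hypothetical stand-ins; this is not Pulsar's implementation.
final class SyncOverAsyncSketch {

    /** Stand-in for a checked admin exception such as PulsarAdminException. */
    static final class AdminSketchException extends Exception {
        AdminSketchException(Throwable cause) {
            super(cause);
        }
    }

    /** Stand-in for the stats payload; records whether ledger metadata was requested. */
    static final class StatsSketch {
        final String topic;
        final boolean hasLedgerMetadata;

        StatsSketch(String topic, boolean hasLedgerMetadata) {
            this.topic = topic;
            this.hasLedgerMetadata = hasLedgerMetadata;
        }
    }

    /** Async variant: `metadata` is a flag, i.e. whether to obtain ledger metadata. */
    static CompletableFuture<StatsSketch> getStatsAsync(String topic, String subName, boolean metadata) {
        if (topic == null || subName == null) {
            CompletableFuture<StatsSketch> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("topic and subName are required"));
            return failed;
        }
        return CompletableFuture.completedFuture(new StatsSketch(topic, metadata));
    }

    /** Blocking variant: waits on the future and converts failures to a checked exception. */
    static StatsSketch getStats(String topic, String subName, boolean metadata) throws AdminSketchException {
        try {
            return getStatsAsync(topic, subName, metadata).get();
        } catch (ExecutionException e) {
            throw new AdminSketchException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new AdminSketchException(e);
        }
    }

    public static void main(String[] args) throws AdminSketchException {
        StatsSketch stats = getStats("my-topic", "my-sub", true);
        System.out.println(stats.topic + " metadata requested: " + stats.hasLedgerMetadata);
    }
}
```

Under this reading, "whether to obtain ledger metadata" describes the parameter more accurately than "the ledger metadata".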

##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ManagedLedgerInternalStats.java
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ManagedLedger internal statistics.
+ */
+public class ManagedLedgerInternalStats {
+
+    /** Messages published since this broker loaded this managedLedger. */
+    public long entriesAddedCounter;
+
+    /** The total number of entries being tracked. */
+    public long numberOfEntries;
+
+    /**The total storage size of all messages (in bytes). */

Review comment:
       From this line on, there is no space after `/**`. Is this intentional?







[GitHub] [pulsar] codelipenghui merged pull request #10725: [Transaction] Transaction admin api get pending ack internal stats

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10725:
URL: https://github.com/apache/pulsar/pull/10725


   

