Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/12 11:07:48 UTC

[GitHub] [pulsar] eolivelli opened a new pull request, #16545: Issue 7623: Add API to analyze a subscription backlog and provide an accurate value

eolivelli opened a new pull request, #16545:
URL: https://github.com/apache/pulsar/pull/16545

   Fixes #7623
   
   ### Motivation
   
   Currently there is no way to get an accurate backlog for a subscription. This is the main problem reported in #7623, and PIP-105 (server-side filters) adds more weight to that request.
   It is nearly impossible to predict the number of messages that will be dispatched by a subscription with server-side filtering.
   
   The idea of this patch is to provide a dedicated API (REST, pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription and provide detailed information about what is expected to be delivered to consumers.
   
   The operation is expensive because we have to load the messages from storage and pass them to the filters, but due to the dynamic nature of Pulsar subscriptions there is no other way to have this value.
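As an illustration of what such a scan has to compute, here is a minimal sketch that reads every backlog entry, passes it to a filter and tallies the outcome per entry and per message (one entry may hold a batch of messages). `FilterOutcome`, `ScannedEntry` and `BacklogTally` are hypothetical names chosen for this sketch; they mirror the accepted/rejected/rescheduled counters that the new API reports, but they are not Pulsar's filter API.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical verdict a server-side filter returns for one entry.
enum FilterOutcome { ACCEPT, REJECT, RESCHEDULE }

// Hypothetical stored entry; one entry may carry a batch of messages.
final class ScannedEntry {
    final long id;
    final int messageCount;

    ScannedEntry(long id, int messageCount) {
        this.id = id;
        this.messageCount = messageCount;
    }
}

// Per-outcome counters, kept separately for entries and for messages.
final class BacklogTally {
    final Map<FilterOutcome, Long> entries = new EnumMap<>(FilterOutcome.class);
    final Map<FilterOutcome, Long> messages = new EnumMap<>(FilterOutcome.class);

    BacklogTally() {
        for (FilterOutcome outcome : FilterOutcome.values()) {
            entries.put(outcome, 0L);
            messages.put(outcome, 0L);
        }
    }

    // Every entry must be loaded and passed to the filter: this is what makes the scan expensive.
    static BacklogTally scan(List<ScannedEntry> backlog,
                             Function<ScannedEntry, FilterOutcome> filter) {
        BacklogTally tally = new BacklogTally();
        for (ScannedEntry entry : backlog) {
            FilterOutcome outcome = filter.apply(entry);
            tally.entries.merge(outcome, 1L, Long::sum);
            tally.messages.merge(outcome, (long) entry.messageCount, Long::sum);
        }
        return tally;
    }
}
```

In this model, the accepted message count approximates what consumers would actually receive, while a plain backlog count would include the rejected messages too.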
   
   A good strategy for monitoring/alerting is to set up alerts on the usual "stats" and use this new API to inspect the subscription more deeply, typically by issuing a manual command.
   
   ### Modifications
   
   - Add a ManagedCursor.scan() API to allow scanning a cursor from the last mark-delete position up to the end of the cursor
   - Add a REST API to perform the scan
   - The scan takes server-side filtering into account, so it can report accurate numbers
   
   Follow-up work:
   - add a limit on the number of entries processed or a time bound
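The scan described above, together with the safeguard limits listed as follow-up work, can be modeled roughly as below. This is a simplified in-memory sketch under stated assumptions: the log is a `List`, `startIndex` stands for the first entry after the mark-delete position, and `ScanResult`/`BoundedScan` are hypothetical names (the three outcome values only loosely mirror the `ScanOutcome` type used in the patch). The clock is injected so the time bound can be exercised deterministically.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

// Hypothetical outcome of a bounded scan.
enum ScanResult { COMPLETED, ABORTED, USER_INTERRUPTED }

final class BoundedScan {
    /**
     * Visits log entries from startIndex to the end, stopping early when the
     * predicate returns false, when maxEntries entries have been read, or when
     * timeOutMs milliseconds have elapsed according to clockMs.
     */
    static <T> ScanResult scan(List<T> log, int startIndex, Predicate<T> condition,
                               long maxEntries, long timeOutMs, LongSupplier clockMs) {
        long start = clockMs.getAsLong();
        long read = 0;
        for (int i = startIndex; i < log.size(); i++) {
            if (read >= maxEntries || clockMs.getAsLong() - start >= timeOutMs) {
                return ScanResult.ABORTED; // a safeguard limit was hit before the end
            }
            read++;
            if (!condition.test(log.get(i))) {
                return ScanResult.USER_INTERRUPTED; // the caller asked to stop
            }
        }
        return ScanResult.COMPLETED; // reached the end of the log
    }
}
```

Passing `System::currentTimeMillis` as the clock gives wall-clock behavior. Treating any result other than `COMPLETED` as "aborted" lets a caller tell the user that the reported numbers cover only part of the backlog.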
   
   ### Verifying this change
   
   This change adds tests.
   
   ### Does this pull request potentially affect one of the following parts:
   This is only the addition of a new feature; existing features are not touched.
   
   ### Documentation
   The new command is automatically documented for the REST API and for pulsar-admin.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923063905


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {

Review Comment:
   +1, maybe we can use batching to improve it?



[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r924095830


##########
conf/broker.conf:
##########
@@ -232,6 +232,12 @@ subscriptionKeySharedUseConsistentHashing=true
 # The higher the number, the more equal the assignment of keys to consumers
 subscriptionKeySharedConsistentHashingReplicaPoints=100
 
+# Maximum time in ms for a Analise backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000
+
+# Maximum number of entries to be read within a Analise backlog operation
+subscriptionBacklogScanMaxEntries=10000

Review Comment:
   This is only a safeguard mechanism; there is no need to make it configurable at the underlying layers.



[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

mattisonchao commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923485855


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -1683,6 +1683,42 @@ public void getSubscriptionProperties(
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
+    @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 405, message = "Method Not Allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void analyzeSubscriptionBacklog(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription", required = true)
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalAnalyzeSubscriptionBacklog(asyncResponse, decode(encodedSubName),

Review Comment:
   It's better to move `asyncResponse` out of the method.
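For reference, a client-side sketch that builds the request URL for the endpoint declared above. It assumes the v2 persistent-topics resource is served under `/admin/v2/persistent` and uses a hypothetical broker address; `AnalyzeBacklogUrl` is an illustrative helper, not part of `PulsarAdmin`. Path segments are percent-encoded because the handler receives an encoded topic name and decodes the subscription name.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class AnalyzeBacklogUrl {
    // Percent-encodes one path segment; URLEncoder uses form encoding, so '+' is turned into %20.
    static String encodeSegment(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Builds {base}/admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog
    static URI build(String base, String tenant, String namespace, String topic,
                     String subName, boolean authoritative) {
        String path = String.join("/",
                "/admin/v2/persistent",
                encodeSegment(tenant), encodeSegment(namespace), encodeSegment(topic),
                "subscription", encodeSegment(subName), "analyzeBacklog");
        return URI.create(base + path + "?authoritative=" + authoritative);
    }
}
```

The resulting URI can be sent as a GET request (for example with `java.net.http.HttpClient`); as the `@ApiResponse` list notes, a broker that does not serve the topic may reply with a 307 redirect.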



[GitHub] [pulsar] eolivelli commented on pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

eolivelli commented on PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#issuecomment-1199109184

   @Technoboy- I have updated ManagedCursorImpl.
   I won't change the API implementation at the moment.
   We can follow up later.


[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: Issue 7623: Add API to analyse a subscription backlog and provide an accurate value

eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r920825043


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,124 @@
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);

Review Comment:
   I am not calling it because we know nothing about the state of the entry at this point.



[GitHub] [pulsar] mattisonchao commented on pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

mattisonchao commented on PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#issuecomment-1187601011

   Hi, I have a concern about it.
   
   If the topic has producers that keep producing messages, is it better to use a `timestamp` to get the backlog?
   Then the user doesn't need to set `maxEntries`. If the user does not pass a timestamp, the broker will use the current time to search. If the user provides one, we continue searching until we reach that `timestamp` or hit the `timeout`.
   In the current implementation, if the admin wants to get the precise backlog, they first have to find out how many un-acked entries the topic has and then get the backlog, so it takes two steps.
   But if they use a timestamp, they no longer need to care about `maxEntries`.


[GitHub] [pulsar] eolivelli commented on pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

eolivelli commented on PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#issuecomment-1188639978

   > Hi, I have a concern about it.
   > 
   > If the topic has some producers continue producing messages(not only this case). Is it better to use the `timestamp` to get the backlog? The user doesn't need to care about the `maxEntries`. If the user does not pass the timestamp, the broker will use the current time to search. If the user provides it, we will continue searching until the satisfying `timestamp` or `timeout` Because in the current implementation, if the admin wants to get the precious backlog, they will first search the topic, have how many un-acked entries and then get backlog. They have to use 2 steps to get it. But if they use a timestamp, they no longer need to care about `maxEntries`.
   
   @mattisonchao this API is meant to give a more accurate value for the backlog, that is, the number of messages that are still to be consumed by a subscription.
   So we always have to start from "lastMarkDelete".
   `maxEntries` and the timeout are there only to prevent scanning a huge amount of data:
   when there are that many entries to process, an accurate scan is pointless and could overwhelm the broker.
   
   This is not a general-purpose method to inspect a portion of a topic.
   If you need that, we can add it in the future.
   


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

Technoboy- commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923062944


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1569,6 +1589,48 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
             });
     }
 
+    private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                          String subName,
+                                                                          boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                            Subscription sub = topic.getSubscription(subName);
+                            if (sub == null) {
+                                throw new RestException(Status.NOT_FOUND,
+                                        getSubNotFoundErrorMessage(topicName.toString(), subName));
+                            }
+                            return sub.analyzeBacklog();
+                        })
+                .thenAccept((AnalyzeBacklogResult rawResult) -> {
+
+                        AnalyzeSubscriptionBacklogResult result = new AnalyzeSubscriptionBacklogResult();
+                        result.setEntries(rawResult.getEntries());
+                        result.setMessages(rawResult.getMessages());
+
+                        result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
+                        result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
+                        result.setFilterRescheduledEntries(rawResult.getFilterRescheduledEntries());
+
+                        result.setFilterAcceptedMessages(rawResult.getFilterAcceptedMessages());
+                        result.setFilterRejectedMessages(rawResult.getFilterRejectedMessages());
+                        result.setFilterRescheduledMessages(rawResult.getFilterRescheduledMessages());
+                        result.setAborted(rawResult.getScanOutcome() != ScanOutcome.COMPLETED);
+                        log.info("[{}] analyzeBacklog topic {} subscription {} result {}", clientAppId(), subName,
+                            topicName, result);
+                        asyncResponse.resume(result);
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    // If the exception is not redirect exception we need to log it.
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to analyze subscription backlog {} {}",
+                                clientAppId(), topicName, subName, cause);
+                    }
+                    asyncResponse.resume(new RestException(cause));

Review Comment:
   Use `resumeAsyncResponseExceptionally(asyncResponse, ex);` here.
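One reason a shared helper is safer than resuming with `new RestException(ex.getCause())` is how `CompletableFuture` reports failures: when a dependent stage throws, `exceptionally` receives a `CompletionException` wrapping the original error, but when the future itself is failed with `completeExceptionally`, the error arrives unwrapped and its `getCause()` may be `null`. The sketch below demonstrates both cases with a hypothetical `unwrap` method; it illustrates the idea and is not the actual body of `resumeAsyncResponseExceptionally`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

final class FailureUnwrap {
    // Strips the wrappers CompletableFuture adds and returns the original failure.
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // What exceptionally() sees when a dependent stage (here thenApply) throws.
    static Throwable seenFromDependentStage() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture.completedFuture(1)
                .thenApply(x -> { throw new IllegalStateException("boom"); })
                .exceptionally(ex -> { seen.set(ex); return null; });
        return seen.get();
    }

    // What exceptionally() sees when the future itself is failed directly.
    static Throwable seenFromDirectFailure() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture<Integer> future = new CompletableFuture<>();
        future.exceptionally(ex -> { seen.set(ex); return null; });
        future.completeExceptionally(new IllegalStateException("boom"));
        return seen.get();
    }
}
```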



[GitHub] [pulsar] Technoboy- merged pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

Technoboy- merged PR #16545:
URL: https://github.com/apache/pulsar/pull/16545


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

Technoboy- commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r933082715


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -1037,6 +1039,28 @@ public Position findNewestMatching(Predicate<Entry> condition) throws Interrupte
         return findNewestMatching(FindPositionConstraint.SearchActiveEntries, condition);
     }
 
+    @Override
+    public CompletableFuture<ScanOutcome> scan(Optional<Position> position,
+                                               Predicate<Entry> condition,
+                                               int batchSize, long maxEntries, long timeOutMs) {
+        PositionImpl startPosition = (PositionImpl) position.orElse(ledger.getNextValidPosition(markDeletePosition));
+        CompletableFuture<ScanOutcome> future = new CompletableFuture<>();

Review Comment:
   `orElse` -> `orElseGet` is better
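The difference is evaluation order: `Optional.orElse(other)` evaluates its argument before the call, so `ledger.getNextValidPosition(markDeletePosition)` would run even when a start position is supplied, whereas `orElseGet` takes a `Supplier` and calls it only when the `Optional` is empty. A self-contained illustration, where the hypothetical `computeDefault` stands in for the position lookup:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyDefault {
    // Counts how many times the default was computed.
    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for an expensive default, such as looking up the next valid position.
    static long computeDefault() {
        calls.incrementAndGet();
        return -1L;
    }

    static long eager(Optional<Long> position) {
        return position.orElse(computeDefault()); // computeDefault() always runs
    }

    static long lazy(Optional<Long> position) {
        return position.orElseGet(LazyDefault::computeDefault); // runs only when empty
    }
}
```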



[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r933086166


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -1037,6 +1039,28 @@ public Position findNewestMatching(Predicate<Entry> condition) throws Interrupte
         return findNewestMatching(FindPositionConstraint.SearchActiveEntries, condition);
     }
 
+    @Override
+    public CompletableFuture<ScanOutcome> scan(Optional<Position> position,
+                                               Predicate<Entry> condition,
+                                               int batchSize, long maxEntries, long timeOutMs) {
+        PositionImpl startPosition = (PositionImpl) position.orElse(ledger.getNextValidPosition(markDeletePosition));
+        CompletableFuture<ScanOutcome> future = new CompletableFuture<>();

Review Comment:
   good catch, updated



[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r930102273


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1569,6 +1590,63 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
             });
     }
 
+    private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                          String subName,
+                                                                          Optional<Position> position,
+                                                                          boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))

Review Comment:
   @mattisonchao done.
   Thanks for the pointer.



[GitHub] [pulsar] dlg99 commented on a diff in pull request #16545: PIP-187: Add API to analyse a subscription backlog and provide an accurate value

dlg99 commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r921323986


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = Objects.requireNonNull(cursor);
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                try {
+                    if (!condition.apply(entry)) {
+                        exit = true;
+                    }
+                } finally {
+                    entry.release();

Review Comment:
   nit: this could be the finally block of the outer try (on line 74)
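The point of the nit is that the entry must be released exactly once on every path, including when the user-supplied predicate throws. A minimal sketch of that shape, assuming a hypothetical reference-counted `PooledEntry` in place of the managed-ledger `Entry` and `java.util.function.Predicate` in place of Guava's; the real `OpScan` also filters out deleted entries and reports errors through its callback, which is omitted here:

```java
import java.util.function.Predicate;

// Hypothetical reference-counted entry: release() must be called exactly once.
final class PooledEntry {
    final String payload;
    int refCnt = 1;

    PooledEntry(String payload) {
        this.payload = payload;
    }

    void release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("entry released twice");
        }
        refCnt--;
    }
}

final class EntryCheck {
    /** Returns true if the scan should stop; the entry is released on every path. */
    static boolean shouldStop(PooledEntry entry, Predicate<PooledEntry> condition) {
        try {
            return !condition.test(entry);
        } catch (RuntimeException userError) {
            // In this sketch a failing user predicate simply stops the scan.
            return true;
        } finally {
            entry.release(); // single release point, as the nit suggests
        }
    }
}
```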



[GitHub] [pulsar] eolivelli commented on pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide an accurate value

eolivelli commented on PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#issuecomment-1188641283

   > Another question: do we want to limit the number of concurrencies or add cache for it. (In some cases the user misuse or malicious use)
   
   We could only cache it by using lastMarkDeletePosition + individuallyDeletedMessages as the cache key, so caching would not be effective.
   Also, filters are dynamic in nature: for instance, a filter could implement some kind of custom "time to live" on the messages, so the next time you run the scan the result may be different.
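To see why cached results could go stale, consider a hypothetical filter that implements a custom time to live: its verdict for the very same entry depends on the clock at evaluation time, so two scans over an unchanged backlog can legitimately disagree. The class below is illustrative only and does not use Pulsar's filter interface.

```java
final class TtlFilterExample {
    // Hypothetical TTL filter: accept an entry only while it is younger than ttlMs.
    static boolean accept(long publishTimeMs, long ttlMs, long nowMs) {
        return nowMs - publishTimeMs < ttlMs;
    }

    // Counts how many entries the filter accepts when evaluated at nowMs.
    static long acceptedAt(long[] publishTimesMs, long ttlMs, long nowMs) {
        long accepted = 0;
        for (long publishTime : publishTimesMs) {
            if (accept(publishTime, ttlMs, nowMs)) {
                accepted++;
            }
        }
        return accepted;
    }
}
```

Evaluating the same publish times at two different instants yields two different accepted counts, so a result cached at the first instant would be wrong at the second.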


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] dlg99 commented on a diff in pull request #16545: Issue 7623: Add API to analyse a subscription backlog and provide an accurate value

dlg99 commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r920292862


##########
conf/broker.conf:
##########
@@ -232,6 +232,12 @@ subscriptionKeySharedUseConsistentHashing=true
 # The higher the number, the more equal the assignment of keys to consumers
 subscriptionKeySharedConsistentHashingReplicaPoints=100
 
+# Maximum time in ms for a Analise backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000
+
+# Maximum number of entries to be read within a Analise backlog operation
+subscriptionBacklogScanMaxTimeMs=10000

Review Comment:
   ```suggestion
   subscriptionBacklogScanMaxEntries=10000
   ```
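The suggestion fixes a real bug: with the key repeated, the second line overrides the first when the file is read as plain `key=value` properties, so the time limit would silently drop to 10000 ms and `subscriptionBacklogScanMaxEntries` would not be set at all. A quick check with `java.util.Properties`, assuming the broker parses `broker.conf` in this way (`DuplicateKeyCheck` is a hypothetical helper):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class DuplicateKeyCheck {
    // The two settings as originally proposed (the second key is duplicated).
    static final String AS_PROPOSED =
            "subscriptionBacklogScanMaxTimeMs=120000\n"
            + "subscriptionBacklogScanMaxTimeMs=10000\n";

    // The two settings after applying the review suggestion.
    static final String AS_SUGGESTED =
            "subscriptionBacklogScanMaxTimeMs=120000\n"
            + "subscriptionBacklogScanMaxEntries=10000\n";

    // Parses key=value text; for a repeated key, Properties keeps the last value.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return props;
    }
}
```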



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,125 @@
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();

Review Comment:
   Move this to a finally block (in case condition.apply throws).
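   A minimal sketch of the suggested shape, using a hypothetical `RefCountedEntry` in place of the managed-ledger `Entry` and `java.util.function.Predicate` in place of Guava's: the `finally` block releases the entry whether the condition returns normally or throws, and the exception still propagates so the caller can report `scanFailed`.

   ```java
   import java.util.function.Predicate;

   // Hypothetical stand-in for a reference-counted managed-ledger Entry.
   final class RefCountedEntry {
       private int refCnt = 1;

       void release() {
           refCnt--;
       }

       int refCnt() {
           return refCnt;
       }
   }

   final class ScanStep {
       // Applies the user condition and always releases the entry, even if the condition throws.
       // Returns true if the scan should continue.
       static boolean applyAndRelease(RefCountedEntry entry, Predicate<RefCountedEntry> condition) {
           try {
               return condition.test(entry);
           } finally {
               entry.release(); // runs on normal return and on exception
           }
       }
   }
   ```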



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,125 @@
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);
+                callback.scanFailed(ManagedLedgerException.getManagedLedgerException(err),
+                        Optional.ofNullable(searchPosition), OpScan.this.ctx);
+                return;
+            }
+            if (exit) {
+                // user code requested to stop our scan
+                callback.scanComplete(lastMatchedPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+                return;
+            }
+        }
+        searchPosition = ledger.getPositionAfterN((PositionImpl) position, 1, PositionBound.startExcluded);
+        if (log.isDebugEnabled()) {
+            log.debug("readEntryComplete {} at {} next is {}", entry, position, searchPosition);
+        }
+
+        if (searchPosition.compareTo((PositionImpl) position) == 0) {
+            // we have reached the end of the ledger, as we are not doing progress
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
+            return;
+        }
+        find();
+    }
+
+    @Override
+    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+        callback.scanFailed(exception, Optional.ofNullable(searchPosition), OpScan.this.ctx);
+    }
+
+    public void find() {
+        if (remainingEntries.get() <= 0) {
+            log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+            return;
+        }
+        if (System.currentTimeMillis() - startTime > timeOutMs) {
+            log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor);
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+            return;
+        }
+        if (cursor != null ? cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition)) {

Review Comment:
   If cursor is null, the constructor throws an NPE at the `this.ledger = cursor.ledger` line, and cursor is final, so it cannot be null here.
   Please rethink this condition.
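   One way to resolve it, sketched with hypothetical minimal `FakeLedger`/`FakeCursor`/`ScanOp` types rather than the real `ManagedLedgerImpl`/`ManagedCursorImpl`: validate the cursor once in the constructor with `Objects.requireNonNull`, then drop the ternary, since a final field checked at construction can never be null later.

   ```java
   import java.util.Objects;

   // Hypothetical stand-ins for ManagedLedgerImpl and ManagedCursorImpl.
   final class FakeLedger {
       private final long lastEntry;

       FakeLedger(long lastEntry) {
           this.lastEntry = lastEntry;
       }

       boolean hasMoreEntries(long position) {
           return position <= lastEntry;
       }
   }

   final class FakeCursor {
       final FakeLedger ledger;

       FakeCursor(FakeLedger ledger) {
           this.ledger = ledger;
       }

       boolean hasMoreEntries(long position) {
           return ledger.hasMoreEntries(position);
       }
   }

   final class ScanOp {
       private final FakeCursor cursor;
       private final FakeLedger ledger;

       ScanOp(FakeCursor cursor) {
           // Fail fast with a named NPE instead of an anonymous one on cursor.ledger.
           this.cursor = Objects.requireNonNull(cursor, "cursor");
           this.ledger = cursor.ledger; // safe: cursor is known to be non-null here
       }

       boolean hasMore(long searchPosition) {
           // cursor is final and was checked in the constructor, so no ledger fallback is needed.
           return cursor.hasMoreEntries(searchPosition);
       }
   }
   ```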





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyse a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r920847730


##########
conf/broker.conf:
##########
@@ -232,6 +232,12 @@ subscriptionKeySharedUseConsistentHashing=true
 # The higher the number, the more equal the assignment of keys to consumers
 subscriptionKeySharedConsistentHashingReplicaPoints=100
 
+# Maximum time in ms for an analyze backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000
+
+# Maximum number of entries to be read within an analyze backlog operation
+subscriptionBacklogScanMaxTimeMs=10000

Review Comment:
   Thanks.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,124 @@
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);

Review Comment:
   I moved it to a finally block instead of adding it here.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,125 @@
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);
+                callback.scanFailed(ManagedLedgerException.getManagedLedgerException(err),
+                        Optional.ofNullable(searchPosition), OpScan.this.ctx);
+                return;
+            }
+            if (exit) {
+                // user code requested to stop our scan
+                callback.scanComplete(lastMatchedPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+                return;
+            }
+        }
+        searchPosition = ledger.getPositionAfterN((PositionImpl) position, 1, PositionBound.startExcluded);
+        if (log.isDebugEnabled()) {
+            log.debug("readEntryComplete {} at {} next is {}", entry, position, searchPosition);
+        }
+
+        if (searchPosition.compareTo((PositionImpl) position) == 0) {
+            // we have reached the end of the ledger, as we are not doing progress
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
+            return;
+        }
+        find();
+    }
+
+    @Override
+    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+        callback.scanFailed(exception, Optional.ofNullable(searchPosition), OpScan.this.ctx);
+    }
+
+    public void find() {
+        if (remainingEntries.get() <= 0) {
+            log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+            return;
+        }
+        if (System.currentTimeMillis() - startTime > timeOutMs) {
+            log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor);
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+            return;
+        }
+        if (cursor != null ? cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition)) {

Review Comment:
   Good point.
   I started from another operation that also worked without a cursor.
   Here there is no need for this null check.
   I am adding the null check in the constructor instead.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,125 @@
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();

Review Comment:
   Good point.





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923069240


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1569,6 +1589,48 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
             });
     }
 
+    private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                          String subName,
+                                                                          boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                            Subscription sub = topic.getSubscription(subName);
+                            if (sub == null) {
+                                throw new RestException(Status.NOT_FOUND,
+                                        getSubNotFoundErrorMessage(topicName.toString(), subName));
+                            }
+                            return sub.analyzeBacklog();
+                        })
+                .thenAccept((AnalyzeBacklogResult rawResult) -> {
+
+                        AnalyzeSubscriptionBacklogResult result = new AnalyzeSubscriptionBacklogResult();
+                        result.setEntries(rawResult.getEntries());
+                        result.setMessages(rawResult.getMessages());
+
+                        result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
+                        result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
+                        result.setFilterRescheduledEntries(rawResult.getFilterRescheduledEntries());
+
+                        result.setFilterAcceptedMessages(rawResult.getFilterAcceptedMessages());
+                        result.setFilterRejectedMessages(rawResult.getFilterRejectedMessages());
+                        result.setFilterRescheduledMessages(rawResult.getFilterRescheduledMessages());
+                        result.setAborted(rawResult.getScanOutcome() != ScanOutcome.COMPLETED);
+                        log.info("[{}] analyzeBacklog topic {} subscription {} result {}", clientAppId(), subName,
+                            topicName, result);
+                        asyncResponse.resume(result);
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    // If the exception is not redirect exception we need to log it.
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to analyze subscription backlog {} {}",
+                                clientAppId(), topicName, subName, cause);
+                    }
+                    asyncResponse.resume(new RestException(cause));

Review Comment:
   I forgot about that. Thanks
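   The `exceptionally` handler in this hunk reads `ex.getCause()`. A small, self-contained sketch of why, with a hypothetical `Unwrap` helper and pipeline: when a stage of a `CompletableFuture` chain throws, later stages see the failure wrapped in a `CompletionException`, so the handler has to unwrap it to reach the original error. This defensive variant falls back to the exception itself when there is no wrapper.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;

   final class Unwrap {
       // Returns the underlying cause when ex is a CompletionException wrapper, otherwise ex itself.
       static Throwable unwrap(Throwable ex) {
           if (ex instanceof CompletionException && ex.getCause() != null) {
               return ex.getCause();
           }
           return ex;
       }

       // Hypothetical pipeline: one stage may throw, exceptionally() maps the failure to a message.
       static String run(boolean fail) {
           return CompletableFuture.completedFuture("sub")
                   .thenApply(name -> {
                       if (fail) {
                           throw new IllegalStateException("subscription not found: " + name);
                       }
                       return name;
                   })
                   .thenApply(name -> "analyzed " + name)
                   .exceptionally(ex -> "failed: " + unwrap(ex).getMessage())
                   .join();
       }
   }
   ```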





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r932962036


##########
conf/broker.conf:
##########
@@ -232,6 +232,12 @@ subscriptionKeySharedUseConsistentHashing=true
 # The higher the number, the more equal the assignment of keys to consumers
 subscriptionKeySharedConsistentHashingReplicaPoints=100
 
+# Maximum time in ms for an analyze backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000
+
+# Maximum number of entries to be read within an analyze backlog operation
+subscriptionBacklogScanMaxEntries=10000

Review Comment:
   It is important that these two options stay at broker level, because they exist to prevent malicious users from breaking the system.
   
   I can add per-namespace values in a follow-up, but only after users have tried this feature for a while and provided feedback.
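   For reference, a synchronous sketch of how the two broker-level limits bound a scan, loosely mirroring the checks in `OpScan.find()` and `readEntryComplete` quoted earlier. `BoundedScan`, the `Outcome` enum and the injected clock are illustrative, not the real managed-ledger API. Reading stops with ABORTED once `maxEntries` entries have been read or `timeOutMs` has elapsed, with USER_INTERRUPTED when the condition returns false, and with COMPLETED when the backlog is exhausted.

   ```java
   import java.util.List;
   import java.util.function.LongSupplier;
   import java.util.function.Predicate;

   // Mirrors the three outcomes used in the quoted hunks.
   enum Outcome { COMPLETED, ABORTED, USER_INTERRUPTED }

   final class BoundedScan {
       // Reads entries in order until a limit is hit, the condition says stop, or the list ends.
       static <T> Outcome scan(List<T> entries, Predicate<T> condition,
                               long maxEntries, long timeOutMs, LongSupplier clock) {
           long startTime = clock.getAsLong();
           long remainingEntries = maxEntries;
           for (T entry : entries) {
               // Same order as OpScan.find(): entry budget first, then the deadline.
               if (remainingEntries <= 0) {
                   return Outcome.ABORTED;
               }
               if (clock.getAsLong() - startTime > timeOutMs) {
                   return Outcome.ABORTED;
               }
               remainingEntries--;
               if (!condition.test(entry)) {
                   return Outcome.USER_INTERRUPTED; // user code asked to stop
               }
           }
           return Outcome.COMPLETED; // no more entries to read
       }
   }
   ```

   Note that the REST handler quoted earlier maps every outcome other than COMPLETED to `aborted=true`, so a USER_INTERRUPTED scan is also reported as aborted.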





[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #16545: Issue 7623: Add API to analise a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r919965687


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,124 @@
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);

Review Comment:
   Should we call `entry.release()` here as well?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,124 @@
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);
+                callback.scanFailed(ManagedLedgerException.getManagedLedgerException(err),
+                        Optional.ofNullable(searchPosition), OpScan.this.ctx);
+            }
+            if (exit) {
+                // user code requested to stop our scan
+                callback.scanComplete(lastMatchedPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+                return;
+            }
+        }
+        searchPosition = ledger.getPositionAfterN((PositionImpl) position, 1, PositionBound.startExcluded);
+        if (log.isDebugEnabled()) {
+            log.debug("readEntryComplete {} at {} next is {}", entry, position, searchPosition);
+        }
+
+        if (searchPosition.compareTo((PositionImpl) position) == 0) {
+            // we have reached the end of the ledger, as we are not doing progress
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
+            return;
+        }
+        find();
+    }
+
+    @Override
+    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+        callback.scanFailed(exception, Optional.ofNullable(searchPosition), OpScan.this.ctx);
+    }
+
+    public void find() {
+        if (remainingEntries.get() <= 0) {
+            log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.ABORTED, OpScan.this.ctx);

Review Comment:
   From a user perspective, receiving ABORTED both for a timeout and for too many entries is not useful. For instance, it could be useful for the user to know that the scan stopped because of too many entries, meaning there are more entries than the limit passed to the scan, so they can say "the backlog is more than 100k entries".
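A minimal sketch of the distinction suggested here, assuming the single `ABORTED` value were split into two outcomes. `ScanOutcomeSketch`, `ABORTED_MAX_ENTRIES`, `ABORTED_TIMEOUT` and the helper methods are hypothetical names, not the actual `ScanOutcome` enum; the real `find()` logic is truncated in the hunk above.

```java
import java.util.Optional;

// Hypothetical outcome enum: splits the single ABORTED value into two reasons.
enum ScanOutcomeSketch {
    COMPLETED,
    USER_INTERRUPTED,
    ABORTED_MAX_ENTRIES,
    ABORTED_TIMEOUT
}

class ScanLimitsSketch {
    // Returns the outcome that should end the scan, or empty if another
    // entry may still be read. The entry limit is checked first.
    static Optional<ScanOutcomeSketch> checkLimits(long remainingEntries, long startMs,
                                                   long nowMs, long timeOutMs) {
        if (remainingEntries <= 0) {
            return Optional.of(ScanOutcomeSketch.ABORTED_MAX_ENTRIES);
        }
        if (nowMs - startMs > timeOutMs) {
            return Optional.of(ScanOutcomeSketch.ABORTED_TIMEOUT);
        }
        return Optional.empty();
    }

    // With ABORTED_MAX_ENTRIES the caller can report a lower bound on the backlog.
    static String describe(ScanOutcomeSketch outcome, long scannedEntries) {
        switch (outcome) {
            case COMPLETED:
                return "backlog is " + scannedEntries + " entries";
            case ABORTED_MAX_ENTRIES:
                return "backlog is at least " + scannedEntries + " entries";
            case ABORTED_TIMEOUT:
                return "scan timed out after " + scannedEntries + " entries";
            default:
                return "scan stopped by the caller after " + scannedEntries + " entries";
        }
    }
}
```

Keeping the two reasons separate lets a monitoring tool treat `ABORTED_MAX_ENTRIES` as a reliable lower bound, while `ABORTED_TIMEOUT` only says the scan was too slow.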



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -512,6 +522,95 @@ public String getTypeString() {
         return "Null";
     }
 
+    @Override
+    public CompletableFuture<AnaliseBacklogResult> analiseBacklog() {
+
+        long start = System.currentTimeMillis();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Starting to analise backlog", topicName, subName);
+        }
+
+        AtomicLong entries = new AtomicLong();
+        AtomicLong accepted = new AtomicLong();
+        AtomicLong rejected = new AtomicLong();
+        AtomicLong rescheduled = new AtomicLong();
+        AtomicLong messages = new AtomicLong();
+        AtomicLong acceptedMessages = new AtomicLong();
+        AtomicLong rejectedMessages = new AtomicLong();
+        AtomicLong rescheduledMessages = new AtomicLong();
+
+        Position currentPosition = cursor.getMarkDeletedPosition();
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] currentPosition {}",
+                    topicName, subName, currentPosition);
+        }
+        final AbstractBaseDispatcher abstractBaseDispatcher = dispatcher != null
+                ? (AbstractBaseDispatcher) dispatcher : new DummyDispatcherForFilters();
+        // we put some hard limits on the scan, in order to prevent denial of services
+        ServiceConfiguration configuration = topic.getBrokerService().getPulsar().getConfiguration();
+        long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
+        long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
+        return cursor.scan(new Predicate<Entry>() {
+            @Override
+            public boolean apply(Entry entry) {
+                if (log.isDebugEnabled()) {
+                    log.debug("found {}", entry);
+                }
+
+                ByteBuf metadataAndPayload = entry.getDataBuffer();
+                MessageMetadata messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+                int numMessages = 1;
+                if (messageMetadata.hasNumMessagesInBatch()) {
+                    numMessages = messageMetadata.getNumMessagesInBatch();
+                }
+                EntryFilter.FilterResult filterResult = abstractBaseDispatcher
+                        .runFiltersForEntry(entry, messageMetadata, null);
+
+                if (filterResult == null) {
+                    filterResult = EntryFilter.FilterResult.ACCEPT;
+                }
+                switch (filterResult) {
+                    case REJECT:
+                        rejected.incrementAndGet();
+                        rejectedMessages.addAndGet(numMessages);
+                        break;
+                    case RESCHEDULE:
+                        rescheduled.incrementAndGet();
+                        rescheduledMessages.addAndGet(numMessages);
+                        break;
+                    default:
+                        accepted.incrementAndGet();
+                        acceptedMessages.addAndGet(numMessages);
+                        break;
+                }
+                entries.incrementAndGet();
+                messages.addAndGet(numMessages);
+
+                return true;
+            }
+        }, maxEntries, timeOutMs).thenApply((ScanOutcome outcome) -> {
+            long end = System.currentTimeMillis();
+            AnaliseBacklogResult result = new AnaliseBacklogResult();
+            result.setEntries(entries.get());
+            result.setMessages(messages.get());
+            result.setFilterAcceptedEntries(accepted.get());
+            result.setFilterAcceptedMessages(acceptedMessages.get());
+            result.setFilterRejectedEntries(rejected.get());
+            result.setFilterRejectedMessages(rejectedMessages.get());
+            result.setFilterRescheduledEntries(rescheduled.get());
+            result.setFilterRescheduledMessages(rescheduledMessages.get());
+            // sometimes we abort the execution due to a timeout or
+            // when we reach a maximum number of entries
+            result.setScanOutcome(outcome);
+            log.info(
+                    "[{}][{}] scan took {} ms - {}",
+                    topicName, subName, end, result);

Review Comment:
   Should this be `end - start`?
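The log line above passes the raw timestamp `end` where a duration is intended. A minimal sketch of the corrected measurement; it uses `System.nanoTime()`, which is monotonic and therefore better suited than `System.currentTimeMillis()` for durations (a wall-clock adjustment cannot make the result jump or go negative). `ElapsedSketch` and `timeMillis` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

class ElapsedSketch {
    // Hypothetical helper: runs the task and returns how long it took, in milliseconds.
    static long timeMillis(Runnable task) {
        long start = System.nanoTime();
        task.run();
        long end = System.nanoTime();
        // Report (end - start), not end: 'end' alone is a clock reading, not a duration.
        return TimeUnit.NANOSECONDS.toMillis(end - start);
    }
}
```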



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2434,6 +2496,34 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse,
         });
     }
 
+    protected void internalAnaliseSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
+                                                      boolean authoritative) {
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+                    authoritative, false).partitions > 0) {
+                throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        "Get message ID by timestamp on a partitioned topic is not allowed, "

Review Comment:
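   This error message doesn't make sense here: it refers to getting a message ID by timestamp, not to analyzing a subscription backlog.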



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1570,6 +1590,48 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
             });
     }
 
+    private void internalAnaliseSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                          String subName,
+                                                                          boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                            Subscription sub = topic.getSubscription(subName);
+                            if (sub == null) {
+                                throw new RestException(Status.NOT_FOUND,
+                                        getSubNotFoundErrorMessage(topicName.toString(), subName));
+                            }
+                            return sub.analiseBacklog();
+                        })
+                .thenAccept((AnaliseBacklogResult rawResult) -> {
+
+                        AnaliseSubscriptionBacklogResult result = new AnaliseSubscriptionBacklogResult();
+                        result.setEntries(rawResult.getEntries());
+                        result.setMessages(rawResult.getMessages());
+
+                        result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
+                        result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
+                        result.setFilterRescheduledEntries(rawResult.getFilterRescheduledEntries());
+
+                        result.setFilterAcceptedMessages(rawResult.getFilterAcceptedMessages());
+                        result.setFilterRejectedMessages(rawResult.getFilterRejectedMessages());
+                        result.setFilterRescheduledMessages(rawResult.getFilterRescheduledMessages());
+                        result.setAborted(rawResult.getScanOutcome() != ScanOutcome.COMPLETED);
+                        log.info("[{}] analiseBacklog topic {} subscription {} result {}", clientAppId(), subName,

Review Comment:
   Could we add the time spent doing the analysis?
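Because the analysis runs asynchronously, its duration has to be taken when the future completes, not when the method returns. A minimal sketch, assuming a hypothetical `Timed<T>` wrapper and `AsyncTimingSketch.timed` helper; in the real code the elapsed time could instead become a field of the result object.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical wrapper pairing a result with the time spent producing it.
final class Timed<T> {
    final T value;
    final long elapsedMs;

    Timed(T value, long elapsedMs) {
        this.value = value;
        this.elapsedMs = elapsedMs;
    }
}

class AsyncTimingSketch {
    // Starts the clock before the async operation is launched and stops it
    // when the returned future completes successfully.
    static <T> CompletableFuture<Timed<T>> timed(Supplier<CompletableFuture<T>> operation) {
        long start = System.nanoTime();
        return operation.get().thenApply(value ->
                new Timed<>(value, (System.nanoTime() - start) / 1_000_000));
    }
}
```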



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2434,6 +2496,34 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse,
         });
     }
 
+    protected void internalAnaliseSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
+                                                      boolean authoritative) {
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+                    authoritative, false).partitions > 0) {
+                throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        "Get message ID by timestamp on a partitioned topic is not allowed, "
+                                + "please try do it on specific topic partition");
+            }
+            internalAnaliseSubscriptionBacklogForNonPartitionedTopic(asyncResponse, subName, authoritative);
+        }).exceptionally(ex -> {
+            // If the exception is not redirect exception we need to log it.
+            if (!isRedirectException(ex)) {
+                log.error("[{}] Failed to update subscription {} from topic {}",

Review Comment:
   Should the message say "Failed to analise" instead?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -1671,6 +1671,42 @@ public void getSubscriptionProperties(
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analiseBacklog")
+    @ApiOperation(value = "Analyses a subscription, by scanning all the unprocessed messages")

Review Comment:
   ```suggestion
       @ApiOperation(value = "Analyze a subscription by scanning all the unprocessed messages")
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);
+                callback.scanFailed(ManagedLedgerException.getManagedLedgerException(err),

Review Comment:
   If an exception is raised, we continue the scan after calling `scanFailed`. Is that intended?
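If the intent is to stop, the failure branch needs to return so that no further entry is read and no completion callback fires after the failure callback. A minimal sketch of that control flow, with a hypothetical `ScanListener` interface standing in for `ScanCallback` and strings standing in for entries:

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical listener; stands in for ScanCallback in this sketch.
interface ScanListener {
    void failed(Throwable error);
    void completed(int scanned);
}

class StopOnErrorSketch {
    // Feeds entries to the predicate one by one. A thrown exception reports
    // failure and returns immediately, so no further entries are scanned and
    // completed() is never called after failed().
    static void scan(List<String> entries, Predicate<String> condition, ScanListener listener) {
        int scanned = 0;
        for (String entry : entries) {
            boolean keepGoing;
            try {
                keepGoing = condition.test(entry);
            } catch (Throwable err) {
                listener.failed(err);
                return;
            }
            scanned++;
            if (!keepGoing) {
                break; // the predicate asked to stop: a normal, user-requested end
            }
        }
        listener.completed(scanned);
    }
}
```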



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r933082333


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1569,6 +1590,63 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
             });
     }
 
+    private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,

Review Comment:
   We have changed most of the admin implementations to return futures.
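A minimal sketch of the future-returning style referred to here: the internal method returns a `CompletableFuture` and signals errors as a failed future, and only the thin entry point decides how to map the outcome (in Pulsar's JAX-RS resources, by resuming the `AsyncResponse`). All names (`FutureStyleSketch`, `internalAnalyze`, the `Map`-backed store, the string result) are hypothetical simplifications, not Pulsar's actual classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FutureStyleSketch {
    // Hypothetical in-memory store: subscription name -> backlog size.
    private final Map<String, Long> backlogs;

    FutureStyleSketch(Map<String, Long> backlogs) {
        this.backlogs = backlogs;
    }

    // Future-returning internal method: validation failures surface as a
    // failed future instead of being written to a response object directly.
    CompletableFuture<Long> internalAnalyze(String subName) {
        return CompletableFuture.supplyAsync(() -> {
            Long backlog = backlogs.get(subName);
            if (backlog == null) {
                throw new IllegalArgumentException("Subscription not found: " + subName);
            }
            return backlog;
        });
    }

    // The caller maps success and failure in one place.
    String handle(String subName) {
        return internalAnalyze(subName)
                .thenApply(backlog -> "backlog=" + backlog)
                .exceptionally(ex -> {
                    // Exceptions thrown inside supplyAsync arrive wrapped in CompletionException.
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    return "error: " + cause.getMessage();
                })
                .join();
    }
}
```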
   





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r924101476


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -1683,6 +1683,42 @@ public void getSubscriptionProperties(
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
+    @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 405, message = "Method Not Allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void analyzeSubscriptionBacklog(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription", required = true)
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalAnalyzeSubscriptionBacklog(asyncResponse, decode(encodedSubName),

Review Comment:
   Got it.





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r924097612


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -1683,6 +1683,42 @@ public void getSubscriptionProperties(
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
+    @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 405, message = "Method Not Allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void analyzeSubscriptionBacklog(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription", required = true)
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalAnalyzeSubscriptionBacklog(asyncResponse, decode(encodedSubName),

Review Comment:
   I am following the code style of the other methods, like `resetCursorOnPosition` below.





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923062826


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {

Review Comment:
   Let me take a look at how to do it.
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -489,6 +489,7 @@ public SubType getType() {
 
     @Override
     public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
+        log.info("read {}", entries.size(), new Exception("xxx").fillInStackTrace());

Review Comment:
   It is a leftover from a debug session, committed by mistake.
   Removed, thanks.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923091837


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);
+                callback.scanFailed(ManagedLedgerException.getManagedLedgerException(err),
+                        Optional.ofNullable(searchPosition), OpScan.this.ctx);
+            }
+            if (exit) {
+                // user code requested to stop our scan
+                callback.scanComplete(lastMatchedPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+                return;
+            }
+        }
+        searchPosition = ledger.getPositionAfterN((PositionImpl) position, 1, PositionBound.startExcluded);
+        if (log.isDebugEnabled()) {
+            log.debug("readEntryComplete {} at {} next is {}", entry, position, searchPosition);
+        }
+
+        if (searchPosition.compareTo((PositionImpl) position) == 0) {
+            // we have reached the end of the ledger, as we are not doing progress
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
+            return;
+        }
+        find();
+    }
+
+    @Override
+    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+        callback.scanFailed(exception, Optional.ofNullable(searchPosition), OpScan.this.ctx);
+    }
+
+    public void find() {
+        if (remainingEntries.get() <= 0) {
+            log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.ABORTED, OpScan.this.ctx);

Review Comment:
   > This operation is intended to be run by administrators to understand the number of messages that are still to be processed.
   
   Maybe a `timestamp` would be better? The broker would search entries until some specific time, or until a timeout.
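The OpScan constructor quoted above already receives a `timeOutMs` and records a `startTime`, next to the `maxEntries` budget. The comment does not say whether "timestamp" means a wall-clock deadline or a message publish time. Below is a minimal plain-Java sketch of the wall-clock reading.

Several pieces are illustrative assumptions, not the `ManagedCursor.scan` API:

- the `Outcome` enum,
- the `String` entries,
- the injected clock.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

class BoundedScanSketch {

    // Illustrative outcome values, modelled on the ScanOutcome names used in the patch.
    enum Outcome { COMPLETED, ABORTED, USER_INTERRUPTED }

    /**
     * Visits entries in order and stops when the condition returns false, when the
     * entry budget is spent, or when the deadline (start + timeOutMs) has passed.
     */
    static Outcome scan(List<String> entries, Predicate<String> condition,
                        long maxEntries, long timeOutMs, LongSupplier clockMs) {
        final long deadline = clockMs.getAsLong() + timeOutMs;
        long remaining = maxEntries;
        for (String entry : entries) {
            // Check the hard limits before reading the next entry
            // (find() does the same for the entry count).
            if (remaining <= 0 || clockMs.getAsLong() > deadline) {
                return Outcome.ABORTED;
            }
            remaining--;
            if (!condition.test(entry)) {
                return Outcome.USER_INTERRUPTED; // the caller asked to stop
            }
        }
        return Outcome.COMPLETED;
    }

    public static void main(String[] args) {
        List<String> entries = List.of("a", "b", "c", "d");
        System.out.println(scan(entries, e -> true, 10, 1_000, System::currentTimeMillis)); // COMPLETED
        System.out.println(scan(entries, e -> true, 2, 1_000, System::currentTimeMillis));  // ABORTED
    }
}
```

The clock is passed in as a `LongSupplier` so the deadline check can be tested deterministically. A broker implementation would more likely compare against `System.currentTimeMillis()` directly, which is how the `startTime` field is initialized.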



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#issuecomment-1188649637

   > @mattisonchao I have answered your comments, PTAL
   
   I very much appreciate your explanation, thanks!




[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923127631


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {

Review Comment:
   @Technoboy- @mattisonchao I have updated the patch to use readEntries and process entries in batches.
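Switching from ReadEntryCallback to ReadEntriesCallback means the predicate runs over a whole batch per read. With reference-counted entries, one detail matters: entries after the point where the scan stops still have to be released.

Below is a hedged plain-Java sketch of that per-batch step. It is not the code in the patch:

- `Entry` here is a hypothetical two-method interface, not the managed-ledger `Entry`.
- `processBatch` is an illustrative helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class BatchScanSketch {

    // Hypothetical minimal stand-in for a managed-ledger Entry; only release() matters here.
    interface Entry {
        long id();
        void release();
    }

    // Test helper that counts how many times it has been released.
    static final class CountingEntry implements Entry {
        private final long id;
        int releases = 0;
        CountingEntry(long id) { this.id = id; }
        public long id() { return id; }
        public void release() { releases++; }
    }

    /**
     * Evaluates the condition on each entry of one batch, in order, and reports whether
     * the scan should continue. Once the condition returns false (or throws), later
     * entries are no longer evaluated, but every entry is still released exactly once.
     */
    static boolean processBatch(List<? extends Entry> batch, Predicate<Entry> condition) {
        boolean keepGoing = true;
        RuntimeException failure = null;
        for (Entry entry : batch) {
            try {
                if (keepGoing && failure == null && !condition.test(entry)) {
                    keepGoing = false;
                }
            } catch (RuntimeException e) {
                failure = e;
            } finally {
                entry.release();
            }
        }
        if (failure != null) {
            throw failure; // surface the user error only after all entries are released
        }
        return keepGoing;
    }

    public static void main(String[] args) {
        List<CountingEntry> batch = new ArrayList<>();
        for (long i = 0; i < 5; i++) {
            batch.add(new CountingEntry(i));
        }
        boolean more = processBatch(batch, e -> e.id() < 2); // stops at id 2
        System.out.println("continue=" + more);
        batch.forEach(e -> System.out.println(e.id() + " released " + e.releases + "x"));
    }
}
```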





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923059262


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {

Review Comment:
   Could we implement ReadEntriesCallback?





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r932966700


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -17,7 +17,24 @@
  * under the License.
  */
 package org.apache.pulsar.broker.admin.impl;
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

Review Comment:
   Oh, I don't know how that happened. Removed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1569,6 +1590,63 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
             });
     }
 
+    private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,

Review Comment:
   I agree with you, but the whole class is written this way.
   I am not sure it is worth changing the code style for only one method.
   
   



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -1832,6 +1835,138 @@ void testFindNewestMatching() throws Exception {
                 c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
     }
 
+    @Test(timeOut = 20000)
+    void testScanSingleEntry() throws Exception {
+        testScan(10,1);
+    }
+
+    @Test(timeOut = 30000)
+    void testScanBatchesWithSomeRemainder() throws Exception {
+        testScan(10,3);
+    }
+
+    @Test(timeOut = 30000)
+    void testScanBatches() throws Exception {
+        testScan(10,5);
+    }
+
+    @Test(timeOut = 30000)
+    void testScanBatchWholeLedger() throws Exception {
+        testScan(10,1000);
+    }
+
+    @Test(timeOut = 1000)
+    void testScanBatchEmptyLedger() throws Exception {
+        testScan(0,10);
+    }

Review Comment:
   Sure.
   I usually prefer this form while developing tests, because it makes it easier to run a single combination of parameters, and the method name explains the meaning of the test.
   
   I have updated the patch.
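The usual alternative to one method per combination is a table of parameters that drives a single test; in TestNG this is typically done with `@DataProvider`. The sketch below shows the same table-driven idea in plain Java, without TestNG, so it runs standalone.

Two assumptions apply:

- `scanInBatches` is a hypothetical simulation of batch sizes, not `ManagedCursor.scan`.
- The cases mirror the `(entries, batchSize)` pairs passed to `testScan` in the diff above.

```java
import java.util.ArrayList;
import java.util.List;

class ScanParamsSketch {

    /**
     * Hypothetical simulation of a batched scan: reads totalEntries entries in batches
     * of at most batchSize and returns the size of each batch that was observed.
     */
    static List<Integer> scanInBatches(int totalEntries, int batchSize) {
        List<Integer> batches = new ArrayList<>();
        for (int start = 0; start < totalEntries; start += batchSize) {
            batches.add(Math.min(batchSize, totalEntries - start));
        }
        return batches;
    }

    // Same (entries, batchSize) combinations as the dedicated test methods.
    static final int[][] CASES = { {10, 1}, {10, 3}, {10, 5}, {10, 1000}, {0, 10} };

    public static void main(String[] args) {
        for (int[] c : CASES) {
            List<Integer> batches = scanInBatches(c[0], c[1]);
            int seen = batches.stream().mapToInt(Integer::intValue).sum();
            // Every entry must be visited exactly once, whatever the batch size.
            if (seen != c[0]) {
                throw new AssertionError("case " + c[0] + "/" + c[1] + " saw " + seen);
            }
            System.out.println(c[0] + " entries, batch " + c[1] + " -> " + batches);
        }
    }
}
```

The trade-off the comment describes still holds. A data table keeps all cases in one place, while named methods make it easier to run a single combination from an IDE.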



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntriesCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;

Review Comment:
   Good catch, removed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2382,6 +2460,35 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse,
         });
     }
 
+    protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
+                                                      Optional<Position> position,
+                                                      boolean authoritative) {
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+                    authoritative, false).partitions > 0) {
+                throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        "Analyze backlog on a partitioned topic is not allowed, "
+                                + "please try do it on specific topic partition");
+            }

Review Comment:
   Other methods check whether the topic is partitioned in this same place.
   If the broker is not the owner, we will be redirected to another broker and we won't reach this point.
   
   I noticed that getPartitionedTopicMetadata is synchronous, so I have reworked this part to make it fully async.
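A fully asynchronous version of this check keeps the partition-count lookup inside the `CompletableFuture` chain instead of blocking on `getPartitionedTopicMetadata`. Below is a minimal sketch using only `java.util.concurrent`. It makes these assumptions:

- `partitionCountLookup` and `analyze` are hypothetical stand-ins for the broker's metadata lookup and the per-topic analysis.
- `RestError` stands in for `RestException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class AsyncPartitionCheckSketch {

    // Hypothetical stand-in for a REST error carrying an HTTP status code.
    static final class RestError extends RuntimeException {
        final int status;
        RestError(int status, String message) {
            super(message);
            this.status = status;
        }
    }

    static CompletableFuture<String> analyzeIfNotPartitioned(
            String topic, boolean isPartitionName,
            Function<String, CompletableFuture<Integer>> partitionCountLookup,
            Function<String, CompletableFuture<String>> analyze) {
        // A name like "topic-partition-N" is already a single partition, so skip the lookup.
        CompletableFuture<Integer> partitions = isPartitionName
                ? CompletableFuture.completedFuture(0)
                : partitionCountLookup.apply(topic);
        return partitions.thenCompose(n -> {
            if (n > 0) {
                // 405 mirrors Status.METHOD_NOT_ALLOWED in the original code.
                throw new RestError(405, "Analyze backlog on a partitioned topic is not allowed, "
                        + "please try it on a specific topic partition");
            }
            return analyze.apply(topic);
        });
    }

    public static void main(String[] args) {
        System.out.println(analyzeIfNotPartitioned("t", false,
                t -> CompletableFuture.completedFuture(0),
                t -> CompletableFuture.completedFuture("analyzed " + t)).join()); // analyzed t
    }
}
```

Throwing inside `thenCompose` completes the returned future exceptionally with a `CompletionException` that wraps the `RestError`. An `exceptionally(...)` handler at the end of the chain, like the one in the handler further down this thread, can therefore still unwrap it and map it to an HTTP response.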
   





[GitHub] [pulsar] codelipenghui commented on pull request #16545: Issue 7623: Add API to analise a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#issuecomment-1181736141

   Related PR that supports the precise backlog: #14958




[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: Issue 7623: Add API to analise a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r919993652


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);
+                callback.scanFailed(ManagedLedgerException.getManagedLedgerException(err),

Review Comment:
   Good catch!



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2434,6 +2496,34 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse,
         });
     }
 
+    protected void internalAnaliseSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
+                                                      boolean authoritative) {
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+                    authoritative, false).partitions > 0) {
+                throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        "Get message ID by timestamp on a partitioned topic is not allowed, "

Review Comment:
   Updated, thanks.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntryCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;
+    private final ScanCallback callback;
+    private final Predicate<Entry> condition;
+    private final Object ctx;
+    private final AtomicLong remainingEntries = new AtomicLong();
+    private final long timeOutMs;
+    private final long startTime = System.currentTimeMillis();
+
+    PositionImpl searchPosition;
+    Position lastMatchedPosition = null;
+
+    public OpScan(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate<Entry> condition,
+                  ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+        this.cursor = cursor;
+        this.ledger = cursor.ledger;
+        this.startPosition = startPosition;
+        this.callback = callback;
+        this.condition = condition;
+        this.ctx = ctx;
+        this.searchPosition = startPosition;
+        this.remainingEntries.set(maxEntries);
+        this.timeOutMs = timeOutMs;
+    }
+
+
+    @Override
+    public void readEntryComplete(Entry entry, Object ctx) {
+        remainingEntries.decrementAndGet();
+        final Position position = entry.getPosition();
+        lastMatchedPosition = position;
+        // filter out the entry if it has been already deleted
+        // filterReadEntries will call entry.release if the entry is filtered out
+        List<Entry> entries = this.cursor.filterReadEntries(List.of(entry));
+        if (!entries.isEmpty()) {
+            boolean exit = false;
+            try {
+                if (!condition.apply(entry)) {
+                  exit = true;
+                }
+                entry.release();
+            } catch (Throwable err) {
+                log.error("[{}] user exception", cursor, err);
+                callback.scanFailed(ManagedLedgerException.getManagedLedgerException(err),
+                        Optional.ofNullable(searchPosition), OpScan.this.ctx);
+            }
+            if (exit) {
+                // user code requested to stop our scan
+                callback.scanComplete(lastMatchedPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+                return;
+            }
+        }
+        searchPosition = ledger.getPositionAfterN((PositionImpl) position, 1, PositionBound.startExcluded);
+        if (log.isDebugEnabled()) {
+            log.debug("readEntryComplete {} at {} next is {}", entry, position, searchPosition);
+        }
+
+        if (searchPosition.compareTo((PositionImpl) position) == 0) {
+            // we have reached the end of the ledger, as we are not doing progress
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
+            return;
+        }
+        find();
+    }
+
+    @Override
+    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+        callback.scanFailed(exception, Optional.ofNullable(searchPosition), OpScan.this.ctx);
+    }
+
+    public void find() {
+        if (remainingEntries.get() <= 0) {
+            log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+            callback.scanComplete(lastMatchedPosition, ScanOutcome.ABORTED, OpScan.this.ctx);

Review Comment:
   I am not sure it adds much value.
   If we add two distinct cases, then we will have to add new enum values for every new case in the future.
   The alternative would be to add a descriptive string, but it is usually not a good idea to provide human-readable descriptions in APIs (no localization, ...).
   
   This check is done only to prevent huge scans: the user will see "aborted: true", and that is enough to understand that the operation was interrupted abruptly. The logs will tell more.
   
   This operation is intended to be run by administrators to understand the number of messages that are still to be processed.
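The REST handler in this PR reduces the scan outcome to a single flag with `result.setAborted(rawResult.getScanOutcome() != ScanOutcome.COMPLETED)`. The `PersistentSubscription.analiseBacklog` diff in this thread counts filter results both as entries and as messages. A batched entry contributes `getNumMessagesInBatch()` messages and any other entry contributes 1, and a `null` filter result is treated as ACCEPT.

Below is a plain-Java sketch of that bookkeeping. The local enums and the `BacklogTallySketch` class are illustrative stand-ins, not Pulsar's `ScanOutcome`, `EntryFilter.FilterResult` or `AnaliseBacklogResult`. Incrementing the total `entries`/`messages` counters once per scanned entry is an assumption, since that part of the diff is not quoted.

```java
import java.util.concurrent.atomic.AtomicLong;

class BacklogTallySketch {

    // Local copies of the enum values that appear in the patch; not the real Pulsar types.
    enum ScanOutcome { COMPLETED, ABORTED, USER_INTERRUPTED }
    enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

    final AtomicLong entries = new AtomicLong();
    final AtomicLong messages = new AtomicLong();
    final AtomicLong acceptedEntries = new AtomicLong();
    final AtomicLong rejectedEntries = new AtomicLong();
    final AtomicLong rescheduledEntries = new AtomicLong();
    final AtomicLong acceptedMessages = new AtomicLong();
    final AtomicLong rejectedMessages = new AtomicLong();
    final AtomicLong rescheduledMessages = new AtomicLong();

    /** Records one scanned entry; numMessages is the batch size, or 1 for a non-batched entry. */
    void record(FilterResult result, int numMessages) {
        if (result == null) {
            result = FilterResult.ACCEPT; // no filter verdict: treat as accepted
        }
        entries.incrementAndGet();
        messages.addAndGet(numMessages);
        switch (result) {
            case REJECT:
                rejectedEntries.incrementAndGet();
                rejectedMessages.addAndGet(numMessages);
                break;
            case RESCHEDULE:
                rescheduledEntries.incrementAndGet();
                rescheduledMessages.addAndGet(numMessages);
                break;
            default:
                acceptedEntries.incrementAndGet();
                acceptedMessages.addAndGet(numMessages);
        }
    }

    /** The API exposes only a boolean: any outcome other than COMPLETED counts as aborted. */
    static boolean isAborted(ScanOutcome outcome) {
        return outcome != ScanOutcome.COMPLETED;
    }

    public static void main(String[] args) {
        BacklogTallySketch tally = new BacklogTallySketch();
        tally.record(FilterResult.ACCEPT, 1);
        tally.record(FilterResult.REJECT, 10); // a batch of 10 messages in one entry
        System.out.println(tally.entries.get() + " entries, " + tally.messages.get() + " messages");
        System.out.println("aborted=" + isAborted(ScanOutcome.ABORTED));
    }
}
```

Collapsing the outcome to a boolean keeps the API stable when new stop reasons are added, at the cost of pushing the detail into the broker logs. That is the trade-off described in the comment.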
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1570,6 +1590,48 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
             });
     }
 
+    private void internalAnaliseSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                          String subName,
+                                                                          boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                            Subscription sub = topic.getSubscription(subName);
+                            if (sub == null) {
+                                throw new RestException(Status.NOT_FOUND,
+                                        getSubNotFoundErrorMessage(topicName.toString(), subName));
+                            }
+                            return sub.analiseBacklog();
+                        })
+                .thenAccept((AnaliseBacklogResult rawResult) -> {
+
+                        AnaliseSubscriptionBacklogResult result = new AnaliseSubscriptionBacklogResult();
+                        result.setEntries(rawResult.getEntries());
+                        result.setMessages(rawResult.getMessages());
+
+                        result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
+                        result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
+                        result.setFilterRescheduledEntries(rawResult.getFilterRescheduledEntries());
+
+                        result.setFilterAcceptedMessages(rawResult.getFilterAcceptedMessages());
+                        result.setFilterRejectedMessages(rawResult.getFilterRejectedMessages());
+                        result.setFilterRescheduledMessages(rawResult.getFilterRescheduledMessages());
+                        result.setAborted(rawResult.getScanOutcome() != ScanOutcome.COMPLETED);
+                        log.info("[{}] analiseBacklog topic {} subscription {} result {}", clientAppId(), subName,

Review Comment:
   The time is already reported in the HTTP log, and we are reporting it in other loggers as well.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2434,6 +2496,34 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse,
         });
     }
 
+    protected void internalAnaliseSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
+                                                      boolean authoritative) {
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+                    authoritative, false).partitions > 0) {
+                throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        "Get message ID by timestamp on a partitioned topic is not allowed, "
+                                + "please try do it on specific topic partition");
+            }
+            internalAnaliseSubscriptionBacklogForNonPartitionedTopic(asyncResponse, subName, authoritative);
+        }).exceptionally(ex -> {
+            // If the exception is not redirect exception we need to log it.
+            if (!isRedirectException(ex)) {
+                log.error("[{}] Failed to update subscription {} from topic {}",

Review Comment:
   Updated, thanks.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923017051


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -489,6 +489,7 @@ public SubType getType() {
 
     @Override
     public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
+        log.info("read {}", entries.size(), new Exception("xxx").fillInStackTrace());

Review Comment:
   What is the purpose of this log?





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16545: Issue 7623: Add API to analyse a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r920825256


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java:
##########
@@ -512,6 +522,95 @@ public String getTypeString() {
         return "Null";
     }
 
+    @Override
+    public CompletableFuture<AnaliseBacklogResult> analiseBacklog() {
+
+        long start = System.currentTimeMillis();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] Starting to analise backlog", topicName, subName);
+        }
+
+        AtomicLong entries = new AtomicLong();
+        AtomicLong accepted = new AtomicLong();
+        AtomicLong rejected = new AtomicLong();
+        AtomicLong rescheduled = new AtomicLong();
+        AtomicLong messages = new AtomicLong();
+        AtomicLong acceptedMessages = new AtomicLong();
+        AtomicLong rejectedMessages = new AtomicLong();
+        AtomicLong rescheduledMessages = new AtomicLong();
+
+        Position currentPosition = cursor.getMarkDeletedPosition();
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] currentPosition {}",
+                    topicName, subName, currentPosition);
+        }
+        final AbstractBaseDispatcher abstractBaseDispatcher = dispatcher != null
+                ? (AbstractBaseDispatcher) dispatcher : new DummyDispatcherForFilters();
+        // we put some hard limits on the scan, in order to prevent denial of services
+        ServiceConfiguration configuration = topic.getBrokerService().getPulsar().getConfiguration();
+        long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
+        long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
+        return cursor.scan(new Predicate<Entry>() {
+            @Override
+            public boolean apply(Entry entry) {
+                if (log.isDebugEnabled()) {
+                    log.debug("found {}", entry);
+                }
+
+                ByteBuf metadataAndPayload = entry.getDataBuffer();
+                MessageMetadata messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+                int numMessages = 1;
+                if (messageMetadata.hasNumMessagesInBatch()) {
+                    numMessages = messageMetadata.getNumMessagesInBatch();
+                }
+                EntryFilter.FilterResult filterResult = abstractBaseDispatcher
+                        .runFiltersForEntry(entry, messageMetadata, null);
+
+                if (filterResult == null) {
+                    filterResult = EntryFilter.FilterResult.ACCEPT;
+                }
+                switch (filterResult) {
+                    case REJECT:
+                        rejected.incrementAndGet();
+                        rejectedMessages.addAndGet(numMessages);
+                        break;
+                    case RESCHEDULE:
+                        rescheduled.incrementAndGet();
+                        rescheduledMessages.addAndGet(numMessages);
+                        break;
+                    default:
+                        accepted.incrementAndGet();
+                        acceptedMessages.addAndGet(numMessages);
+                        break;
+                }
+                entries.incrementAndGet();
+                messages.addAndGet(numMessages);
+
+                return true;
+            }
+        }, maxEntries, timeOutMs).thenApply((ScanOutcome outcome) -> {
+            long end = System.currentTimeMillis();
+            AnaliseBacklogResult result = new AnaliseBacklogResult();
+            result.setEntries(entries.get());
+            result.setMessages(messages.get());
+            result.setFilterAcceptedEntries(accepted.get());
+            result.setFilterAcceptedMessages(acceptedMessages.get());
+            result.setFilterRejectedEntries(rejected.get());
+            result.setFilterRejectedMessages(rejectedMessages.get());
+            result.setFilterRescheduledEntries(rescheduled.get());
+            result.setFilterRescheduledMessages(rescheduledMessages.get());
+            // sometimes we abort the execution due to a timeout or
+            // when we reach a maximum number of entries
+            result.setScanOutcome(outcome);
+            log.info(
+                    "[{}][{}] scan took {} ms - {}",
+                    topicName, subName, end, result);

Review Comment:
   fixed
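   The predicate in `analiseBacklog()` above counts each entry once and each of its messages `numMessages` times, grouped by filter result. A missing result is treated as ACCEPT. Below is a minimal single-threaded sketch of that tallying. It uses hypothetical stand-ins (`FilterResult`, `ScannedEntry`) instead of the real `EntryFilter` and `Entry` types. The diff uses `AtomicLong` because the callback may run on another thread; this sketch uses plain `long` fields. The diff's final `log.info` call passes `end`, a raw timestamp, where the elapsed time `end - start` is presumably intended, so the sketch logs the elapsed time instead.

   ```java
   import java.util.List;
   import java.util.concurrent.TimeUnit;

   public class BacklogTally {
       // Hypothetical stand-in for EntryFilter.FilterResult.
       enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

       // Hypothetical entry: numMessagesInBatch is null when the metadata has no batch size,
       // filterResult is null when no filter produced a decision.
       record ScannedEntry(Integer numMessagesInBatch, FilterResult filterResult) {}

       long entries, messages;
       long acceptedEntries, acceptedMessages;
       long rejectedEntries, rejectedMessages;
       long rescheduledEntries, rescheduledMessages;

       // Mirrors the predicate: one entry, numMessages messages, bucketed by filter result.
       boolean apply(ScannedEntry entry) {
           int numMessages = entry.numMessagesInBatch() != null ? entry.numMessagesInBatch() : 1;
           FilterResult result = entry.filterResult() == null ? FilterResult.ACCEPT : entry.filterResult();
           switch (result) {
               case REJECT -> { rejectedEntries++; rejectedMessages += numMessages; }
               case RESCHEDULE -> { rescheduledEntries++; rescheduledMessages += numMessages; }
               default -> { acceptedEntries++; acceptedMessages += numMessages; }
           }
           entries++;
           messages += numMessages;
           return true; // keep scanning
       }

       static BacklogTally scan(List<ScannedEntry> backlog) {
           long startNanos = System.nanoTime();
           BacklogTally tally = new BacklogTally();
           for (ScannedEntry e : backlog) {
               if (!tally.apply(e)) {
                   break;
               }
           }
           // Log the elapsed time (end - start), not the raw end timestamp.
           long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
           System.out.printf("scan took %d ms - %d entries, %d messages%n",
                   elapsedMs, tally.entries, tally.messages);
           return tally;
       }
   }
   ```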



[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r923489879


##########
conf/broker.conf:
##########
@@ -232,6 +232,12 @@ subscriptionKeySharedUseConsistentHashing=true
 # The higher the number, the more equal the assignment of keys to consumers
 subscriptionKeySharedConsistentHashingReplicaPoints=100
 
+# Maximum time in ms for a Analise backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000
+
+# Maximum number of entries to be read within a Analise backlog operation
+subscriptionBacklogScanMaxEntries=10000

Review Comment:
   Should we make it dynamic, or move it into the namespace/topic policies?
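   The two settings bound a scan by entry count (`subscriptionBacklogScanMaxEntries`, 10000 by default in this diff) and by wall-clock time (`subscriptionBacklogScanMaxTimeMs`, 120000 ms). The sketch below shows one way such bounds could be enforced over an entry iterator. It assumes the scan stops early when the predicate returns false. The `Outcome` enum is hypothetical and is not the `ScanOutcome` type from the PR. The sketch also uses `java.util.function.Predicate` rather than the Guava `Predicate` used in the diff. The clock is injected so that the timeout path can be tested deterministically; in a broker it would be something like `System::currentTimeMillis`.

   ```java
   import java.util.Iterator;
   import java.util.function.LongSupplier;
   import java.util.function.Predicate;

   public class BoundedScan {
       // Hypothetical outcome type; the PR's ScanOutcome may have different values.
       enum Outcome { COMPLETED, ABORTED_MAX_ENTRIES, ABORTED_TIMEOUT, STOPPED_BY_PREDICATE }

       // Reads entries until the iterator is exhausted, maxEntries have been read,
       // the time budget is spent, or the predicate asks to stop.
       static <T> Outcome scan(Iterator<T> entries, Predicate<T> predicate,
                               long maxEntries, long timeoutMs, LongSupplier clockMs) {
           long deadline = clockMs.getAsLong() + timeoutMs;
           long read = 0;
           while (entries.hasNext()) {
               if (read >= maxEntries) {
                   return Outcome.ABORTED_MAX_ENTRIES;
               }
               if (clockMs.getAsLong() > deadline) {
                   return Outcome.ABORTED_TIMEOUT;
               }
               T entry = entries.next();
               read++;
               if (!predicate.test(entry)) {
                   return Outcome.STOPPED_BY_PREDICATE;
               }
           }
           return Outcome.COMPLETED;
       }
   }
   ```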



[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r924101118


##########
conf/broker.conf:
##########
@@ -232,6 +232,12 @@ subscriptionKeySharedUseConsistentHashing=true
 # The higher the number, the more equal the assignment of keys to consumers
 subscriptionKeySharedConsistentHashingReplicaPoints=100
 
+# Maximum time in ms for a Analise backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000
+
+# Maximum number of entries to be read within a Analise backlog operation
+subscriptionBacklogScanMaxEntries=10000

Review Comment:
   Got it.



[GitHub] [pulsar] mattisonchao commented on pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#issuecomment-1187601209

   Another question: do we want to limit the number of concurrent scans, or add a cache for the results? (This would guard against misuse or malicious use.)
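   One way to address both concerns is sketched below. A `Semaphore` caps the number of concurrent scans. Requests for a subscription that is already being scanned share the in-flight future instead of starting a second scan. `ScanGuard` is a hypothetical helper and is not part of the PR. Caching completed results (for example, with a TTL) would be a further step that is not shown here.

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.Semaphore;
   import java.util.function.Supplier;

   public class ScanGuard<R> {
       private final Semaphore permits;
       // In-flight scans keyed by topic/subscription, so duplicate requests are coalesced.
       private final Map<String, CompletableFuture<R>> inFlight = new ConcurrentHashMap<>();

       public ScanGuard(int maxConcurrentScans) {
           this.permits = new Semaphore(maxConcurrentScans);
       }

       public CompletableFuture<R> analyze(String topic, String subscription,
                                           Supplier<CompletableFuture<R>> scan) {
           String key = topic + "/" + subscription;
           CompletableFuture<R> fresh = new CompletableFuture<>();
           CompletableFuture<R> existing = inFlight.putIfAbsent(key, fresh);
           if (existing != null) {
               return existing; // a scan of this subscription is already running
           }
           if (!permits.tryAcquire()) {
               inFlight.remove(key, fresh);
               fresh.completeExceptionally(new IllegalStateException("too many concurrent backlog scans"));
               return fresh;
           }
           CompletableFuture<R> running;
           try {
               running = scan.get();
           } catch (RuntimeException e) {
               running = CompletableFuture.failedFuture(e);
           }
           running.whenComplete((result, error) -> {
               // Free the permit and the slot before completing, so later calls start a new scan.
               permits.release();
               inFlight.remove(key, fresh);
               if (error != null) {
                   fresh.completeExceptionally(error);
               } else {
                   fresh.complete(result);
               }
           });
           return fresh;
       }
   }
   ```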


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r930073220


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1569,6 +1590,63 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
             });
     }
 
+    private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
+                                                                          String subName,
+                                                                          Optional<Position> position,
+                                                                          boolean authoritative) {
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenRun(() -> validateTopicOperation(topicName, TopicOperation.CONSUME))

Review Comment:
   We have to use `validateTopicOperationAsync` here.
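   With `thenRun(() -> validateTopicOperation(...))`, a synchronous check runs inside the callback, where it may block the thread that completes the ownership future. Chaining the async variant with `thenCompose` avoids this and also propagates a failure to the returned future. Below is a minimal sketch with hypothetical stand-ins for the two validation methods. The authorization rule in `validateOperationAsync` is made up purely for illustration.

   ```java
   import java.util.concurrent.CompletableFuture;

   public class AsyncValidationChain {
       // Hypothetical stand-in for validateTopicOwnershipAsync.
       static CompletableFuture<Void> validateOwnershipAsync(String topic) {
           return CompletableFuture.completedFuture(null);
       }

       // Hypothetical stand-in for validateTopicOperationAsync; the rule is illustrative only.
       static CompletableFuture<Void> validateOperationAsync(String topic, String operation) {
           if ("CONSUME".equals(operation) && topic.startsWith("restricted/")) {
               return CompletableFuture.failedFuture(
                       new SecurityException("not authorized to " + operation + " on " + topic));
           }
           return CompletableFuture.completedFuture(null);
       }

       // thenCompose chains the second async check without blocking a thread,
       // and a failed check completes the returned future exceptionally.
       static CompletableFuture<Void> validate(String topic) {
           return validateOwnershipAsync(topic)
                   .thenCompose(__ -> validateOperationAsync(topic, "CONSUME"));
       }
   }
   ```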



[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16545: PIP-187: Add API to analyze a subscription backlog and provide a accurate value

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16545:
URL: https://github.com/apache/pulsar/pull/16545#discussion_r932154902


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -17,7 +17,24 @@
  * under the License.
  */
 package org.apache.pulsar.broker.admin.impl;
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

Review Comment:
   Duplicated license header.



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -1832,6 +1835,138 @@ void testFindNewestMatching() throws Exception {
                 c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
     }
 
+    @Test(timeOut = 20000)
+    void testScanSingleEntry() throws Exception {
+        testScan(10,1);
+    }
+
+    @Test(timeOut = 30000)
+    void testScanBatchesWithSomeRemainder() throws Exception {
+        testScan(10,3);
+    }
+
+    @Test(timeOut = 30000)
+    void testScanBatches() throws Exception {
+        testScan(10,5);
+    }
+
+    @Test(timeOut = 30000)
+    void testScanBatchWholeLedger() throws Exception {
+        testScan(10,1000);
+    }
+
+    @Test(timeOut = 1000)
+    void testScanBatchEmptyLedger() throws Exception {
+        testScan(0,10);
+    }

Review Comment:
   It would be better to use a data provider instead of separate test methods.
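   Below is a sketch of the five cases above expressed as rows of a single table. It assumes that the second argument of `testScan` is the read batch size, as the test names suggest. In TestNG, the array would be returned by a method annotated `@DataProvider(name = "scanCases")` and consumed by `@Test(dataProvider = "scanCases")`. The annotations are left in a comment here so that the sketch compiles without TestNG. `expectedReads` is a hypothetical helper. One trade-off is that the separate methods use different `timeOut` values (1000 to 30000 ms), whereas a single parameterized `@Test` shares one timeout across all rows.

   ```java
   public class ScanTestCases {
       // With TestNG:
       //   @DataProvider(name = "scanCases") Object[][] scanCases() { ... }
       //   @Test(dataProvider = "scanCases") void testScan(int numEntries, int batchSize) { ... }
       static Object[][] scanCases() {
           return new Object[][] {
                   {10, 1},    // single entry per read
                   {10, 3},    // batches with a remainder
                   {10, 5},    // exact batches
                   {10, 1000}, // whole ledger in one read
                   {0, 10},    // empty ledger
           };
       }

       // Hypothetical helper: number of reads needed to scan numEntries at batchSize (ceiling division).
       static int expectedReads(int numEntries, int batchSize) {
           return (numEntries + batchSize - 1) / batchSize;
       }
   }
   ```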



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntriesCallback {
+    private final ManagedCursorImpl cursor;
+    private final ManagedLedgerImpl ledger;
+    private final PositionImpl startPosition;

Review Comment:
   Looks like we can remove this one.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2382,6 +2460,35 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse,
         });
     }
 
+    protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
+                                                      Optional<Position> position,
+                                                      boolean authoritative) {
+        CompletableFuture<Void> future;
+        if (topicName.isGlobal()) {
+            future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            future = CompletableFuture.completedFuture(null);
+        }
+
+        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+            if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
+                    authoritative, false).partitions > 0) {
+                throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        "Analyze backlog on a partitioned topic is not allowed, "
+                                + "please try do it on specific topic partition");
+            }

Review Comment:
   Should we check whether the topic is partitioned first? If it is a partitioned topic, the ownership check is redundant.
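   Below is a sketch of the suggested ordering. It resolves the partition count first and fails fast on a partitioned topic, so the ownership lookup runs only when the request can succeed. The suppliers are hypothetical stand-ins for the partitioned-metadata lookup and `validateTopicOwnershipAsync` in the diff. As in the diff, a name that already refers to a single partition skips the metadata lookup.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Supplier;

   public class PartitionedFirst {
       static final class NotAllowedException extends RuntimeException {
           NotAllowedException(String message) {
               super(message);
           }
       }

       static CompletableFuture<Void> validate(boolean isPartitionName,
                                               Supplier<CompletableFuture<Integer>> partitionCount,
                                               Supplier<CompletableFuture<Void>> ownershipCheck) {
           // A single partition (name ending in -partition-N) cannot itself be partitioned.
           CompletableFuture<Integer> partitions = isPartitionName
                   ? CompletableFuture.completedFuture(0)
                   : partitionCount.get();
           return partitions.thenCompose(count -> {
               if (count > 0) {
                   // Reject before doing the now-redundant ownership lookup.
                   throw new NotAllowedException("Analyze backlog on a partitioned topic is not allowed, "
                           + "please run it on a specific topic partition");
               }
               return ownershipCheck.get();
           });
       }
   }
   ```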



##########
conf/broker.conf:
##########
@@ -235,6 +235,12 @@ subscriptionKeySharedUseConsistentHashing=true
 # The higher the number, the more equal the assignment of keys to consumers
 subscriptionKeySharedConsistentHashingReplicaPoints=100
 
+# Maximum time in ms for a Analise backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000

Review Comment:
   ```suggestion
   subscriptionBacklogScanMaxTimeInMillis=120000
   ```
   This keeps the name consistent with other configurations in broker.conf.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1569,6 +1590,63 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn
             });
     }
 
+    private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,

Review Comment:
   It would be better to return `CompletableFuture<AnalyzeSubscriptionBacklogResult>` here instead of passing the AsyncResponse into the implementation. That way the returned type is clear without reading the implementation details.
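   Below is a minimal sketch of that split, using hypothetical names. The internal method returns a typed future and has no knowledge of HTTP. The endpoint layer is the only code that completes the response. A `Consumer<Object>` stands in for JAX-RS `AsyncResponse.resume(...)`, which accepts either a response entity or a `Throwable`. The result values are placeholders.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Consumer;

   public class FutureReturningInternal {
       // Hypothetical stand-in for AnalyzeSubscriptionBacklogResult.
       record AnalyzeResult(long entries, long messages) {}

       // Internal method: the signature documents what it produces and it never touches the response.
       static CompletableFuture<AnalyzeResult> internalAnalyze(String subName) {
           if (subName == null || subName.isEmpty()) {
               return CompletableFuture.failedFuture(new IllegalArgumentException("subscription name is required"));
           }
           return CompletableFuture.completedFuture(new AnalyzeResult(3, 12)); // placeholder values
       }

       // Endpoint layer: the only place that resumes the (stand-in) AsyncResponse.
       static void analyzeEndpoint(String subName, Consumer<Object> resume) {
           internalAnalyze(subName)
                   .whenComplete((result, error) -> resume.accept(error != null ? error : result));
       }
   }
   ```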



##########
conf/broker.conf:
##########
@@ -232,6 +232,12 @@ subscriptionKeySharedUseConsistentHashing=true
 # The higher the number, the more equal the assignment of keys to consumers
 subscriptionKeySharedConsistentHashingReplicaPoints=100
 
+# Maximum time in ms for a Analise backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000
+
+# Maximum number of entries to be read within a Analise backlog operation
+subscriptionBacklogScanMaxEntries=10000

Review Comment:
   I think we can support these two options as parameters of the HTTP endpoint, instead of as namespace/topic policies.
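   Below is a sketch of per-request limits passed as optional HTTP parameters, falling back to the broker.conf defaults from the diff. The capping rule is an assumption added here: a request may lower the limits but never raise them above the broker defaults. This preserves the denial-of-service protection that the config values were introduced for. `ScanLimits` and `resolve` are hypothetical.

   ```java
   import java.util.Optional;

   public class ScanLimits {
       // Broker-level defaults, mirroring the broker.conf values in the diff.
       static final long DEFAULT_MAX_ENTRIES = 10_000;
       static final long DEFAULT_MAX_TIME_MS = 120_000;

       record Limits(long maxEntries, long maxTimeMs) {}

       // Use the request value when it is positive, capped at the broker default;
       // otherwise fall back to the default.
       static Limits resolve(Optional<Long> requestedMaxEntries, Optional<Long> requestedMaxTimeMs) {
           long entries = requestedMaxEntries
                   .filter(v -> v > 0)
                   .map(v -> Math.min(v, DEFAULT_MAX_ENTRIES))
                   .orElse(DEFAULT_MAX_ENTRIES);
           long timeMs = requestedMaxTimeMs
                   .filter(v -> v > 0)
                   .map(v -> Math.min(v, DEFAULT_MAX_TIME_MS))
                   .orElse(DEFAULT_MAX_TIME_MS);
           return new Limits(entries, timeMs);
       }
   }
   ```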


