You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/30 01:41:43 UTC
[pulsar] branch master updated: PIP-187: Add API to analyze a subscription backlog and provide a accurate value (#16545)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1af9695ddf5 PIP-187: Add API to analyze a subscription backlog and provide a accurate value (#16545)
1af9695ddf5 is described below
commit 1af9695ddf54927fcbc0c3db5ee2647fb2d820e3
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Sat Jul 30 03:41:35 2022 +0200
PIP-187: Add API to analyze a subscription backlog and provide a accurate value (#16545)
---
conf/broker.conf | 6 +
.../apache/bookkeeper/mledger/AsyncCallbacks.java | 6 +
.../apache/bookkeeper/mledger/ManagedCursor.java | 18 ++
.../org/apache/bookkeeper/mledger/ScanOutcome.java | 37 ++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 25 +++
.../bookkeeper/mledger/impl/MetaStoreImpl.java | 1 -
.../org/apache/bookkeeper/mledger/impl/OpScan.java | 137 +++++++++++++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 122 +++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 10 ++
.../broker/admin/impl/PersistentTopicsBase.java | 101 +++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 51 +++++-
.../broker/service/AbstractBaseDispatcher.java | 73 ++------
.../broker/service/AnalyzeBacklogResult.java | 46 +++++
.../pulsar/broker/service/EntryFilterSupport.java | 85 +++++++++
.../apache/pulsar/broker/service/Subscription.java | 3 +
.../nonpersistent/NonPersistentSubscription.java | 10 ++
.../service/persistent/PersistentSubscription.java | 114 ++++++++++++
.../admin/AnalyzeBacklogSubscriptionTest.java | 194 +++++++++++++++++++++
.../broker/admin/CreateSubscriptionTest.java | 4 +
.../broker/service/plugin/EntryFilterTest.java | 3 +-
.../broker/service/plugin/FilterEntryTest.java | 58 +++++-
.../org/apache/pulsar/client/admin/Topics.java | 43 +++++
.../stats/AnalyzeSubscriptionBacklogResult.java | 42 +++++
.../pulsar/client/admin/internal/BaseResource.java | 9 +
.../pulsar/client/admin/internal/TopicsImpl.java | 42 +++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +
.../org/apache/pulsar/admin/cli/CmdTopics.java | 29 +++
27 files changed, 1205 insertions(+), 68 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 41809f87116..7a0e32c95b3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -235,6 +235,12 @@ subscriptionKeySharedUseConsistentHashing=true
# The higher the number, the more equal the assignment of keys to consumers
subscriptionKeySharedConsistentHashingReplicaPoints=100
+# Maximum time in ms for a Analise backlog operation to complete
+subscriptionBacklogScanMaxTimeMs=120000
+
+# Maximum number of entries to be read within a Analise backlog operation
+subscriptionBacklogScanMaxEntries=10000
+
# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index e5fd0ddd1dd..395da52b2af 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -124,6 +124,12 @@ public interface AsyncCallbacks {
void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx);
}
+ interface ScanCallback {
+ void scanComplete(Position position, ScanOutcome scanOutcome, Object ctx);
+
+ void scanFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx);
+ }
+
interface ResetCursorCallback {
void resetComplete(Object ctx);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 46ca0f14003..3f6852e4085 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -22,6 +22,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Range;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
@@ -502,6 +503,23 @@ public interface ManagedCursor {
*/
Position findNewestMatching(Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException;
+ /**
+ * Scan the cursor from the current position up to the end.
+ * Please note that this is an expensive operation
+ * @param startingPosition the position to start from, if not provided the scan will start from
+ * the lastDeleteMarkPosition
+ * @param condition a condition to continue the scan, the condition can access the entry
+ * @param batchSize number of entries to process at each read
+ * @param maxEntries maximum number of entries to scan
+ * @param timeOutMs maximum time to spend on this operation
+ * @throws InterruptedException
+ * @throws ManagedLedgerException
+ */
+ default CompletableFuture<ScanOutcome> scan(Optional<Position> startingPosition,
+ Predicate<Entry> condition,
+ int batchSize, long maxEntries, long timeOutMs) {
+ return CompletableFuture.failedFuture(new UnsupportedOperationException());
+ }
/**
* Find the newest entry that matches the given predicate.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ScanOutcome.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ScanOutcome.java
new file mode 100644
index 00000000000..c7a4cff474c
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ScanOutcome.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+/**
+ * Outcome of a Scan operation.
+ */
+public enum ScanOutcome {
+ /**
+ * The scan run fully.
+ */
+ COMPLETED,
+ /**
+ * The scan was aborted by the system (timed out or too many entries...).
+ */
+ ABORTED,
+ /**
+ * The provided user condition aborted the scan.
+ */
+ USER_INTERRUPTED
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 83ee356e5c3..4c688b2d9be 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -83,6 +84,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedE
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -1037,6 +1039,29 @@ public class ManagedCursorImpl implements ManagedCursor {
return findNewestMatching(FindPositionConstraint.SearchActiveEntries, condition);
}
+ @Override
+ public CompletableFuture<ScanOutcome> scan(Optional<Position> position,
+ Predicate<Entry> condition,
+ int batchSize, long maxEntries, long timeOutMs) {
+ PositionImpl startPosition = (PositionImpl) position.orElseGet(
+ () -> ledger.getNextValidPosition(markDeletePosition));
+ CompletableFuture<ScanOutcome> future = new CompletableFuture<>();
+ OpScan op = new OpScan(this, batchSize, startPosition, condition, new ScanCallback() {
+ @Override
+ public void scanComplete(Position position, ScanOutcome scanOutcome, Object ctx) {
+ future.complete(scanOutcome);
+ }
+
+ @Override
+ public void scanFailed(ManagedLedgerException exception,
+ Optional<Position> failedReadPosition, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null, maxEntries, timeOutMs);
+ op.find();
+ return future;
+ }
+
@Override
public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition)
throws InterruptedException, ManagedLedgerException {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 3d1e565ac34..2a47cfdc537 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -235,7 +235,6 @@ public class MetaStoreImpl implements MetaStore {
log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
}
}
-
store.put(path, content, Optional.of(expectedVersion))
.thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor
.chooseThread(ledgerName))
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
new file mode 100644
index 00000000000..e65e418a5ec
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.base.Predicate;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
+
+@Slf4j
+class OpScan implements ReadEntriesCallback {
+ private final ManagedCursorImpl cursor;
+ private final ManagedLedgerImpl ledger;
+ private final ScanCallback callback;
+ private final Predicate<Entry> condition;
+ private final Object ctx;
+ private final AtomicLong remainingEntries = new AtomicLong();
+ private final long timeOutMs;
+ private final long startTime = System.currentTimeMillis();
+ private final int batchSize;
+
+ PositionImpl searchPosition;
+ Position lastSeenPosition = null;
+
+ public OpScan(ManagedCursorImpl cursor, int batchSize,
+ PositionImpl startPosition, Predicate<Entry> condition,
+ ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) {
+ this.batchSize = batchSize;
+ if (batchSize <= 0) {
+ throw new IllegalArgumentException("batchSize " + batchSize);
+ }
+ this.cursor = Objects.requireNonNull(cursor);
+ this.ledger = cursor.ledger;
+ this.callback = callback;
+ this.condition = condition;
+ this.ctx = ctx;
+ this.searchPosition = startPosition;
+ this.remainingEntries.set(maxEntries);
+ this.timeOutMs = timeOutMs;
+ }
+
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ try {
+ Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition();
+ lastSeenPosition = lastPositionForBatch;
+ // filter out the entry if it has been already deleted
+ // filterReadEntries will call entry.release if the entry is filtered out
+ List<Entry> entriesFiltered = this.cursor.filterReadEntries(entries);
+ int skippedEntries = entries.size() - entriesFiltered.size();
+ remainingEntries.addAndGet(-skippedEntries);
+ if (!entriesFiltered.isEmpty()) {
+ for (Entry entry : entriesFiltered) {
+ if (remainingEntries.decrementAndGet() <= 0) {
+ log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+ return;
+ }
+ if (!condition.apply(entry)) {
+ log.warn("[{}] Scan abort due to user code", OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+ return;
+ }
+ }
+ }
+ searchPosition = ledger.getPositionAfterN((PositionImpl) lastPositionForBatch, 1,
+ PositionBound.startExcluded);
+ if (log.isDebugEnabled()) {
+ log.debug("readEntryComplete {} at {} next is {}", lastPositionForBatch, searchPosition);
+ }
+
+ if (searchPosition.compareTo((PositionImpl) lastPositionForBatch) == 0) {
+ // we have reached the end of the ledger, as we are not doing progress
+ callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
+ return;
+ }
+ } catch (Throwable t) {
+ log.error("Unhandled error", t);
+ callback.scanFailed(ManagedLedgerException.getManagedLedgerException(t),
+ Optional.ofNullable(lastSeenPosition), OpScan.this.ctx);
+ return;
+ } finally {
+ entries.forEach(Entry::release);
+ }
+ find();
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+ callback.scanFailed(exception, Optional.ofNullable(searchPosition), OpScan.this.ctx);
+ }
+
+ public void find() {
+ if (remainingEntries.get() <= 0) {
+ log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+ return;
+ }
+ if (System.currentTimeMillis() - startTime > timeOutMs) {
+ log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+ return;
+ }
+ if (cursor.hasMoreEntries(searchPosition)) {
+ OpReadEntry opReadEntry = OpReadEntry.create(cursor, searchPosition, batchSize,
+ this, OpScan.this.ctx, null);
+ ledger.asyncReadEntries(opReadEntry);
+ } else {
+ callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
+ }
+ }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index b5d677f8b04..4fb51885b9e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,6 +51,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
@@ -84,6 +86,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -1832,6 +1835,125 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
}
+ @DataProvider(name = "testScanValues")
+ public static Object[][] testScanValues() {
+ return new Object[][] {
+ { 10, 1 }, // single entry
+ { 10, 3 }, // batches with remainder
+ { 10, 5 }, // batches, half
+ { 10, 1000 }, // big batch size, scan whole ledger in one round
+ { 0, 10 } // empty ledger
+ };
+ }
+
+ @Test(dataProvider = "testScanValues", timeOut = 30000)
+ void testScan(int numEntries, int batchSize) throws Exception {
+ ManagedLedger ledger = factory.open("my_test_ledger_scan_" + numEntries
+ + "_" +batchSize);
+
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < numEntries; i++) {
+ positions.add(ledger.addEntry(("a" + i).getBytes(Encoding)));
+ }
+
+ List<String> contents = new CopyOnWriteArrayList<>();
+
+ assertEquals(ScanOutcome.COMPLETED, c1.scan(Optional.empty(), (entry -> {
+ contents.add(new String(entry.getData(), StandardCharsets.UTF_8));
+ return true;
+ }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get());
+
+ for (int i = 0; i < numEntries; i++) {
+ assertEquals(contents.get(i), ("a" + i));
+ }
+ assertEquals(contents.size(), numEntries);
+
+ if (numEntries <= 0) {
+ return;
+ }
+
+ List<String> contentsFromHalf = new CopyOnWriteArrayList<>();
+ int half = numEntries / 2;
+ Position halfPosition = positions.get(half);
+ assertEquals(ScanOutcome.COMPLETED, c1.scan(Optional.of(halfPosition), (entry -> {
+ contentsFromHalf.add(new String(entry.getData(), StandardCharsets.UTF_8));
+ return true;
+ }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get());
+
+ for (int i = half; i < numEntries; i++) {
+ assertEquals(contentsFromHalf.get(i - half), ("a" + i));
+ }
+ assertEquals(contentsFromHalf.size(), numEntries - half);
+
+ assertEquals(ScanOutcome.USER_INTERRUPTED, c1.scan(Optional.empty(), (entry -> {
+ return false;
+ }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get());
+
+ // max entries
+ assertEquals(ScanOutcome.ABORTED, c1.scan(Optional.empty(), (entry -> {
+ return true;
+ }), batchSize, 1, Long.MAX_VALUE).get());
+
+ // timeout
+ // please note that the timeout is verified
+ // between the reads
+ // so with a big batchSize this test would take too much
+ // we are skipping this check if batchSize is too big
+ if (batchSize <= 5) {
+ assertEquals(ScanOutcome.ABORTED, c1.scan(Optional.empty(), (entry -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ }
+ return true;
+ }), batchSize, Long.MAX_VALUE, 1000).get());
+ }
+ // user code exception
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ c1.scan(Optional.empty(), (entry -> {
+ throw new RuntimeException("dummy!");
+ }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).handle((___, err) -> {
+ error.set(err);
+ return null;
+ }).get();
+ assertTrue(error.get() instanceof ManagedLedgerException);
+ assertTrue(error.get().getCause() instanceof RuntimeException);
+ assertEquals(error.get().getCause().getMessage(), "dummy!");
+
+
+ // test deleted entries
+ positions.clear();
+ assertEquals(ScanOutcome.COMPLETED, c1.scan(Optional.empty(), (entry -> {
+ positions.add(entry.getPosition());
+ return true;
+ }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get());
+ assertEquals(numEntries, positions.size());
+
+ // delete one entry in the middle
+ c1.delete(positions.get(2));
+
+ List<Position> positionsAfterDelete = new ArrayList<>();
+ assertEquals(ScanOutcome.COMPLETED, c1.scan(Optional.empty(), (entry -> {
+ positionsAfterDelete.add(entry.getPosition());
+ return true;
+ }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get());
+ assertEquals(numEntries - 1, positionsAfterDelete.size());
+
+ // delete all the entries
+ for (Position p : positionsAfterDelete) {
+ c1.delete(p);
+ }
+
+ List<Position> positionsFinal = new ArrayList<>();
+ assertEquals(ScanOutcome.COMPLETED, c1.scan(Optional.empty(), (entry -> {
+ positionsFinal.add(entry.getPosition());
+ return true;
+ }), batchSize, Long.MAX_VALUE, Long.MAX_VALUE).get());
+ assertEquals(0,positionsFinal.size());
+
+ }
+
@Test(timeOut = 20000)
void testFindNewestMatchingOdd1() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a6bd9dde24f..4fb680cf8b4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -625,6 +625,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "How long to delay rewinding cursor and dispatching messages when active consumer is changed"
)
private int activeConsumerFailoverDelayTimeMillis = 1000;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
+ )
+ private long subscriptionBacklogScanMaxTimeMs = 1000 * 60 * 2L;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Maximum number of entries to process while scanning a subscription to calculate the accurate backlog"
+ )
+ private long subscriptionBacklogScanMaxEntries = 10_000;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "How long to delete inactive subscriptions from last consuming."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 84356d8ee22..6f48145b0ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -58,6 +58,8 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
@@ -68,6 +70,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
@@ -131,6 +134,7 @@ import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.policies.data.stats.PartitionedTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
@@ -1569,6 +1573,63 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
+ String subName,
+ Optional<Position> position,
+ boolean authoritative) {
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.CONSUME))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName));
+ }
+ return sub.analyzeBacklog(position);
+ })
+ .thenAccept((AnalyzeBacklogResult rawResult) -> {
+
+ AnalyzeSubscriptionBacklogResult result = new AnalyzeSubscriptionBacklogResult();
+
+ if (rawResult.getFirstPosition() != null) {
+ result.setFirstMessageId(
+ rawResult.getFirstPosition().getLedgerId()
+ + ":"
+ + rawResult.getFirstPosition().getEntryId());
+ }
+
+ if (rawResult.getLastPosition() != null) {
+ result.setLastMessageId(rawResult.getLastPosition().getLedgerId()
+ + ":"
+ + rawResult.getLastPosition().getEntryId());
+ }
+
+ result.setEntries(rawResult.getEntries());
+ result.setMessages(rawResult.getMessages());
+
+ result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
+ result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
+ result.setFilterRescheduledEntries(rawResult.getFilterRescheduledEntries());
+
+ result.setFilterAcceptedMessages(rawResult.getFilterAcceptedMessages());
+ result.setFilterRejectedMessages(rawResult.getFilterRejectedMessages());
+ result.setFilterRescheduledMessages(rawResult.getFilterRescheduledMessages());
+ result.setAborted(rawResult.getScanOutcome() != ScanOutcome.COMPLETED);
+ log.info("[{}] analyzeBacklog topic {} subscription {} result {}", clientAppId(), subName,
+ topicName, result);
+ asyncResponse.resume(result);
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ // If the exception is not redirect exception we need to log it.
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to analyze subscription backlog {} {}",
+ clientAppId(), topicName, subName, cause);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ return null;
+ });
+ }
private void internalUpdateSubscriptionPropertiesForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName, Map<String, String> subscriptionProperties,
boolean authoritative) {
@@ -2382,6 +2443,46 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
+ Optional<Position> position,
+ boolean authoritative) {
+ CompletableFuture<Void> future;
+ if (topicName.isGlobal()) {
+ future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ future = CompletableFuture.completedFuture(null);
+ }
+
+ future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> {
+ if (topicName.isPartitioned()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenAccept(metadata -> {
+ if (metadata.partitions > 0) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Analyze backlog on a partitioned topic is not allowed, "
+ + "please try do it on specific topic partition");
+ }
+ });
+ }
+ })
+ .thenAccept(__ -> {
+ internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(asyncResponse, subName,
+ position, authoritative);
+ })
+ .exceptionally(ex -> {
+ // If the exception is not redirect exception we need to log it.
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to analyze back log of subscription {} from topic {}",
+ clientAppId(), subName, topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName,
boolean authoritative) {
CompletableFuture<Void> future;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 28dc8015ff8..854adf98ac3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -45,6 +45,8 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
@@ -1612,7 +1614,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PUT
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
- @ApiOperation(value = "Replaces all the properties on the given subscription")
+ @ApiOperation(value = "Replace all the properties on the given subscription")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
@@ -1649,7 +1651,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@GET
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/properties")
- @ApiOperation(value = "Replaces all the properties on the given subscription")
+ @ApiOperation(value = "Return all the properties on the given subscription")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
@@ -1683,6 +1685,51 @@ public class PersistentTopics extends PersistentTopicsBase {
}
}
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
+ @ApiOperation(value = "Analyse a subscription, by scanning all the unprocessed messages")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ + "subscriber is not authorized to access this operation"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+ @ApiResponse(code = 405, message = "Method Not Allowed"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+ })
+ public void analyzeSubscriptionBacklog(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Subscription", required = true)
+ @PathParam("subName") String encodedSubName,
+ @ApiParam(name = "position", value = "messageId to start the analysis")
+ ResetCursorData position,
+ @ApiParam(value = "Is authentication required to perform this operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ try {
+ Optional<Position> positionImpl;
+ if (position != null) {
+ positionImpl = Optional.of(new PositionImpl(position.getLedgerId(),
+ position.getEntryId()));
+ } else {
+ positionImpl = Optional.empty();
+ }
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalAnalyzeSubscriptionBacklog(asyncResponse, decode(encodedSubName),
+ positionImpl, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
@ApiOperation(value = "Reset subscription to message position closest to given position.",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d9f36bf0643..de62df77e98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;
-import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
@@ -33,7 +32,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
@@ -41,8 +39,6 @@ import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
-import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
-import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -51,31 +47,15 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
@Slf4j
-public abstract class AbstractBaseDispatcher implements Dispatcher {
-
- protected final Subscription subscription;
+public abstract class AbstractBaseDispatcher extends EntryFilterSupport implements Dispatcher {
protected final ServiceConfiguration serviceConfig;
protected final boolean dispatchThrottlingOnBatchMessageEnabled;
- /**
- * Entry filters in Broker.
- * Not set to final, for the convenience of testing mock.
- */
- protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
- protected final FilterContext filterContext;
protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
- this.subscription = subscription;
+ super(subscription);
this.serviceConfig = serviceConfig;
this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
- if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic()
- .getBrokerService().getEntryFilters())) {
- this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
- this.filterContext = new FilterContext();
- } else {
- this.entryFilters = ImmutableList.of();
- this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
- }
}
@@ -106,7 +86,6 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
isReplayRead, consumer);
}
-
/**
* Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry.
*
@@ -134,20 +113,17 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
final int metadataIndex = i + startOffset;
final MessageMetadata msgMetadata = optMetadataArray.map(metadataArray -> metadataArray[metadataIndex])
.orElseGet(() -> Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1));
- if (CollectionUtils.isNotEmpty(entryFilters)) {
- fillContext(filterContext, msgMetadata, subscription, consumer);
- EntryFilter.FilterResult filterResult = getFilterResult(filterContext, entry, entryFilters);
- if (filterResult == EntryFilter.FilterResult.REJECT) {
- entriesToFiltered.add(entry.getPosition());
- entries.set(i, null);
- entry.release();
- continue;
- } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
- entriesToRedeliver.add((PositionImpl) entry.getPosition());
- entries.set(i, null);
- entry.release();
- continue;
- }
+ EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
+ if (filterResult == EntryFilter.FilterResult.REJECT) {
+ entriesToFiltered.add(entry.getPosition());
+ entries.set(i, null);
+ entry.release();
+ continue;
+ } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
+ entriesToRedeliver.add((PositionImpl) entry.getPosition());
+ entries.set(i, null);
+ entry.release();
+ continue;
}
if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
@@ -238,29 +214,6 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
}
}
- private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry,
- ImmutableList<EntryFilterWithClassLoader> entryFilters) {
- for (EntryFilter entryFilter : entryFilters) {
- EntryFilter.FilterResult filterResult =
- entryFilter.filterEntry(entry, filterContext);
- if (filterResult == null) {
- filterResult = EntryFilter.FilterResult.ACCEPT;
- }
- if (filterResult != EntryFilter.FilterResult.ACCEPT) {
- return filterResult;
- }
- }
- return EntryFilter.FilterResult.ACCEPT;
- }
-
- private void fillContext(FilterContext context, MessageMetadata msgMetadata,
- Subscription subscription, Consumer consumer) {
- context.reset();
- context.setMsgMetadata(msgMetadata);
- context.setSubscription(subscription);
- context.setConsumer(consumer);
- }
-
/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
new file mode 100644
index 00000000000..d2141215333
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Data;
+import lombok.ToString;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
+
+@Data
+@ToString
+public final class AnalyzeBacklogResult {
+
+ private long entries;
+ private long messages;
+
+ private long filterRejectedEntries;
+ private long filterAcceptedEntries;
+ private long filterRescheduledEntries;
+
+ private long filterRejectedMessages;
+ private long filterAcceptedMessages;
+ private long filterRescheduledMessages;
+
+ private ScanOutcome scanOutcome;
+
+ private Position firstPosition;
+ private Position lastPosition;
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
new file mode 100644
index 00000000000..b0c7385a28f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.FilterContext;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+public class EntryFilterSupport {
+
+ /**
+ * Entry filters in Broker.
+ * Not set to final, for the convenience of testing mock.
+ */
+ protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
+ protected final FilterContext filterContext;
+ protected final Subscription subscription;
+
+ public EntryFilterSupport(Subscription subscription) {
+ this.subscription = subscription;
+ if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic()
+ .getBrokerService().getEntryFilters())) {
+ this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
+ this.filterContext = new FilterContext();
+ } else {
+ this.entryFilters = ImmutableList.of();
+ this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
+ }
+ }
+
+ public EntryFilter.FilterResult runFiltersForEntry(Entry entry, MessageMetadata msgMetadata,
+ Consumer consumer) {
+ if (CollectionUtils.isNotEmpty(entryFilters)) {
+ fillContext(filterContext, msgMetadata, subscription, consumer);
+ return getFilterResult(filterContext, entry, entryFilters);
+ } else {
+ return EntryFilter.FilterResult.ACCEPT;
+ }
+ }
+
+ private void fillContext(FilterContext context, MessageMetadata msgMetadata,
+ Subscription subscription, Consumer consumer) {
+ context.reset();
+ context.setMsgMetadata(msgMetadata);
+ context.setSubscription(subscription);
+ context.setConsumer(consumer);
+ }
+
+
+ private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry,
+ ImmutableList<EntryFilterWithClassLoader> entryFilters) {
+ for (EntryFilter entryFilter : entryFilters) {
+ EntryFilter.FilterResult filterResult =
+ entryFilter.filterEntry(entry, filterContext);
+ if (filterResult == null) {
+ filterResult = EntryFilter.FilterResult.ACCEPT;
+ }
+ if (filterResult != EntryFilter.FilterResult.ACCEPT) {
+ return filterResult;
+ }
+ }
+ return EntryFilter.FilterResult.ACCEPT;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 49b906b7959..2736f407ebf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
@@ -111,6 +112,8 @@ public interface Subscription {
CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction, long lowWaterMark);
+ CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position);
+
default int getNumberOfSameAddressConsumers(final String clientAddress) {
int count = 0;
if (clientAddress != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index c3399bda4bd..3d0a313fef1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -22,6 +22,7 @@ import com.google.common.base.MoreObjects;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
@@ -29,6 +30,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractSubscription;
+import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -512,6 +514,14 @@ public class NonPersistentSubscription extends AbstractSubscription implements S
return completableFuture;
}
+ @Override
+ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {
+ CompletableFuture<AnalyzeBacklogResult> completableFuture = new CompletableFuture<>();
+ completableFuture.completeExceptionally(
+ new Exception("Unsupported operation analyzeBacklog for NonPersistentSubscription"));
+ return completableFuture;
+ }
+
private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class);
public long getLastActive() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 0dc38f9bc2e..22f94ee0545 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.service.persistent;
import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Predicate;
+import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -30,6 +32,8 @@ import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -44,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -52,6 +57,7 @@ import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractSubscription;
+import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -60,8 +66,10 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFence
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -70,6 +78,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
@@ -77,6 +86,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
@@ -512,6 +522,109 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
return "Null";
}
+ @Override
+ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {
+
+ long start = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Starting to analyze backlog", topicName, subName);
+ }
+
+ AtomicLong entries = new AtomicLong();
+ AtomicLong accepted = new AtomicLong();
+ AtomicLong rejected = new AtomicLong();
+ AtomicLong rescheduled = new AtomicLong();
+ AtomicLong messages = new AtomicLong();
+ AtomicLong acceptedMessages = new AtomicLong();
+ AtomicLong rejectedMessages = new AtomicLong();
+ AtomicLong rescheduledMessages = new AtomicLong();
+
+ Position currentPosition = cursor.getMarkDeletedPosition();
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] currentPosition {}",
+ topicName, subName, currentPosition);
+ }
+ final EntryFilterSupport entryFilterSupport = dispatcher != null
+ ? (EntryFilterSupport) dispatcher : new EntryFilterSupport(this);
+ // we put some hard limits on the scan, in order to prevent denial of services
+ ServiceConfiguration configuration = topic.getBrokerService().getPulsar().getConfiguration();
+ long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
+ long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
+ int batchSize = configuration.getDispatcherMaxReadBatchSize();
+ AtomicReference<Position> firstPosition = new AtomicReference<>();
+ AtomicReference<Position> lastPosition = new AtomicReference<>();
+ return cursor.scan(position, new Predicate<Entry>() {
+ @Override
+ public boolean apply(Entry entry) {
+ if (log.isDebugEnabled()) {
+ log.debug("found {}", entry);
+ }
+ Position entryPosition = entry.getPosition();
+ firstPosition.compareAndSet(null, entryPosition);
+ lastPosition.set(entryPosition);
+ ByteBuf metadataAndPayload = entry.getDataBuffer();
+ MessageMetadata messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+ int numMessages = 1;
+ if (messageMetadata.hasNumMessagesInBatch()) {
+ numMessages = messageMetadata.getNumMessagesInBatch();
+ }
+ EntryFilter.FilterResult filterResult = entryFilterSupport
+ .runFiltersForEntry(entry, messageMetadata, null);
+
+ if (filterResult == null) {
+ filterResult = EntryFilter.FilterResult.ACCEPT;
+ }
+ switch (filterResult) {
+ case REJECT:
+ rejected.incrementAndGet();
+ rejectedMessages.addAndGet(numMessages);
+ break;
+ case RESCHEDULE:
+ rescheduled.incrementAndGet();
+ rescheduledMessages.addAndGet(numMessages);
+ break;
+ default:
+ accepted.incrementAndGet();
+ acceptedMessages.addAndGet(numMessages);
+ break;
+ }
+ long num = entries.incrementAndGet();
+ messages.addAndGet(numMessages);
+
+ if (num % 1000 == 0) {
+ long end = System.currentTimeMillis();
+ log.info(
+ "[{}][{}] scan running since {} ms - scanned {} entries",
+ topicName, subName, end - start, num);
+ }
+
+ return true;
+ }
+ }, batchSize, maxEntries, timeOutMs).thenApply((ScanOutcome outcome) -> {
+ long end = System.currentTimeMillis();
+ AnalyzeBacklogResult result = new AnalyzeBacklogResult();
+ result.setFirstPosition(firstPosition.get());
+ result.setLastPosition(lastPosition.get());
+ result.setEntries(entries.get());
+ result.setMessages(messages.get());
+ result.setFilterAcceptedEntries(accepted.get());
+ result.setFilterAcceptedMessages(acceptedMessages.get());
+ result.setFilterRejectedEntries(rejected.get());
+ result.setFilterRejectedMessages(rejectedMessages.get());
+ result.setFilterRescheduledEntries(rescheduled.get());
+ result.setFilterRescheduledMessages(rescheduledMessages.get());
+ // sometimes we abort the execution due to a timeout or
+ // when we reach a maximum number of entries
+ result.setScanOutcome(outcome);
+ log.info(
+ "[{}][{}] scan took {} ms - {}",
+ topicName, subName, end - start, result);
+ return result;
+ });
+
+ }
+
@Override
public CompletableFuture<Void> clearBacklog() {
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1192,4 +1305,5 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
}
private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
new file mode 100644
index 00000000000..fdf6fc562db
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Lists;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
+
+@Test(groups = "broker-admin")
+public class AnalyzeBacklogSubscriptionTest extends ProducerConsumerBase {
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+ producerBaseSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void simpleAnalyzeBacklogTest() throws Exception {
+ simpleAnalyzeBacklogTest(false);
+ }
+
+ @Test
+ public void simpleAnalyzeBacklogTestWithBatching() throws Exception {
+ simpleAnalyzeBacklogTest(true);
+ }
+
+ private void simpleAnalyzeBacklogTest(boolean batching) throws Exception {
+ int numMessages = 20;
+ int batchSize = batching ? 5 : 1;
+ int numEntries = numMessages / batchSize;
+
+ String topic = "persistent://my-property/my-ns/my-topic-" + batching;
+ String subName = "sub-1";
+ admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+ assertEquals(admin.topics().getSubscriptions(topic), Lists.newArrayList("sub-1"));
+
+ verifyBacklog(topic, subName, 0, 0);
+
+ @Cleanup
+ Producer<byte[]> p = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(batching)
+ .batchingMaxMessages(batchSize)
+ .batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.SECONDS)
+ .create();
+
+ List<CompletableFuture<MessageId>> handles = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ CompletableFuture<MessageId> handle
+ = p.sendAsync(("test-" + i).getBytes());
+ handles.add(handle);
+ }
+ FutureUtil.waitForAll(handles).get();
+
+ MessageId middleMessageId = handles.get(numMessages / 2).get();
+
+ verifyBacklog(topic, subName, numEntries, numMessages);
+
+ // create a second subscription
+ admin.topics().createSubscription(topic, "from-middle", middleMessageId);
+
+ verifyBacklog(topic, "from-middle", numEntries / 2, numMessages / 2);
+
+
+ try (Consumer consumer = pulsarClient
+ .newConsumer()
+ .topic(topic)
+ // we want to wait for the server to process acks, in order to not have a flaky test
+ .isAckReceiptEnabled(true)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe()) {
+ Message receive1 = consumer.receive();
+ Message receive2 = consumer.receive();
+ Message receive3 = consumer.receive();
+ Message receive4 = consumer.receive();
+ Message receive5 = consumer.receive();
+
+ verifyBacklog(topic, subName, numEntries, numMessages);
+
+ consumer.acknowledge(receive2);
+
+ // one individually deleted message
+ if (batching) {
+ // acknowledging a single message in a entry is not enough
+ // to count -1 for the backlog
+ verifyBacklog(topic, subName, numEntries, numMessages);
+ } else {
+ verifyBacklog(topic, subName, numEntries - 1, numMessages - 1);
+ }
+
+ consumer.acknowledge(receive1);
+ consumer.acknowledge(receive3);
+ consumer.acknowledge(receive4);
+ consumer.acknowledge(receive5);
+
+ verifyBacklog(topic, subName, numEntries - (5 / batchSize), numMessages - 5);
+
+ int count = numMessages - 5;
+ while (count -- > 0) {
+ Message m = consumer.receive();
+ consumer.acknowledge(m);
+ }
+
+ verifyBacklog(topic, subName, 0,0);
+ }
+
+ }
+
+ private void verifyBacklog(String topic, String subscription, int numEntries, int numMessages) throws Exception {
+ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult
+ = admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty());
+
+ assertEquals(numEntries, analyzeSubscriptionBacklogResult.getEntries());
+ assertEquals(numEntries, analyzeSubscriptionBacklogResult.getFilterAcceptedEntries());
+ assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedEntries());
+ assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
+ assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
+
+ assertEquals(numMessages, analyzeSubscriptionBacklogResult.getMessages());
+ assertEquals(numMessages, analyzeSubscriptionBacklogResult.getFilterAcceptedMessages());
+ assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedMessages());
+
+ assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledMessages());
+ assertFalse(analyzeSubscriptionBacklogResult.isAborted());
+ }
+
+
+ @Test
+ public void partitionedTopicNotAllowed() throws Exception {
+ String topic = "persistent://my-property/my-ns/my-partitioned-topic";
+ String subName = "sub-1";
+ admin.topics().createPartitionedTopic(topic, 2);
+ admin.topics().createSubscription(topic, subName, MessageId.latest);
+ assertEquals(admin.topics().getSubscriptions(topic), Lists.newArrayList("sub-1"));
+
+ // you cannot use this feature on a partitioned topic
+ assertThrows(PulsarAdminException.NotAllowedException.class, () -> {
+ admin.topics().analyzeSubscriptionBacklog(topic, "sub-1", Optional.empty());
+ });
+
+ // you can access single partitions
+ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult
+ = admin.topics().analyzeSubscriptionBacklog(topic + "-partition-0", "sub-1", Optional.empty());
+ assertEquals(0, analyzeSubscriptionBacklogResult.getEntries());
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 12b742a0191..ada26ee6c56 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
import java.io.IOException;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response.Status;
+
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.http.HttpResponse;
@@ -63,6 +65,7 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
+@Slf4j
public class CreateSubscriptionTest extends ProducerConsumerBase {
@BeforeMethod
@@ -480,4 +483,5 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger());
assertEquals(ml.getWaitingCursorsCount(), 0);
}
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
index c8d13c4826e..19175e936c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.plugin;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -35,7 +36,7 @@ public class EntryFilterTest implements EntryFilter {
return FilterResult.ACCEPT;
}
Consumer consumer = context.getConsumer();
- Map<String, String> metadata = consumer.getMetadata();
+ Map<String, String> metadata = consumer != null ? consumer.getMetadata() : Collections.emptyMap();
log.info("filterEntry for {}", metadata);
String matchValueAccept = metadata.getOrDefault("matchValueAccept", "ACCEPT");
String matchValueReject = metadata.getOrDefault("matchValueReject", "REJECT");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index 6d77d06da01..ee100f3fdee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -29,17 +29,18 @@ import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -48,7 +49,9 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -68,6 +71,7 @@ public class FilterEntryTest extends BrokerTestBase {
internalCleanup();
}
+ @Test
public void testFilter() throws Exception {
Map<String, String> map = new HashMap<>();
map.put("1","1");
@@ -76,12 +80,13 @@ public class FilterEntryTest extends BrokerTestBase {
String subName = "sub";
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionProperties(map)
+ .isAckReceiptEnabled(true)
.subscriptionName(subName).subscribe();
// mock entry filters
PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Dispatcher dispatcher = subscription.getDispatcher();
- Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+ Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
field.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
@@ -96,6 +101,8 @@ public class FilterEntryTest extends BrokerTestBase {
producer.send("test");
}
+ verifyBacklog(topic, subName, 10, 10, 10, 10, 0, 0, 0, 0);
+
int counter = 0;
while (true) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
@@ -108,10 +115,28 @@ public class FilterEntryTest extends BrokerTestBase {
}
// All normal messages can be received
assertEquals(10, counter);
+
+ verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
+
+ // stop the consumer
+ consumer.close();
+
MessageIdImpl lastMsgId = null;
for (int i = 0; i < 10; i++) {
lastMsgId = (MessageIdImpl) producer.newMessage().property("REJECT", "").value("1").send();
}
+
+ // analyze the subscription and predict that
+ // 10 messages will be rejected by the filter
+ verifyBacklog(topic, subName, 10, 10, 0, 0, 10, 10, 0, 0);
+
+ consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .isAckReceiptEnabled(true)
+ .subscriptionProperties(map)
+ .subscriptionName(subName)
+ .subscribe();
+
counter = 0;
while (true) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
@@ -125,6 +150,10 @@ public class FilterEntryTest extends BrokerTestBase {
// REJECT messages are filtered out
assertEquals(0, counter);
+ // now the Filter acknoledged the messages on behalf of the Consumer
+ // backlog is now zero again
+ verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
+
// All messages should be acked, check the MarkDeletedPosition
assertNotNull(lastMsgId);
MessageIdImpl finalLastMsgId = lastMsgId;
@@ -180,7 +209,7 @@ public class FilterEntryTest extends BrokerTestBase {
PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Dispatcher dispatcher = subscription.getDispatcher();
- Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+ Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
field.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
@@ -253,7 +282,7 @@ public class FilterEntryTest extends BrokerTestBase {
PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopicReference(topic).get().getSubscription(subName);
Dispatcher dispatcher = subscription.getDispatcher();
- Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+ Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
field.setAccessible(true);
NarClassLoader narClassLoader = mock(NarClassLoader.class);
EntryFilter filter1 = new EntryFilterTest();
@@ -328,4 +357,25 @@ public class FilterEntryTest extends BrokerTestBase {
resultConsume2.get(1, TimeUnit.MINUTES);
}
}
+
+
+ private void verifyBacklog(String topic, String subscription,
+ int numEntries, int numMessages,
+ int numEntriesAccepted, int numMessagesAccepted,
+ int numEntriesRejected, int numMessagesRejected,
+ int numEntriesRescheduled, int numMessagesRescheduled
+ ) throws Exception {
+ AnalyzeSubscriptionBacklogResult a1
+ = admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty());
+
+ Assert.assertEquals(numEntries, a1.getEntries());
+ Assert.assertEquals(numEntriesAccepted, a1.getFilterAcceptedEntries());
+ Assert.assertEquals(numEntriesRejected, a1.getFilterRejectedEntries());
+ Assert.assertEquals(numEntriesRescheduled, a1.getFilterRescheduledEntries());
+
+ Assert.assertEquals(numMessages, a1.getMessages());
+ Assert.assertEquals(numMessagesAccepted, a1.getFilterAcceptedMessages());
+ Assert.assertEquals(numMessagesRejected, a1.getFilterRejectedMessages());
+ Assert.assertEquals(numMessagesRescheduled, a1.getFilterRescheduledMessages());
+ }
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 71ca31e5903..7e88b3fbd14 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -46,6 +47,8 @@ import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
+
/**
* Admin interface for Topics management.
*/
@@ -2049,6 +2052,46 @@ public interface Topics {
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
throws PulsarAdminException;
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last processed message)
+ * @return an accurate analysis of the backlog
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+ Optional<MessageId> startPosition)
+ throws PulsarAdminException;
+
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last processed message)
+ * @return an accurate analysis of the backlog
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+ String subscriptionName,
+ Optional<MessageId> startPosition);
+
/**
* Get backlog size by a message ID.
* @param topic
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
new file mode 100644
index 00000000000..a97ccb0bb96
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.stats;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class AnalyzeSubscriptionBacklogResult {
+ private long entries;
+ private long messages;
+
+ private long filterRejectedEntries;
+ private long filterAcceptedEntries;
+ private long filterRescheduledEntries;
+
+ private long filterRejectedMessages;
+ private long filterAcceptedMessages;
+ private long filterRescheduledMessages;
+
+ private boolean aborted;
+ private String firstMessageId;
+ private String lastMessageId;
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 48cb74d5952..d0c55c01957 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -141,6 +141,15 @@ public abstract class BaseResource {
return future;
}
+ public <T, R> void asyncPostRequestWithResponse(final WebTarget target, Entity<T> entity,
+ InvocationCallback<R> callback) {
+ try {
+ request(target).async().post(entity, callback);
+ } catch (PulsarAdminException cae) {
+ callback.failed(cae);
+ }
+ }
+
public <T> CompletableFuture<Void> asyncPostRequest(final WebTarget target, Entity<T> entity) {
final CompletableFuture<Void> future = new CompletableFuture<>();
try {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index d7363597459..0b3f4ea7c7a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
@@ -88,6 +89,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
@@ -1662,6 +1664,46 @@ public class TopicsImpl extends BaseResource implements Topics {
}
}
+
+ @Override
+ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
+ String subscriptionName,
+ Optional<MessageId> startPosition)
+ throws PulsarAdminException {
+ return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition));
+ }
+
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult> analyzeSubscriptionBacklogAsync(String topic,
+ String subscriptionName,
+ Optional<MessageId> startPosition) {
+ TopicName topicName = validateTopic(topic);
+ String encodedSubName = Codec.encode(subscriptionName);
+ WebTarget path = topicPath(topicName, "subscription", encodedSubName, "analyzeBacklog");
+
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new CompletableFuture<>();
+ Entity entity = null;
+ if (startPosition.isPresent()) {
+ ResetCursorData resetCursorData = new ResetCursorData(startPosition.get());
+ entity = Entity.entity(resetCursorData, MediaType.APPLICATION_JSON);
+ } else {
+ entity = null;
+ }
+
+ asyncPostRequestWithResponse(path, entity, new InvocationCallback<AnalyzeSubscriptionBacklogResult>() {
+ @Override
+ public void completed(AnalyzeSubscriptionBacklogResult res) {
+ future.complete(res);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
@Override
public Long getBacklogSizeByMessageId(String topic, MessageId messageId)
throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 79865fae759..e3e9157233c 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -1481,6 +1482,9 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, null);
+ cmdTopics.run(split("analyze-backlog persistent://myprop/clust/ns1/ds1 -s sub1"));
+ verify(mockTopics).analyzeSubscriptionBacklog("persistent://myprop/clust/ns1/ds1", "sub1", Optional.empty());
+
// jcommander is stateful, you cannot parse the same command twice
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest --property a=b -p x=y,z"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 2913b5112bc..4b73703395e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -39,6 +39,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -243,6 +244,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("set-replicated-subscription-status", new SetReplicatedSubscriptionStatus());
jcommander.addCommand("get-replicated-subscription-status", new GetReplicatedSubscriptionStatus());
jcommander.addCommand("get-backlog-size", new GetBacklogSizeByMessageId());
+ jcommander.addCommand("analyze-backlog", new AnalyzeBacklog());
jcommander.addCommand("get-replication-clusters", new GetReplicationClusters());
jcommander.addCommand("set-replication-clusters", new SetReplicationClusters());
@@ -2922,6 +2924,33 @@ public class CmdTopics extends CmdBase {
}
}
+
+ @Parameters(commandDescription = "Analyze the backlog of a subscription.")
+ private class AnalyzeBacklog extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-s", "--subscription" }, description = "Subscription to be analyzed", required = true)
+ private String subName;
+
+ @Parameter(names = { "--position",
+ "-p" }, description = "message position to start the scan from (ledgerId:entryId)", required = false)
+ private String messagePosition;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ Optional<MessageId> startPosition = Optional.empty();
+ if (isNotBlank(messagePosition)) {
+ int partitionIndex = TopicName.get(persistentTopic).getPartitionIndex();
+ MessageId messageId = validateMessageIdString(messagePosition, partitionIndex);
+ startPosition = Optional.of(messageId);
+ }
+ print(getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition));
+
+ }
+ }
+
@Parameters(commandDescription = "Get the schema validation enforced")
private class GetSchemaValidationEnforced extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)