You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/04/22 18:42:35 UTC
[pulsar] branch master updated: Fix check backlogged cursors
without consumer (#6766)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4d59f2b Fix check backlogged cursors without consumer (#6766)
4d59f2b is described below
commit 4d59f2b8a002c48676ca67bbb137ca0078f7fa8d
Author: zhaorongsheng <zh...@users.noreply.github.com>
AuthorDate: Thu Apr 23 02:42:22 2020 +0800
Fix check backlogged cursors without consumer (#6766)
* fix the backlogged cursors bug
* add some test
* change test case
* change test case
* change test case
* change check logic
* move backlogged cursor check after consumer add
Co-authored-by: zhaorongsheng <zh...@bigo.sg>
---
.../apache/bookkeeper/mledger/ManagedLedger.java | 6 -
.../mledger/ManagedLedgerFactoryConfig.java | 5 -
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 ---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 80 ------------
.../pulsar/broker/ManagedLedgerClientFactory.java | 1 -
.../apache/pulsar/broker/service/PulsarStats.java | 2 +-
.../org/apache/pulsar/broker/service/Topic.java | 6 +
.../service/nonpersistent/NonPersistentTopic.java | 5 +
.../broker/service/persistent/PersistentTopic.java | 20 ++-
.../pulsar/broker/service/PersistentTopicTest.java | 141 ++++++++++++++++++++-
.../client/api/SimpleProducerConsumerTest.java | 4 +-
11 files changed, 171 insertions(+), 114 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index e004c82..d1174f6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -326,12 +326,6 @@ public interface ManagedLedger {
*/
long getOffloadedSize();
- /**
- * Activate cursors those caught up backlog-threshold entries and deactivate slow cursors which are creating
- * backlog.
- */
- void checkBackloggedCursors();
-
void asyncTerminate(TerminateCallback callback, Object ctx);
/**
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index b5d0a7c..4d1eb31 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -48,11 +48,6 @@ public class ManagedLedgerFactoryConfig {
private long cacheEvictionTimeThresholdMillis = 1000;
/**
- * Threshould to consider a cursor as "backlogged"
- */
- private long thresholdBackloggedCursor = 1000;
-
- /**
* Whether we should make a copy of the entry payloads when inserting in cache
*/
private boolean copyEntriesInCache = false;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 9133344..c7dc19d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -236,8 +236,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
.newUpdater(ManagedLedgerImpl.class, "addOpCount");
private volatile long addOpCount = 0;
- private final long backloggedCursorThresholdEntries;
-
// last read-operation's callback to check read-timeout on it.
private volatile ReadEntryCallbackWrapper lastReadCallback = null;
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater
@@ -278,7 +276,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = Maps.newHashMap();
this.clock = config.getClock();
- this.backloggedCursorThresholdEntries = factory.getConfig().getThresholdBackloggedCursor();
// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
@@ -912,18 +909,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
@Override
- public void checkBackloggedCursors() {
- // activate caught up cursors
- cursors.forEach(cursor -> {
- if (cursor.getNumberOfEntries() < backloggedCursorThresholdEntries) {
- cursor.setActive();
- } else {
- cursor.setInactive();
- }
- });
- }
-
- @Override
public long getEstimatedBacklogSize() {
PositionImpl pos = getMarkDeletePositionOfSlowestConsumer();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index b185b75..a5e6408 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2079,71 +2079,6 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
}
@Test
- public void testBacklogCursor() throws Exception {
- int backloggedThreshold = 10;
- ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
- factoryConf.setThresholdBackloggedCursor(backloggedThreshold);
- ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
- ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("cache_backlog_ledger");
-
- // Open Cursor also adds cursor into activeCursor-container
- ManagedCursor cursor1 = ledger.openCursor("c1");
- ManagedCursor cursor2 = ledger.openCursor("c2");
-
- CountDownLatch latch = new CountDownLatch(backloggedThreshold);
- for (int i = 0; i < backloggedThreshold + 1; i++) {
- String content = "entry"; // 5 bytes
- ByteBuf entry = getMessageWithMetadata(content.getBytes());
- ledger.asyncAddEntry(entry, new AddEntryCallback() {
- @Override
- public void addComplete(Position position, Object ctx) {
- latch.countDown();
- entry.release();
- }
-
- @Override
- public void addFailed(ManagedLedgerException exception, Object ctx) {
- latch.countDown();
- entry.release();
- }
-
- }, null);
- }
- latch.await();
-
- // Verify: cursors are active as :haven't started deactivateBacklogCursor scan
- assertTrue(cursor1.isActive());
- assertTrue(cursor2.isActive());
-
- // deactivate backlog cursors
- ledger.checkBackloggedCursors();
-
- // both cursors have to be inactive
- assertFalse(cursor1.isActive());
- assertFalse(cursor2.isActive());
-
- // read entries so, cursor1 reaches maxBacklog threshold again to be active again
- List<Entry> entries1 = cursor1.readEntries(50);
- for (Entry entry : entries1) {
- log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
- entry.release();
- }
-
- // activate cursors which caught up maxbacklog threshold
- ledger.checkBackloggedCursors();
-
- // verify: cursor1 has consumed messages so, under maxBacklog threshold => active
- assertTrue(cursor1.isActive());
-
- // verify: cursor2 has not consumed messages so, above maxBacklog threshold => inactive
- assertFalse(cursor2.isActive());
-
- ledger.close();
-
- factory.shutdown();
- }
-
- @Test
public void testConcurrentOpenCursor() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testConcurrentOpenCursor");
@@ -2280,21 +2215,6 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
}
}
- public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
- MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis())
- .setProducerName("prod-name").setSequenceId(0).build();
- ByteBuf payload = Unpooled.wrappedBuffer(data, 0, data.length);
-
- int msgMetadataSize = messageData.getSerializedSize();
- int headersSize = 4 + msgMetadataSize;
- ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
- ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
- headers.writeInt(msgMetadataSize);
- messageData.writeTo(outStream);
- outStream.recycle();
- return ByteBufPair.coalesce(ByteBufPair.get(headers, payload));
- }
-
@Test
public void testConsumerSubscriptionInitializePosition() throws Exception{
final int MAX_ENTRY_PER_LEDGER = 2;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 44f5d25..09fea0c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -55,7 +55,6 @@ public class ManagedLedgerClientFactory implements Closeable {
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
- managedLedgerFactoryConfig.setThresholdBackloggedCursor(conf.getManagedLedgerCursorBackloggedThreshold());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index 678abf3..66c5b04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -138,7 +138,7 @@ public class PulsarStats implements Closeable {
}
// this task: helps to activate inactive-backlog-cursors which have caught up and
// connected, also deactivate active-backlog-cursors which has backlog
- ((PersistentTopic) topic).getManagedLedger().checkBackloggedCursors();
+ ((PersistentTopic) topic).checkBackloggedCursors();
}else if (topic instanceof NonPersistentTopic) {
tempNonPersistentTopics.add((NonPersistentTopic) topic);
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 890e02a..aa147d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -129,6 +129,12 @@ public interface Topic {
void checkInactiveSubscriptions();
+ /**
+ * Activate the cursors that have caught up to within the backlog threshold and deactivate the slow cursors
+ * that are building up a backlog.
+ */
+ void checkBackloggedCursors();
+
void checkMessageExpiry();
void checkMessageDeduplicationInfo();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 2792234..332abfb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -865,6 +865,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
}
@Override
+ public void checkBackloggedCursors() {
+ // no-op: this topic implementation performs no backlogged-cursor check
+ }
+
+ @Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c5c5cee..8086209 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -157,6 +157,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
public volatile long delayedDeliveryTickTimeMillis = 1000;
+ private final long backloggedCursorThresholdEntries;
public volatile boolean delayedDeliveryEnabled = false;
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
@@ -217,6 +218,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
USAGE_COUNT_UPDATER.set(this, 0);
this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled();
this.delayedDeliveryTickTimeMillis = brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
+ this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
initializeDispatchRateLimiterIfNeeded(Optional.empty());
@@ -275,6 +277,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
+ this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
}
private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
@@ -594,12 +597,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscriptionFuture.thenAccept(subscription -> {
try {
- ledger.checkBackloggedCursors();
-
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
subscription.addConsumer(consumer);
+ checkBackloggedCursors();
+
if (!cnx.isActive()) {
consumer.close();
if (log.isDebugEnabled()) {
@@ -1702,6 +1705,19 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
});
}
+    /**
+     * Re-evaluates the active/inactive state of the cursor of every subscription of this topic.
+     *
+     * <p>A cursor is set active only when its subscription has at least one consumer and the cursor reports fewer
+     * entries than {@code backloggedCursorThresholdEntries}; in every other case it is set inactive.
+     */
+    @Override
+    public void checkBackloggedCursors() {
+        subscriptions.forEach((subName, subscription) -> {
+            // Inactive when nobody is consuming or when the cursor has reached the backlog threshold. The
+            // consumer check comes first so the entry count is only queried for subscriptions with consumers.
+            if (subscription.getConsumers().isEmpty()
+                    || subscription.getCursor().getNumberOfEntries() >= backloggedCursorThresholdEntries) {
+                subscription.getCursor().setInactive();
+            } else {
+                subscription.getCursor().setActive();
+            }
+        });
+    }
+
+
/**
* Check whether the topic should be retained (based on time), even tough there are no producers/consumers and it's
* marked as inactive.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d717a5a..37cdec6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -22,7 +22,6 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMo
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.matches;
@@ -41,12 +40,14 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -70,13 +71,17 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -95,6 +100,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
@@ -102,8 +108,11 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
@@ -128,7 +137,7 @@ import io.netty.buffer.Unpooled;
/**
*/
-public class PersistentTopicTest {
+public class PersistentTopicTest extends MockedBookKeeperTestCase {
private PulsarService pulsar;
private BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
@@ -1507,4 +1516,132 @@ public class PersistentTopicTest {
topic.checkCompaction();
verify(compactor, times(0)).compact(anyString());
}
+
+    /**
+     * Exercises {@code PersistentTopic#checkBackloggedCursors()} with three cursors: two whose subscriptions have a
+     * consumer from the start and one whose subscription only gets a consumer at the end of the test.
+     *
+     * <p>Expected behaviour: a cursor is active only while its subscription has a consumer and its number of
+     * entries is below the configured backlogged-cursor threshold.
+     */
+    @Test
+    public void testBacklogCursor() throws Exception {
+        int backloggedThreshold = 10;
+        pulsar.getConfiguration().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
+
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("cache_backlog_ledger");
+        PersistentTopic topic = new PersistentTopic(successTopicName, ledger, brokerService);
+
+        // STEP1: prepare cursors
+        // Open cursor1, add it into activeCursor-container and add it into subscription consumer list
+        ManagedCursor cursor1 = ledger.openCursor("c1");
+        PersistentSubscription sub1 = new PersistentSubscription(topic, "sub-1", cursor1, false);
+        Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
+                "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                false /* read compacted */, InitialPosition.Latest, null);
+        topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1);
+        sub1.addConsumer(consumer1);
+        // Open cursor2, add it into activeCursor-container and add it into subscription consumer list
+        ManagedCursor cursor2 = ledger.openCursor("c2");
+        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursor2, false);
+        Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0,
+                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-2", Collections.emptyMap(),
+                false /* read compacted */, InitialPosition.Latest, null);
+        topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2);
+        sub2.addConsumer(consumer2);
+        // Open cursor3, add it into activeCursor-container and do not add it into subscription consumer list yet.
+        // The consumer is bound to sub3 (the subscription it is attached to later) and carries its own name.
+        ManagedCursor cursor3 = ledger.openCursor("c3");
+        PersistentSubscription sub3 = new PersistentSubscription(topic, "sub-3", cursor3, false);
+        Consumer consumer3 = new Consumer(sub3, SubType.Exclusive, topic.getName(), 3 /* consumer id */, 0,
+                "Cons3"/* consumer name */, 50000, serverCnx, "myrole-3", Collections.emptyMap(),
+                false /* read compacted */, InitialPosition.Latest, null);
+        topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3);
+
+        // Case1: cursors are active as haven't started deactivateBacklogCursor scan
+        assertTrue(cursor1.isActive());
+        assertTrue(cursor2.isActive());
+        assertTrue(cursor3.isActive());
+
+        // deactivate cursor which consumer list is empty
+        topic.checkBackloggedCursors();
+
+        // Case2: cursor3 change to be inactive as it does not include consumer
+        assertTrue(cursor1.isActive());
+        assertTrue(cursor2.isActive());
+        assertFalse(cursor3.isActive());
+
+        // Write one entry more than the threshold. The latch counts every write so that await() only returns
+        // once all add callbacks have fired and every buffer has been released.
+        int entriesToWrite = backloggedThreshold + 1;
+        CountDownLatch latch = new CountDownLatch(entriesToWrite);
+        for (int i = 0; i < entriesToWrite; i++) {
+            String content = "entry"; // 5 bytes
+            ByteBuf entry = getMessageWithMetadata(content.getBytes());
+            ledger.asyncAddEntry(entry, new AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, Object ctx) {
+                    latch.countDown();
+                    entry.release();
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                    entry.release();
+                }
+
+            }, null);
+        }
+        latch.await();
+
+        // writing alone does not change the state; only the check does
+        assertTrue(cursor1.isActive());
+        assertTrue(cursor2.isActive());
+        assertFalse(cursor3.isActive());
+
+        // deactivate backlog cursors
+        topic.checkBackloggedCursors();
+
+        // Case3: cursor1 and cursor2 change to be inactive because of the backlog
+        assertFalse(cursor1.isActive());
+        assertFalse(cursor2.isActive());
+        assertFalse(cursor3.isActive());
+
+        // read entries so, cursor1 reaches maxBacklog threshold again to be active again
+        readAndReleaseEntries(cursor1);
+        readAndReleaseEntries(cursor3);
+
+        // activate cursors which caught up maxbacklog threshold
+        topic.checkBackloggedCursors();
+
+        // Case4:
+        // cursor1 has consumed messages so, under maxBacklog threshold => active
+        assertTrue(cursor1.isActive());
+        // cursor2 has not consumed messages so, above maxBacklog threshold => inactive
+        assertFalse(cursor2.isActive());
+        // cursor3 has no consumer so it does not change to active
+        assertFalse(cursor3.isActive());
+
+        // add consumer to sub3 and read entries
+        sub3.addConsumer(consumer3);
+        readAndReleaseEntries(cursor3);
+
+        topic.checkBackloggedCursors();
+        // Case5: cursor3 has consumer so change to active
+        assertTrue(cursor3.isActive());
+    }
+
+    /**
+     * Reads up to 50 entries from the given cursor, logs each entry and releases it.
+     */
+    private void readAndReleaseEntries(ManagedCursor cursor) throws Exception {
+        List<Entry> entries = cursor.readEntries(50);
+        for (Entry entry : entries) {
+            log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
+            entry.release();
+        }
+    }
+
+    /**
+     * Builds a test message buffer from the given payload bytes.
+     *
+     * <p>The header buffer holds a 4-byte integer with the serialized metadata size followed by the serialized
+     * {@code MessageMetadata}; header and payload are then combined through {@code ByteBufPair.coalesce}.
+     *
+     * @param data payload bytes, wrapped without copying
+     * @return the buffer produced by coalescing the header and the payload
+     * @throws IOException if serializing the metadata fails
+     */
+    private ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
+        MessageMetadata metadata = MessageMetadata.newBuilder()
+                .setPublishTime(System.currentTimeMillis())
+                .setProducerName("prod-name")
+                .setSequenceId(0)
+                .build();
+        ByteBuf body = Unpooled.wrappedBuffer(data, 0, data.length);
+
+        int metadataSize = metadata.getSerializedSize();
+        // 4 bytes for the size field written by writeInt, then the metadata itself
+        int headerCapacity = 4 + metadataSize;
+        ByteBuf headerBuf = PulsarByteBufAllocator.DEFAULT.buffer(headerCapacity, headerCapacity);
+        ByteBufCodedOutputStream codedOut = ByteBufCodedOutputStream.get(headerBuf);
+        headerBuf.writeInt(metadataSize);
+        metadata.writeTo(codedOut);
+        codedOut.recycle();
+
+        return ByteBufPair.coalesce(ByteBufPair.get(headerBuf, body));
+    }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index ad039e1..51a7eef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -1040,7 +1040,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Thread.sleep(maxMessageCacheRetentionTimeMillis);
// 4. deactivate subscriber which has built the backlog
- ledger.checkBackloggedCursors();
+ topicRef.checkBackloggedCursors();
Thread.sleep(100);
// 5. verify: active subscribers
@@ -1055,7 +1055,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
subscriber2.acknowledge(msg);
}
- ledger.checkBackloggedCursors();
+ topicRef.checkBackloggedCursors();
activeSubscriber.clear();
ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));