You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/11/20 00:30:39 UTC
[pulsar] branch master updated: Perform periodic flush of
ManagedCursor mark-delete positions (#8634)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a8cd908 Perform periodic flush of ManagedCursor mark-delete positions (#8634)
a8cd908 is described below
commit a8cd908b45c2741a00411c02afc116a435e30ef5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Nov 19 16:30:14 2020 -0800
Perform periodic flush of ManagedCursor mark-delete positions (#8634)
---
conf/broker.conf | 4 ++
conf/standalone.conf | 4 ++
.../mledger/ManagedLedgerFactoryConfig.java | 5 ++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++++
.../mledger/impl/ManagedLedgerFactoryImpl.java | 15 ++++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 53 +++++++++++++++++++++-
.../apache/pulsar/broker/ServiceConfiguration.java | 5 ++
.../pulsar/broker/ManagedLedgerClientFactory.java | 1 +
8 files changed, 107 insertions(+), 2 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 5fe9e8d..416b9e4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -788,6 +788,10 @@ managedLedgerDefaultWriteQuorum=2
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2
+# How frequently (in seconds) to flush the cursor positions that were accumulated due to rate limiting.
+# Default is 60 seconds.
+managedLedgerCursorPositionFlushSeconds = 60
+
# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
managedLedgerDigestType=CRC32C
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 4cea72d..9deb862 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -536,6 +536,10 @@ managedLedgerDefaultWriteQuorum=1
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=1
+# How frequently (in seconds) to flush the cursor positions that were accumulated due to rate limiting.
+# Default is 60 seconds.
+managedLedgerCursorPositionFlushSeconds = 60
+
# Default type of checksum to use when writing to BookKeeper. Default is "CRC32C"
# Other possible options are "CRC32", "MAC" or "DUMMY" (no checksum).
managedLedgerDigestType=CRC32C
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index f1223e5..d2109ea 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -67,6 +67,11 @@ public class ManagedLedgerFactoryConfig {
private int prometheusStatsLatencyRolloverSeconds = 60;
/**
+ * How frequently (in seconds) to flush the cursor positions that were accumulated due to rate limiting.
+ */
+ private int cursorPositionFlushSeconds = 60;
+
+ /**
* cluster name for prometheus stats
*/
private String clusterName;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 5b59986..1b7d22e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -169,6 +169,9 @@ public class ManagedCursorImpl implements ManagedCursor {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private RateLimiter markDeleteLimiter;
+ // The cursor is considered "dirty" when there are mark-delete updates that are only applied in memory,
+ // because of the rate limiting.
+ private volatile boolean isDirty = false;
private boolean alwaysInactive = false;
@@ -1626,6 +1629,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
+ isDirty = true;
lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties, null, null);
callback.markDeleteComplete(ctx);
return;
@@ -2883,6 +2887,24 @@ public class ManagedCursorImpl implements ManagedCursor {
this.entriesReadSize += readEntriesSize;
}
+ /**
+  * Re-submits the most recent mark-delete entry when it has so far only been applied in memory.
+  *
+  * <p>Does nothing unless the cursor is flagged dirty. Otherwise the flag is cleared and the last recorded
+  * mark-delete entry is passed to {@code asyncMarkDelete}. That call goes through the same rate limiter as any
+  * other mark-delete, so it may be throttled again, in which case the cursor is flagged dirty once more.
+  * A failure is only logged.
+  */
+ void flush() {
+     if (!isDirty) {
+         return;
+     }
+
+     isDirty = false;
+     // Read the field once so that the position and the properties passed below always come from the same
+     // entry, even if lastMarkDeleteEntry is reassigned by a concurrent mark-delete while this method runs.
+     final MarkDeleteEntry pendingEntry = lastMarkDeleteEntry;
+     asyncMarkDelete(pendingEntry.newPosition, pendingEntry.properties, new MarkDeleteCallback() {
+         @Override
+         public void markDeleteComplete(Object ctx) {
+             // Nothing to do on success.
+         }
+
+         @Override
+         public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+             log.warn("[{}][{}] Failed to flush mark-delete position", ledger.getName(), name, exception);
+         }
+     }, null);
+ }
+
+
private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index a2c668e..f411eff 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -108,6 +108,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private long lastStatTimestamp = System.nanoTime();
private final ScheduledFuture<?> statsTask;
+ private final ScheduledFuture<?> flushCursorsTask;
private final long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;
@@ -202,6 +203,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats, 0, StatsPeriodSeconds, TimeUnit.SECONDS);
+ this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(this::flushCursors,
+ config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);
this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS
@@ -230,6 +233,17 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
}
}
+ /**
+  * Flushes every cursor of every managed ledger whose open future has completed successfully.
+  *
+  * <p>NOTE(review): this method is scheduled at a fixed rate. If the executor follows the standard
+  * ScheduledExecutorService contract, an exception escaping from here would suppress all later runs;
+  * confirm whether a guard is needed.
+  */
+ private synchronized void flushCursors() {
+     ledgers.values().forEach(future -> {
+         // Skip ledgers that are still being opened or whose opening failed.
+         if (!future.isDone() || future.isCompletedExceptionally()) {
+             return;
+         }
+
+         ManagedLedgerImpl managedLedger = future.getNow(null);
+         if (managedLedger == null) {
+             return;
+         }
+
+         managedLedger.getCursors().forEach(cursor -> ((ManagedCursorImpl) cursor).flush());
+     });
+ }
+
private synchronized void refreshStats() {
long now = System.nanoTime();
long period = now - lastStatTimestamp;
@@ -483,6 +497,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
@Override
public void shutdown() throws InterruptedException, ManagedLedgerException {
statsTask.cancel(true);
+ flushCursorsTask.cancel(true);
int numLedgers = ledgers.size();
final CountDownLatch latch = new CountDownLatch(numLedgers);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index e53bcae..ffee4f4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3190,8 +3190,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
return result;
}
-
- void testReadEntriesOrWaitWithMaxSize() throws Exception {
+
+ @Test
+ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
ManagedLedger ledger = factory.open("testReadEntriesOrWaitWithMaxSize");
ManagedCursor c = ledger.openCursor("c");
@@ -3215,5 +3216,53 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
entries.forEach(e -> e.release());
}
+ /**
+  * Verifies that a mark-delete position held back by rate limiting is flushed by the periodic task, so that
+  * it is still observed after the managed ledger is re-opened through a second factory without a graceful
+  * close of the first one.
+  */
+ @Test
+ public void testFlushCursorAfterInactivity() throws Exception {
+     ManagedLedgerConfig config = new ManagedLedgerConfig();
+     config.setThrottleMarkDelete(1.0);
+
+     ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig();
+     factoryConfig.setCursorPositionFlushSeconds(1);
+     ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), factoryConfig);
+     ManagedLedgerFactory factory2 = null;
+     try {
+         ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config);
+         ManagedCursor c1 = ledger1.openCursor("c");
+         List<Position> positions = new ArrayList<>();
+
+         for (int i = 0; i < 20; i++) {
+             positions.add(ledger1.addEntry(new byte[1024]));
+         }
+
+         CountDownLatch latch = new CountDownLatch(positions.size());
+
+         positions.forEach(p -> c1.asyncMarkDelete(p, new MarkDeleteCallback() {
+             @Override
+             public void markDeleteComplete(Object ctx) {
+                 latch.countDown();
+             }
+
+             @Override
+             public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                 throw new RuntimeException(exception);
+             }
+         }, null));
+
+         latch.await();
+
+         assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+
+         // Give chance to the flush to be automatically triggered.
+         Thread.sleep(3000);
+
+         // Abruptly re-open the managed ledger without graceful close
+         factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+         ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config);
+         ManagedCursor c2 = ledger2.openCursor("c");
+
+         assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1));
+     } finally {
+         // Release both factories even when an assertion above fails, in the same order as before.
+         factory1.shutdown();
+         if (factory2 != null) {
+             factory2.shutdown();
+         }
+     }
+ }
+
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6e55f6f..81e1fa4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1215,6 +1215,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int managedLedgerDefaultAckQuorum = 2;
+ @FieldContext(minValue = 1,
+ category = CATEGORY_STORAGE_ML,
+ doc = "How frequently to flush the cursor positions that were accumulated due to rate limiting. (seconds). Default is 60 seconds")
+ private int managedLedgerCursorPositionFlushSeconds = 60;
+
//
//
@FieldContext(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index a48224c..5f6cdaf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -64,6 +64,7 @@ public class ManagedLedgerClientFactory implements Closeable {
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
+ managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
Configuration configuration = new ClientConfiguration();
if (conf.isBookkeeperClientExposeStatsToPrometheus()) {