You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:01 UTC
[pulsar] 15/38: [Issue 6283][tiered-storage] Offload policies per
namespace (#6422)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c3fa923e46ae37c7bab5503dbd40c3a0c54fe154
Author: Alexandre DUVAL <al...@wanadoo.fr>
AuthorDate: Sat Mar 28 11:35:53 2020 +0100
[Issue 6283][tiered-storage] Offload policies per namespace (#6422)
Fixes #6283
### Modifications
Define and use a custom deletion lag and offload threshold in the offload policies of each namespace.
All of this is required for https://github.com/apache/pulsar/pull/6354.
(cherry picked from commit 347d3851b6e62be99b0953a21d8c1a6d502ae111)
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 44 --------
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 82 ++++++++-------
.../mledger/impl/OffloadLedgerDeleteTest.java | 6 +-
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 22 ++--
.../pulsar/broker/admin/impl/NamespacesBase.java | 38 ++++++-
.../pulsar/broker/service/BrokerService.java | 23 ++---
.../pulsar/broker/admin/AdminApiOffloadTest.java | 2 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 112 ++++++++++++++++++++-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 5 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 33 +++++-
.../common/policies/data/OffloadPolicies.java | 18 +++-
site2/docs/reference-pulsar-admin.md | 2 +
12 files changed, 276 insertions(+), 111 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 4af66eb..24dbaab 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -56,8 +56,6 @@ public class ManagedLedgerConfig {
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
- private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
- private long offloadAutoTriggerSizeThresholdBytes = -1;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;
private long addEntryTimeoutSeconds = 120;
@@ -408,48 +406,6 @@ public class ManagedLedgerConfig {
return retentionSizeInMB;
}
- /**
- * When a ledger is offloaded from bookkeeper storage to longterm storage, the bookkeeper ledger
- * is not deleted immediately. Instead we wait for a grace period before deleting from bookkeeper.
- * The offloadLedgerDeleteLag sets this grace period.
- *
- * @param lagTime period to wait before deleting offloaded ledgers from bookkeeper
- * @param unit timeunit for lagTime
- */
- public ManagedLedgerConfig setOffloadLedgerDeletionLag(long lagTime, TimeUnit unit) {
- this.offloadLedgerDeletionLagMs = unit.toMillis(lagTime);
- return this;
- }
-
- /**
- * Number of milliseconds before an offloaded ledger will be deleted from bookkeeper.
- *
- * @return the offload ledger deletion lag time in milliseconds
- */
- public long getOffloadLedgerDeletionLagMillis() {
- return offloadLedgerDeletionLagMs;
- }
-
- /**
- * Size, in bytes, at which the managed ledger will start to automatically offload ledgers to longterm storage.
- * A negative value disables autotriggering. A threshold of 0 offloads data as soon as possible.
- * Offloading will not occur if no offloader has been set {@link #setLedgerOffloader(LedgerOffloader)}.
- * Automatical offloading occurs when the ledger is rolled, and the ledgers up to that point exceed the threshold.
- *
- * @param threshold Threshold in bytes at which offload is automatically triggered
- */
- public ManagedLedgerConfig setOffloadAutoTriggerSizeThresholdBytes(long threshold) {
- this.offloadAutoTriggerSizeThresholdBytes = threshold;
- return this;
- }
-
- /**
- * Size, in bytes, at which offloading will automatically be triggered for this managed ledger.
- * @return the trigger threshold, in bytes
- */
- public long getOffloadAutoTriggerSizeThresholdBytes() {
- return this.offloadAutoTriggerSizeThresholdBytes;
- }
/**
* Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 08b5e5e..a64f8f5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1851,8 +1851,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
- if (config.getOffloadAutoTriggerSizeThresholdBytes() >= 0) {
- executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+ if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+ && config.getLedgerOffloader().getOffloadPolicies() != null) {
+ if (config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+ executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+ }
}
}
@@ -1871,39 +1874,43 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
});
- long threshold = config.getOffloadAutoTriggerSizeThresholdBytes();
- long sizeSummed = 0;
- long alreadyOffloadedSize = 0;
- long toOffloadSize = 0;
-
- ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque();
-
- // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
- for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
- long size = e.getValue().getSize();
- sizeSummed += size;
- boolean alreadyOffloaded = e.getValue().hasOffloadContext()
- && e.getValue().getOffloadContext().getComplete();
- if (alreadyOffloaded) {
- alreadyOffloadedSize += size;
- } else if (sizeSummed > threshold) {
- toOffloadSize += size;
- toOffload.addFirst(e.getValue());
+ if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+ && config.getLedgerOffloader().getOffloadPolicies() != null) {
+ long threshold = config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes();
+
+ long sizeSummed = 0;
+ long alreadyOffloadedSize = 0;
+ long toOffloadSize = 0;
+
+ ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque();
+
+ // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
+ for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+ long size = e.getValue().getSize();
+ sizeSummed += size;
+ boolean alreadyOffloaded = e.getValue().hasOffloadContext()
+ && e.getValue().getOffloadContext().getComplete();
+ if (alreadyOffloaded) {
+ alreadyOffloadedSize += size;
+ } else if (sizeSummed > threshold) {
+ toOffloadSize += size;
+ toOffload.addFirst(e.getValue());
+ }
}
- }
- if (toOffload.size() > 0) {
- log.info("[{}] Going to automatically offload ledgers {}"
- + ", total size = {}, already offloaded = {}, to offload = {}",
- name, toOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()),
- sizeSummed, alreadyOffloadedSize, toOffloadSize);
- } else {
- // offloadLoop will complete immediately with an empty list to offload
- log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
- name, sizeSummed, alreadyOffloadedSize, threshold);
- }
+ if (toOffload.size() > 0) {
+ log.info("[{}] Going to automatically offload ledgers {}"
+ + ", total size = {}, already offloaded = {}, to offload = {}",
+ name, toOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()),
+ sizeSummed, alreadyOffloadedSize, toOffloadSize);
+ } else {
+ // offloadLoop will complete immediately with an empty list to offload
+ log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
+ name, sizeSummed, alreadyOffloadedSize, threshold);
+ }
- offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
+ offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
+ }
}
}
@@ -1925,8 +1932,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private boolean isOffloadedNeedsDelete(OffloadContext offload) {
long elapsedMs = clock.millis() - offload.getTimestamp();
- return offload.getComplete() && !offload.getBookkeeperDeleted()
- && elapsedMs > config.getOffloadLedgerDeletionLagMillis();
+
+ if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+ && config.getLedgerOffloader().getOffloadPolicies() != null) {
+ return offload.getComplete() && !offload.getBookkeeperDeleted()
+ && elapsedMs > config.getLedgerOffloader()
+ .getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis();
+ } else {
+ return false;
+ }
}
/**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
index 02fff69..8fbc588 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
@@ -48,7 +48,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
- config.setOffloadLedgerDeletionLag(5, TimeUnit.MINUTES);
+ offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(300000));
config.setLedgerOffloader(offloader);
config.setClock(clock);
@@ -109,8 +109,8 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(5, TimeUnit.MINUTES);
- config.setOffloadLedgerDeletionLag(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
+ offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(600000));
config.setLedgerOffloader(offloader);
config.setClock(clock);
@@ -157,7 +157,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(10, TimeUnit.MINUTES);
- config.setOffloadLedgerDeletionLag(5, TimeUnit.MINUTES);
+ offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(300000));
config.setLedgerOffloader(offloader);
config.setClock(clock);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 997f3f6..97bab56 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -608,9 +608,12 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
config.setMaxEntriesPerLedger(10);
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
config.setRetentionTime(0, TimeUnit.MINUTES);
+ offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(100));
+ offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
ManagedCursor cursor = ledger.openCursor("foobar");
+
for (int i = 0; i < 15; i++) {
String content = "entry-" + i;
ledger.addEntry(content.getBytes());
@@ -746,9 +749,9 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
- config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
+ offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -782,9 +785,9 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
- config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
+ offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -843,9 +846,9 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
- config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
+ offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -894,9 +897,9 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
- config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
+ offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -926,13 +929,12 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
@Test
public void offloadAsSoonAsClosed() throws Exception {
-
MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
- config.setOffloadAutoTriggerSizeThresholdBytes(0);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
+ offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -988,6 +990,12 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
return deletes.keySet();
}
+ OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
+ OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
+ OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS);
+
@Override
public String getOffloadDriverName() {
return "mock";
@@ -1029,7 +1037,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
@Override
public OffloadPolicies getOffloadPolicies() {
- return null;
+ return offloadPolicies;
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 619a5cd..01927f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2046,7 +2046,12 @@ public abstract class NamespacesBase extends AdminResource {
protected long internalGetOffloadThreshold() {
validateAdminAccessForTenant(namespaceName.getTenant());
- return getNamespacePolicies(namespaceName).offload_threshold;
+ Policies policies = getNamespacePolicies(namespaceName);
+ if (policies.offload_policies == null) {
+ return policies.offload_threshold;
+ } else {
+ return policies.offload_policies.getManagedLedgerOffloadThresholdInBytes();
+ }
}
protected void internalSetOffloadThreshold(long newThreshold) {
@@ -2057,8 +2062,13 @@ public abstract class NamespacesBase extends AdminResource {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
+
Policies policies = jsonMapper().readValue(content, Policies.class);
+ if (policies.offload_policies != null) {
+ policies.offload_policies.setManagedLedgerOffloadThresholdInBytes(newThreshold);
+ }
policies.offload_threshold = newThreshold;
+
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}",
@@ -2083,7 +2093,12 @@ public abstract class NamespacesBase extends AdminResource {
protected Long internalGetOffloadDeletionLag() {
validateAdminAccessForTenant(namespaceName.getTenant());
- return getNamespacePolicies(namespaceName).offload_deletion_lag_ms;
+ Policies policies = getNamespacePolicies(namespaceName);
+ if (policies.offload_policies == null) {
+ return policies.offload_deletion_lag_ms;
+ } else {
+ return policies.offload_policies.getManagedLedgerOffloadDeletionLagInMillis();
+ }
}
protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
@@ -2094,8 +2109,13 @@ public abstract class NamespacesBase extends AdminResource {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
+
Policies policies = jsonMapper().readValue(content, Policies.class);
+ if (policies.offload_policies != null) {
+ policies.offload_policies.setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs);
+ }
policies.offload_deletion_lag_ms = newDeletionLagMs;
+
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={}, value={}",
@@ -2234,6 +2254,20 @@ public abstract class NamespacesBase extends AdminResource {
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
+
+ if (offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis()
+ .equals(OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
+ offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms);
+ } else {
+ policies.offload_deletion_lag_ms = offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis();
+ }
+ if (offloadPolicies.getManagedLedgerOffloadThresholdInBytes() ==
+ OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES) {
+ offloadPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold);
+ } else {
+ policies.offload_threshold = offloadPolicies.getManagedLedgerOffloadThresholdInBytes();
+ }
+
policies.offload_policies = offloadPolicies;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion(),
(rc, path1, ctx, stat) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 085bae4..3f0f9cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1032,19 +1032,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
+
+ if (offloadPolicies == null) {
+ offloadPolicies = new OffloadPolicies();
+ offloadPolicies.setManagedLedgerOffloadDriver(pulsar.getConfiguration().getManagedLedgerOffloadDriver());
+ offloadPolicies.setManagedLedgerOffloadThresholdInBytes(
+ pulsar.getConfiguration().getManagedLedgerOffloadAutoTriggerSizeThresholdBytes()
+ );
+ offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(
+ pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()
+ );
+ }
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
- policies.ifPresent(p -> {
- long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs();
- if (p.offload_deletion_lag_ms != null) {
- lag = p.offload_deletion_lag_ms;
- }
- long bytes = serviceConfig.getManagedLedgerOffloadAutoTriggerSizeThresholdBytes();
- if (p.offload_threshold != -1L) {
- bytes = p.offload_threshold;
- }
- managedLedgerConfig.setOffloadLedgerDeletionLag(lag, TimeUnit.MILLISECONDS);
- managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(bytes);
- });
future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 25c45df..76272ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -149,7 +149,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
String endpoint = "test-endpoint";
OffloadPolicies offload1 = OffloadPolicies.create(
- driver, region, bucket, endpoint, 100, 100);
+ driver, region, bucket, endpoint, 100, 100, -1, null);
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
Assert.assertEquals(offload1, offload2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index a607921..5f39812 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -43,8 +43,12 @@ import java.net.URI;
import java.net.URL;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.WebApplicationException;
@@ -54,6 +58,8 @@ import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.v1.Namespaces;
@@ -75,6 +81,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -1073,6 +1080,74 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
admin.tenants().deleteTenant("my-tenants");
}
+ class MockLedgerOffloader implements LedgerOffloader {
+ ConcurrentHashMap<Long, UUID> offloads = new ConcurrentHashMap<Long, UUID>();
+ ConcurrentHashMap<Long, UUID> deletes = new ConcurrentHashMap<Long, UUID>();
+
+ Set<Long> offloadedLedgers() {
+ return offloads.keySet();
+ }
+
+ Set<Long> deletedOffloads() {
+ return deletes.keySet();
+ }
+
+ OffloadPolicies offloadPolicies;
+
+ public MockLedgerOffloader(OffloadPolicies offloadPolicies) {
+ this.offloadPolicies = offloadPolicies;
+ }
+
+ @Override
+ public String getOffloadDriverName() {
+ return "mock";
+ }
+
+ @Override
+ public CompletableFuture<Void> offload(ReadHandle ledger,
+ UUID uuid,
+ Map<String, String> extraMetadata) {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ if (offloads.putIfAbsent(ledger.getId(), uuid) == null) {
+ promise.complete(null);
+ } else {
+ promise.completeExceptionally(new Exception("Already exists exception"));
+ }
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
+ Map<String, String> offloadDriverMetadata) {
+ CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
+ promise.completeExceptionally(new UnsupportedOperationException());
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid,
+ Map<String, String> offloadDriverMetadata) {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ if (offloads.remove(ledgerId, uuid)) {
+ deletes.put(ledgerId, uuid);
+ promise.complete(null);
+ } else {
+ promise.completeExceptionally(new Exception("Not found"));
+ }
+ return promise;
+ };
+
+ @Override
+ public OffloadPolicies getOffloadPolicies() {
+ return offloadPolicies;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
@Test
public void testSetOffloadThreshold() throws Exception {
TopicName topicName = TopicName.get("persistent", this.testTenant, "offload", "offload-topic");
@@ -1088,25 +1163,54 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
// the ledger config should have the expected value
ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
- assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), 1);
+ MockLedgerOffloader offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+ OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+ admin.namespaces().getOffloadThreshold(namespace),
+ pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+ ledgerConf.setLedgerOffloader(offloader);
+ assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
+ -1);
// set an override for the namespace
admin.namespaces().setOffloadThreshold(namespace, 100);
assertEquals(100, admin.namespaces().getOffloadThreshold(namespace));
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
- assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), 100);
+ admin.namespaces().getOffloadPolicies(namespace);
+ offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+ OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+ admin.namespaces().getOffloadThreshold(namespace),
+ pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+ ledgerConf.setLedgerOffloader(offloader);
+ assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
+ 100);
// set another negative value to disable
admin.namespaces().setOffloadThreshold(namespace, -2);
assertEquals(-2, admin.namespaces().getOffloadThreshold(namespace));
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
- assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), -2);
+ offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+ OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+ admin.namespaces().getOffloadThreshold(namespace),
+ pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+ ledgerConf.setLedgerOffloader(offloader);
+ assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
+ -2);
// set back to -1 and fall back to default
admin.namespaces().setOffloadThreshold(namespace, -1);
assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
- assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), 1);
+ offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+ OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+ admin.namespaces().getOffloadThreshold(namespace),
+ pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+ ledgerConf.setLedgerOffloader(offloader);
+ assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
+ -1);
// cleanup
admin.topics().delete(topicName.toString(), true);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d9bd2f7..9318f15 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -489,10 +489,11 @@ public class PulsarAdminToolTest {
namespaces.run(split("clear-offload-deletion-lag myprop/clust/ns1"));
verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1");
- namespaces.run(split("set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M"));
+ namespaces.run(split("set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
- "http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024));
+ "http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024,
+ 10 * 1024 * 1024, 10000L));
namespaces.run(split("get-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).getOffloadPolicies("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 0105d0e..3dd4a6f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1338,6 +1338,18 @@ public class CmdNamespaces extends CmdBase {
required = false)
private String readBufferSizeStr;
+ @Parameter(
+ names = {"--offloadAfterElapsed", "-oae"},
+ description = "Offload after elapsed in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w).",
+ required = false)
+ private String offloadAfterElapsedStr;
+
+ @Parameter(
+ names = {"--offloadAfterThreshold", "-oat"},
+ description = "Offload after threshold size (eg: 1M, 5M)",
+ required = false)
+ private String offloadAfterThresholdStr;
+
private final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"};
public boolean driverSupported(String driver) {
@@ -1399,8 +1411,27 @@ public class CmdNamespaces extends CmdBase {
}
}
+ Long offloadAfterElapsedInMillis = OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
+ if (StringUtils.isNotEmpty(offloadAfterElapsedStr)) {
+ Long offloadAfterElapsed = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr));
+ if (positiveCheck("OffloadAfterElapsed", offloadAfterElapsed)
+ && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) {
+ offloadAfterElapsedInMillis = new Long(offloadAfterElapsed);
+ }
+ }
+
+ long offloadAfterThresholdInBytes = OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
+ if (StringUtils.isNotEmpty(offloadAfterThresholdStr)) {
+ long offloadAfterThreshold = validateSizeString(offloadAfterThresholdStr);
+ if (positiveCheck("OffloadAfterThreshold", offloadAfterThreshold)
+ && maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE)) {
+ offloadAfterThresholdInBytes = new Long(offloadAfterThreshold);
+ }
+ }
+
OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint,
- maxBlockSizeInBytes, readBufferSizeInBytes);
+ maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes,
+ offloadAfterElapsedInMillis);
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index f46b44f..5ccb75c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -39,11 +39,15 @@ public class OffloadPolicies {
public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem"};
public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
+ public final static long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = -1;
+ public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
// common config
private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY;
private String managedLedgerOffloadDriver = null;
private int managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS;
+ private long managedLedgerOffloadThresholdInBytes = DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
+ private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
// s3 config, set by service configuration or cli
private String s3ManagedLedgerOffloadRegion = null;
@@ -68,9 +72,13 @@ public class OffloadPolicies {
private String fileSystemURI = null;
public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
- int maxBlockSizeInBytes, int readBufferSizeInBytes) {
+ int maxBlockSizeInBytes, int readBufferSizeInBytes,
+ long offloadThresholdInBytes, Long offloadDeletionLagInMillis) {
OffloadPolicies offloadPolicies = new OffloadPolicies();
offloadPolicies.setManagedLedgerOffloadDriver(driver);
+ offloadPolicies.setManagedLedgerOffloadThresholdInBytes(offloadThresholdInBytes);
+ offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInMillis);
+
if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
offloadPolicies.setS3ManagedLedgerOffloadRegion(region);
offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket);
@@ -153,6 +161,8 @@ public class OffloadPolicies {
return Objects.hash(
managedLedgerOffloadDriver,
managedLedgerOffloadMaxThreads,
+ managedLedgerOffloadThresholdInBytes,
+ managedLedgerOffloadDeletionLagInMillis,
s3ManagedLedgerOffloadRegion,
s3ManagedLedgerOffloadBucket,
s3ManagedLedgerOffloadServiceEndpoint,
@@ -180,6 +190,10 @@ public class OffloadPolicies {
OffloadPolicies other = (OffloadPolicies) obj;
return Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver())
&& Objects.equals(managedLedgerOffloadMaxThreads, other.getManagedLedgerOffloadMaxThreads())
+ && Objects.equals(managedLedgerOffloadThresholdInBytes,
+ other.getManagedLedgerOffloadThresholdInBytes())
+ && Objects.equals(managedLedgerOffloadDeletionLagInMillis,
+ other.getManagedLedgerOffloadDeletionLagInMillis())
&& Objects.equals(s3ManagedLedgerOffloadRegion, other.getS3ManagedLedgerOffloadRegion())
&& Objects.equals(s3ManagedLedgerOffloadBucket, other.getS3ManagedLedgerOffloadBucket())
&& Objects.equals(s3ManagedLedgerOffloadServiceEndpoint,
@@ -208,6 +222,8 @@ public class OffloadPolicies {
return MoreObjects.toStringHelper(this)
.add("managedLedgerOffloadDriver", managedLedgerOffloadDriver)
.add("managedLedgerOffloadMaxThreads", managedLedgerOffloadMaxThreads)
+ .add("managedLedgerOffloadThresholdInBytes", managedLedgerOffloadThresholdInBytes)
+ .add("managedLedgerOffloadDeletionLagInMillis", managedLedgerOffloadDeletionLagInMillis)
.add("s3ManagedLedgerOffloadRegion", s3ManagedLedgerOffloadRegion)
.add("s3ManagedLedgerOffloadBucket", s3ManagedLedgerOffloadBucket)
.add("s3ManagedLedgerOffloadServiceEndpoint", s3ManagedLedgerOffloadServiceEndpoint)
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index e97acc0..ef0dfa7 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -2271,3 +2271,5 @@ Options
|`-e`, `--endpoint`|Alternative endpoint to connect to||
|`-mbs`, `--maxBlockSize`|Max block size|64MB|
|`-rbs`, `--readBufferSize`|Read buffer size|1MB|
+|`-oat`, `--offloadAfterThreshold`|Offload after threshold size (eg: 1M, 5M)||
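+|`-oae`, `--offloadAfterElapsed`|Offload after elapsed time, given as a relative duration (seconds, minutes, hours, days, weeks; eg: 10s, 100m, 3h, 2d, 5w); converted to milliseconds and stored as the offload deletion lag.||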