You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:01 UTC

[pulsar] 15/38: [Issue 6283][tiered-storage] Offload policies per namespace (#6422)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c3fa923e46ae37c7bab5503dbd40c3a0c54fe154
Author: Alexandre DUVAL <al...@wanadoo.fr>
AuthorDate: Sat Mar 28 11:35:53 2020 +0100

    [Issue 6283][tiered-storage] Offload policies per namespace (#6422)
    
    Fixes #6283
    
    ### Modifications
    
    Define and use a custom deletionLag and threshold in the offload policies per namespace.
    All of this is required for https://github.com/apache/pulsar/pull/6354.
    
    (cherry picked from commit 347d3851b6e62be99b0953a21d8c1a6d502ae111)
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  44 --------
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  82 ++++++++-------
 .../mledger/impl/OffloadLedgerDeleteTest.java      |   6 +-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |  22 ++--
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  38 ++++++-
 .../pulsar/broker/service/BrokerService.java       |  23 ++---
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |   2 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 112 ++++++++++++++++++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   5 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  33 +++++-
 .../common/policies/data/OffloadPolicies.java      |  18 +++-
 site2/docs/reference-pulsar-admin.md               |   2 +
 12 files changed, 276 insertions(+), 111 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 4af66eb..24dbaab 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -56,8 +56,6 @@ public class ManagedLedgerConfig {
     private long retentionTimeMs = 0;
     private long retentionSizeInMB = 0;
     private boolean autoSkipNonRecoverableData;
-    private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
-    private long offloadAutoTriggerSizeThresholdBytes = -1;
     private long metadataOperationsTimeoutSeconds = 60;
     private long readEntryTimeoutSeconds = 120;
     private long addEntryTimeoutSeconds = 120;
@@ -408,48 +406,6 @@ public class ManagedLedgerConfig {
         return retentionSizeInMB;
     }
 
-    /**
-     * When a ledger is offloaded from bookkeeper storage to longterm storage, the bookkeeper ledger
-     * is not deleted immediately. Instead we wait for a grace period before deleting from bookkeeper.
-     * The offloadLedgerDeleteLag sets this grace period.
-     *
-     * @param lagTime period to wait before deleting offloaded ledgers from bookkeeper
-     * @param unit timeunit for lagTime
-     */
-    public ManagedLedgerConfig setOffloadLedgerDeletionLag(long lagTime, TimeUnit unit) {
-        this.offloadLedgerDeletionLagMs = unit.toMillis(lagTime);
-        return this;
-    }
-
-    /**
-     * Number of milliseconds before an offloaded ledger will be deleted from bookkeeper.
-     *
-     * @return the offload ledger deletion lag time in milliseconds
-     */
-    public long getOffloadLedgerDeletionLagMillis() {
-        return offloadLedgerDeletionLagMs;
-    }
-
-    /**
-     * Size, in bytes, at which the managed ledger will start to automatically offload ledgers to longterm storage.
-     * A negative value disables autotriggering. A threshold of 0 offloads data as soon as possible.
-     * Offloading will not occur if no offloader has been set {@link #setLedgerOffloader(LedgerOffloader)}.
-     * Automatical offloading occurs when the ledger is rolled, and the ledgers up to that point exceed the threshold.
-     *
-     * @param threshold Threshold in bytes at which offload is automatically triggered
-     */
-    public ManagedLedgerConfig setOffloadAutoTriggerSizeThresholdBytes(long threshold) {
-        this.offloadAutoTriggerSizeThresholdBytes = threshold;
-        return this;
-    }
-
-    /**
-     * Size, in bytes, at which offloading will automatically be triggered for this managed ledger.
-     * @return the trigger threshold, in bytes
-     */
-    public long getOffloadAutoTriggerSizeThresholdBytes() {
-        return this.offloadAutoTriggerSizeThresholdBytes;
-    }
 
     /**
      * Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 08b5e5e..a64f8f5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1851,8 +1851,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getOffloadAutoTriggerSizeThresholdBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+                && config.getLedgerOffloader().getOffloadPolicies() != null) {
+            if (config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+                executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+            }
         }
     }
 
@@ -1871,39 +1874,43 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     }
                 });
 
-            long threshold = config.getOffloadAutoTriggerSizeThresholdBytes();
-            long sizeSummed = 0;
-            long alreadyOffloadedSize = 0;
-            long toOffloadSize = 0;
-
-            ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque();
-
-            // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-            for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                long size = e.getValue().getSize();
-                sizeSummed += size;
-                boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                    && e.getValue().getOffloadContext().getComplete();
-                if (alreadyOffloaded) {
-                    alreadyOffloadedSize += size;
-                } else if (sizeSummed > threshold) {
-                    toOffloadSize += size;
-                    toOffload.addFirst(e.getValue());
+            if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+                    && config.getLedgerOffloader().getOffloadPolicies() != null) {
+                long threshold = config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes();
+
+                long sizeSummed = 0;
+                long alreadyOffloadedSize = 0;
+                long toOffloadSize = 0;
+
+                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque();
+
+                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
+                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+                    long size = e.getValue().getSize();
+                    sizeSummed += size;
+                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
+                            && e.getValue().getOffloadContext().getComplete();
+                    if (alreadyOffloaded) {
+                        alreadyOffloadedSize += size;
+                    } else if (sizeSummed > threshold) {
+                        toOffloadSize += size;
+                        toOffload.addFirst(e.getValue());
+                    }
                 }
-            }
 
-            if (toOffload.size() > 0) {
-                log.info("[{}] Going to automatically offload ledgers {}"
-                         + ", total size = {}, already offloaded = {}, to offload = {}",
-                         name, toOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()),
-                         sizeSummed, alreadyOffloadedSize, toOffloadSize);
-            } else {
-                // offloadLoop will complete immediately with an empty list to offload
-                log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                          name, sizeSummed, alreadyOffloadedSize, threshold);
-            }
+                if (toOffload.size() > 0) {
+                    log.info("[{}] Going to automatically offload ledgers {}"
+                                    + ", total size = {}, already offloaded = {}, to offload = {}",
+                            name, toOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()),
+                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
+                } else {
+                    // offloadLoop will complete immediately with an empty list to offload
+                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
+                            name, sizeSummed, alreadyOffloadedSize, threshold);
+                }
 
-            offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
+                offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
+            }
         }
     }
 
@@ -1925,8 +1932,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private boolean isOffloadedNeedsDelete(OffloadContext offload) {
         long elapsedMs = clock.millis() - offload.getTimestamp();
-        return offload.getComplete() && !offload.getBookkeeperDeleted()
-                && elapsedMs > config.getOffloadLedgerDeletionLagMillis();
+
+        if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+                && config.getLedgerOffloader().getOffloadPolicies() != null) {
+            return offload.getComplete() && !offload.getBookkeeperDeleted()
+                    && elapsedMs > config.getLedgerOffloader()
+                    .getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis();
+        } else {
+            return false;
+        }
     }
 
     /**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
index 02fff69..8fbc588 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
@@ -48,7 +48,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
         config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
-        config.setOffloadLedgerDeletionLag(5, TimeUnit.MINUTES);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(300000));
         config.setLedgerOffloader(offloader);
         config.setClock(clock);
 
@@ -109,8 +109,8 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
         config.setMaxEntriesPerLedger(10);
         config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
         config.setRetentionTime(5, TimeUnit.MINUTES);
-        config.setOffloadLedgerDeletionLag(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(600000));
         config.setLedgerOffloader(offloader);
         config.setClock(clock);
 
@@ -157,7 +157,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
         config.setMaxEntriesPerLedger(10);
         config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
         config.setRetentionTime(10, TimeUnit.MINUTES);
-        config.setOffloadLedgerDeletionLag(5, TimeUnit.MINUTES);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(300000));
         config.setLedgerOffloader(offloader);
         config.setClock(clock);
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 997f3f6..97bab56 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -608,9 +608,12 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         config.setMaxEntriesPerLedger(10);
         config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
         config.setRetentionTime(0, TimeUnit.MINUTES);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(new Long(100));
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
         config.setLedgerOffloader(offloader);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
         ManagedCursor cursor = ledger.openCursor("foobar");
+
         for (int i = 0; i < 15; i++) {
             String content = "entry-" + i;
             ledger.addEntry(content.getBytes());
@@ -746,9 +749,9 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         MockLedgerOffloader offloader = new MockLedgerOffloader();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
-        config.setOffloadAutoTriggerSizeThresholdBytes(100);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
         config.setLedgerOffloader(offloader);
 
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -782,9 +785,9 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
-        config.setOffloadAutoTriggerSizeThresholdBytes(100);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
         config.setLedgerOffloader(offloader);
 
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -843,9 +846,9 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
-        config.setOffloadAutoTriggerSizeThresholdBytes(100);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
         config.setLedgerOffloader(offloader);
 
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -894,9 +897,9 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
-        config.setOffloadAutoTriggerSizeThresholdBytes(100);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100);
         config.setLedgerOffloader(offloader);
 
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -926,13 +929,12 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
     @Test
     public void offloadAsSoonAsClosed() throws Exception {
-
         MockLedgerOffloader offloader = new MockLedgerOffloader();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
-        config.setOffloadAutoTriggerSizeThresholdBytes(0);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0);
         config.setLedgerOffloader(offloader);
 
         ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
@@ -988,6 +990,12 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
             return deletes.keySet();
         }
 
+        OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
+                OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+                OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+                OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
+                OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS);
+
         @Override
         public String getOffloadDriverName() {
             return "mock";
@@ -1029,7 +1037,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
         @Override
         public OffloadPolicies getOffloadPolicies() {
-            return null;
+            return offloadPolicies;
         }
 
         @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 619a5cd..01927f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2046,7 +2046,12 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected long internalGetOffloadThreshold() {
         validateAdminAccessForTenant(namespaceName.getTenant());
-        return getNamespacePolicies(namespaceName).offload_threshold;
+        Policies policies = getNamespacePolicies(namespaceName);
+        if (policies.offload_policies == null) {
+            return policies.offload_threshold;
+        } else {
+            return policies.offload_policies.getManagedLedgerOffloadThresholdInBytes();
+        }
     }
 
     protected void internalSetOffloadThreshold(long newThreshold) {
@@ -2057,8 +2062,13 @@ public abstract class NamespacesBase extends AdminResource {
             Stat nodeStat = new Stat();
             final String path = path(POLICIES, namespaceName.toString());
             byte[] content = globalZk().getData(path, null, nodeStat);
+
             Policies policies = jsonMapper().readValue(content, Policies.class);
+            if (policies.offload_policies != null) {
+                policies.offload_policies.setManagedLedgerOffloadThresholdInBytes(newThreshold);
+            }
             policies.offload_threshold = newThreshold;
+
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
             log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}",
@@ -2083,7 +2093,12 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected Long internalGetOffloadDeletionLag() {
         validateAdminAccessForTenant(namespaceName.getTenant());
-        return getNamespacePolicies(namespaceName).offload_deletion_lag_ms;
+        Policies policies = getNamespacePolicies(namespaceName);
+        if (policies.offload_policies == null) {
+            return policies.offload_deletion_lag_ms;
+        } else {
+            return policies.offload_policies.getManagedLedgerOffloadDeletionLagInMillis();
+        }
     }
 
     protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
@@ -2094,8 +2109,13 @@ public abstract class NamespacesBase extends AdminResource {
             Stat nodeStat = new Stat();
             final String path = path(POLICIES, namespaceName.toString());
             byte[] content = globalZk().getData(path, null, nodeStat);
+
             Policies policies = jsonMapper().readValue(content, Policies.class);
+            if (policies.offload_policies != null) {
+                policies.offload_policies.setManagedLedgerOffloadDeletionLagInMillis(newDeletionLagMs);
+            }
             policies.offload_deletion_lag_ms = newDeletionLagMs;
+
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
             log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={}, value={}",
@@ -2234,6 +2254,20 @@ public abstract class NamespacesBase extends AdminResource {
             final String path = path(POLICIES, namespaceName.toString());
             byte[] content = globalZk().getData(path, null, nodeStat);
             Policies policies = jsonMapper().readValue(content, Policies.class);
+
+            if (offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis()
+                    .equals(OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
+                offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(policies.offload_deletion_lag_ms);
+            } else {
+                policies.offload_deletion_lag_ms = offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis();
+            }
+            if (offloadPolicies.getManagedLedgerOffloadThresholdInBytes() ==
+                    OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES) {
+                offloadPolicies.setManagedLedgerOffloadThresholdInBytes(policies.offload_threshold);
+            } else {
+                policies.offload_threshold = offloadPolicies.getManagedLedgerOffloadThresholdInBytes();
+            }
+
             policies.offload_policies = offloadPolicies;
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion(),
                     (rc, path1, ctx, stat) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 085bae4..3f0f9cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1032,19 +1032,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
 
             OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
+
+            if (offloadPolicies == null) {
+                offloadPolicies = new OffloadPolicies();
+                offloadPolicies.setManagedLedgerOffloadDriver(pulsar.getConfiguration().getManagedLedgerOffloadDriver());
+                offloadPolicies.setManagedLedgerOffloadThresholdInBytes(
+                        pulsar.getConfiguration().getManagedLedgerOffloadAutoTriggerSizeThresholdBytes()
+                );
+                offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(
+                        pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()
+                );
+            }
             managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
-            policies.ifPresent(p -> {
-                    long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs();
-                    if (p.offload_deletion_lag_ms != null) {
-                        lag = p.offload_deletion_lag_ms;
-                    }
-                    long bytes = serviceConfig.getManagedLedgerOffloadAutoTriggerSizeThresholdBytes();
-                    if (p.offload_threshold != -1L) {
-                        bytes = p.offload_threshold;
-                    }
-                    managedLedgerConfig.setOffloadLedgerDeletionLag(lag, TimeUnit.MILLISECONDS);
-                    managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(bytes);
-                });
 
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 25c45df..76272ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -149,7 +149,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         String endpoint = "test-endpoint";
 
         OffloadPolicies offload1 = OffloadPolicies.create(
-                driver, region, bucket, endpoint, 100, 100);
+                driver, region, bucket, endpoint, 100, 100, -1, null);
         admin.namespaces().setOffloadPolicies(namespaceName, offload1);
         OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
         Assert.assertEquals(offload1, offload2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index a607921..5f39812 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -43,8 +43,12 @@ import java.net.URI;
 import java.net.URL;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.WebApplicationException;
@@ -54,6 +58,8 @@ import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.admin.v1.Namespaces;
@@ -75,6 +81,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -1073,6 +1080,74 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         admin.tenants().deleteTenant("my-tenants");
     }
 
+    class MockLedgerOffloader implements LedgerOffloader {
+        ConcurrentHashMap<Long, UUID> offloads = new ConcurrentHashMap<Long, UUID>();
+        ConcurrentHashMap<Long, UUID> deletes = new ConcurrentHashMap<Long, UUID>();
+
+        Set<Long> offloadedLedgers() {
+            return offloads.keySet();
+        }
+
+        Set<Long> deletedOffloads() {
+            return deletes.keySet();
+        }
+
+        OffloadPolicies offloadPolicies;
+
+        public MockLedgerOffloader(OffloadPolicies offloadPolicies) {
+            this.offloadPolicies = offloadPolicies;
+        }
+
+        @Override
+        public String getOffloadDriverName() {
+            return "mock";
+        }
+
+        @Override
+        public CompletableFuture<Void> offload(ReadHandle ledger,
+                                               UUID uuid,
+                                               Map<String, String> extraMetadata) {
+            CompletableFuture<Void> promise = new CompletableFuture<>();
+            if (offloads.putIfAbsent(ledger.getId(), uuid) == null) {
+                promise.complete(null);
+            } else {
+                promise.completeExceptionally(new Exception("Already exists exception"));
+            }
+            return promise;
+        }
+
+        @Override
+        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
+                                                           Map<String, String> offloadDriverMetadata) {
+            CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
+            promise.completeExceptionally(new UnsupportedOperationException());
+            return promise;
+        }
+
+        @Override
+        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid,
+                                                       Map<String, String> offloadDriverMetadata) {
+            CompletableFuture<Void> promise = new CompletableFuture<>();
+            if (offloads.remove(ledgerId, uuid)) {
+                deletes.put(ledgerId, uuid);
+                promise.complete(null);
+            } else {
+                promise.completeExceptionally(new Exception("Not found"));
+            }
+            return promise;
+        };
+
+        @Override
+        public OffloadPolicies getOffloadPolicies() {
+            return offloadPolicies;
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
     @Test
     public void testSetOffloadThreshold() throws Exception {
         TopicName topicName = TopicName.get("persistent", this.testTenant, "offload", "offload-topic");
@@ -1088,25 +1163,54 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
         // the ledger config should have the expected value
         ManagedLedgerConfig ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
-        assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), 1);
+        MockLedgerOffloader offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+                OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+                OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+                admin.namespaces().getOffloadThreshold(namespace),
+                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+        ledgerConf.setLedgerOffloader(offloader);
+        assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
+                -1);
 
         // set an override for the namespace
         admin.namespaces().setOffloadThreshold(namespace, 100);
         assertEquals(100, admin.namespaces().getOffloadThreshold(namespace));
         ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
-        assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), 100);
+        admin.namespaces().getOffloadPolicies(namespace);
+        offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+                OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+                OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+                admin.namespaces().getOffloadThreshold(namespace),
+                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+        ledgerConf.setLedgerOffloader(offloader);
+        assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
+                100);
 
         // set another negative value to disable
         admin.namespaces().setOffloadThreshold(namespace, -2);
         assertEquals(-2, admin.namespaces().getOffloadThreshold(namespace));
         ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
-        assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), -2);
+        offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+                OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+                OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+                admin.namespaces().getOffloadThreshold(namespace),
+                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+        ledgerConf.setLedgerOffloader(offloader);
+        assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
+                -2);
 
         // set back to -1 and fall back to default
         admin.namespaces().setOffloadThreshold(namespace, -1);
         assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
         ledgerConf = pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
-        assertEquals(ledgerConf.getOffloadAutoTriggerSizeThresholdBytes(), 1);
+        offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+                OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+                OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+                admin.namespaces().getOffloadThreshold(namespace),
+                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+        ledgerConf.setLedgerOffloader(offloader);
+        assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
+                -1);
 
         // cleanup
         admin.topics().delete(topicName.toString(), true);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d9bd2f7..9318f15 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -489,10 +489,11 @@ public class PulsarAdminToolTest {
         namespaces.run(split("clear-offload-deletion-lag myprop/clust/ns1"));
         verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1");
 
-        namespaces.run(split("set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M"));
+        namespaces.run(split("set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s"));
         verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
                 OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
-                        "http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024));
+                        "http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024,
+                        10 * 1024 * 1024, 10000L));
 
         namespaces.run(split("get-offload-policies myprop/clust/ns1"));
         verify(mockNamespaces).getOffloadPolicies("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 0105d0e..3dd4a6f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1338,6 +1338,18 @@ public class CmdNamespaces extends CmdBase {
                 required = false)
         private String readBufferSizeStr;
 
+        @Parameter(
+                names = {"--offloadAfterElapsed", "-oae"},
+                description = "Offload after elapsed time (seconds, minutes, hours, days, weeks eg: 10s, 3h, 2d, 5w).",
+                required = false)
+        private String offloadAfterElapsedStr; // raw CLI value; parsed as relative time and converted to millis
+
+        @Parameter(
+                names = {"--offloadAfterThreshold", "-oat"},
+                description = "Offload after threshold size (eg: 1M, 5M)",
+                required = false)
+        private String offloadAfterThresholdStr; // raw CLI value; parsed with validateSizeString before use
+
         private final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"};
 
         public boolean driverSupported(String driver) {
@@ -1399,8 +1411,27 @@ public class CmdNamespaces extends CmdBase {
                 }
             }
 
+            Long offloadAfterElapsedInMillis = OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
+            if (StringUtils.isNotEmpty(offloadAfterElapsedStr)) {
+                Long offloadAfterElapsed = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr));
+                if (positiveCheck("OffloadAfterElapsed", offloadAfterElapsed)
+                        && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) {
+                    offloadAfterElapsedInMillis = new Long(offloadAfterElapsed);
+                }
+            }
+
+            long offloadAfterThresholdInBytes = OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
+            if (StringUtils.isNotEmpty(offloadAfterThresholdStr)) {
+                long offloadAfterThreshold = validateSizeString(offloadAfterThresholdStr);
+                if (positiveCheck("OffloadAfterThreshold", offloadAfterThreshold)
+                        && maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE)) {
+                    offloadAfterThresholdInBytes = new Long(offloadAfterThreshold);
+                }
+            }
+
             OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint,
-                    maxBlockSizeInBytes, readBufferSizeInBytes);
+                    maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes,
+                    offloadAfterElapsedInMillis);
             admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
         }
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index f46b44f..5ccb75c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -39,11 +39,15 @@ public class OffloadPolicies {
     public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
     public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem"};
     public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
+    public final static long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = -1;
+    public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
 
     // common config
     private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY;
     private String managedLedgerOffloadDriver = null;
     private int managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS;
+    private long managedLedgerOffloadThresholdInBytes = DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
+    private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
 
     // s3 config, set by service configuration or cli
     private String s3ManagedLedgerOffloadRegion = null;
@@ -68,9 +72,13 @@ public class OffloadPolicies {
     private String fileSystemURI = null;
 
     public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
-                                         int maxBlockSizeInBytes, int readBufferSizeInBytes) {
+                                         int maxBlockSizeInBytes, int readBufferSizeInBytes,
+                                         long offloadThresholdInBytes, Long offloadDeletionLagInMillis) {
         OffloadPolicies offloadPolicies = new OffloadPolicies();
         offloadPolicies.setManagedLedgerOffloadDriver(driver);
+        offloadPolicies.setManagedLedgerOffloadThresholdInBytes(offloadThresholdInBytes);
+        offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(offloadDeletionLagInMillis);
+
         if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
             offloadPolicies.setS3ManagedLedgerOffloadRegion(region);
             offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket);
@@ -153,6 +161,8 @@ public class OffloadPolicies {
         return Objects.hash(
                 managedLedgerOffloadDriver,
                 managedLedgerOffloadMaxThreads,
+                managedLedgerOffloadThresholdInBytes,
+                managedLedgerOffloadDeletionLagInMillis,
                 s3ManagedLedgerOffloadRegion,
                 s3ManagedLedgerOffloadBucket,
                 s3ManagedLedgerOffloadServiceEndpoint,
@@ -180,6 +190,10 @@ public class OffloadPolicies {
         OffloadPolicies other = (OffloadPolicies) obj;
         return Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver())
                 && Objects.equals(managedLedgerOffloadMaxThreads, other.getManagedLedgerOffloadMaxThreads())
+                && Objects.equals(managedLedgerOffloadThresholdInBytes,
+                    other.getManagedLedgerOffloadThresholdInBytes())
+                && Objects.equals(managedLedgerOffloadDeletionLagInMillis,
+                    other.getManagedLedgerOffloadDeletionLagInMillis())
                 && Objects.equals(s3ManagedLedgerOffloadRegion, other.getS3ManagedLedgerOffloadRegion())
                 && Objects.equals(s3ManagedLedgerOffloadBucket, other.getS3ManagedLedgerOffloadBucket())
                 && Objects.equals(s3ManagedLedgerOffloadServiceEndpoint,
@@ -208,6 +222,8 @@ public class OffloadPolicies {
         return MoreObjects.toStringHelper(this)
                 .add("managedLedgerOffloadDriver", managedLedgerOffloadDriver)
                 .add("managedLedgerOffloadMaxThreads", managedLedgerOffloadMaxThreads)
+                .add("managedLedgerOffloadThresholdInBytes", managedLedgerOffloadThresholdInBytes)
+                .add("managedLedgerOffloadDeletionLagInMillis", managedLedgerOffloadDeletionLagInMillis)
                 .add("s3ManagedLedgerOffloadRegion", s3ManagedLedgerOffloadRegion)
                 .add("s3ManagedLedgerOffloadBucket", s3ManagedLedgerOffloadBucket)
                 .add("s3ManagedLedgerOffloadServiceEndpoint", s3ManagedLedgerOffloadServiceEndpoint)
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index e97acc0..ef0dfa7 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -2271,3 +2271,5 @@ Options
 |`-e`, `--endpoint`|Alternative endpoint to connect to||
 |`-mbs`, `--maxBlockSize`|Max block size|64MB|
 |`-rbs`, `--readBufferSize`|Read buffer size|1MB|
+|`-oat`, `--offloadAfterThreshold`|Offload after threshold size (eg: 1M, 5M)||
+|`-oae`, `--offloadAfterElapsed`|Offload after elapsed time (seconds, minutes, hours, days, weeks; e.g. 10s, 100m, 3h, 2d, 5w).||