Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/19 06:55:57 UTC

[GitHub] [pulsar] tjiuming opened a new pull request, #17398: [feat][tiered-storage] ADD offload_time_threshold policy

tjiuming opened a new pull request, #17398:
URL: https://github.com/apache/pulsar/pull/17398

   Fixes https://github.com/apache/pulsar/issues/17361
   
   ### Motivation
   
   Add a policy that automatically triggers offload based on time.
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [x] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991170457


##########
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java:
##########
@@ -69,6 +69,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
             .of("S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss");
     public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
+    public static final Long DEFAULT_OFFLOAD_TIME_THRESHOLD = null;

Review Comment:
   ```suggestion
       public static final Long DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS = null;
   ```



##########
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java:
##########
@@ -178,7 +182,9 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu
                                              String role, String roleSessionName,
                                              String credentialId, String credentialSecret,
                                              Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
-                                             Long offloadThresholdInBytes, Long offloadDeletionLagInMillis,
+                                             Long offloadThresholdInBytes,
+                                             //Long offloadTimeThreshold, //TODO

Review Comment:
   Is it still useful?



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -787,33 +790,77 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testAutoTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {
+        return new Object[][]{
+                {null, 0L},
+                {100L, null},
+                {-1L, null},
+                {null, null},
+                {-1L, -1L},
+                {1L, 1L}
+        };
+    }
+
+    @Test(dataProvider = "testAutoTriggerOffload")
+    public void testAutoTriggerOffload(Long sizeThreshold, Long timeThreshold) throws Exception {
         MockLedgerOffloader offloader = new MockLedgerOffloader();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(sizeThreshold);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(timeThreshold);
         config.setLedgerOffloader(offloader);
 
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger" + UUID.randomUUID(), config);
 
         // Ledger will roll twice, offload will run on first ledger after second closed
         for (int i = 0; i < 25; i++) {
+            Thread.sleep(10);

Review Comment:
   Why do we need the sleep here?



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -787,33 +790,77 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testAutoTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {
+        return new Object[][]{
+                {null, 0L},
+                {100L, null},
+                {-1L, null},
+                {null, null},
+                {-1L, -1L},
+                {1L, 1L}

Review Comment:
   We should also have a case with a long time threshold and a 100-byte size threshold, to make sure that only part of the ledgers is offloaded.
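
To make the requested case concrete, here is a minimal sketch of the selection rule using plain Java stand-ins rather than the real `ManagedLedgerImpl` types. The class `OffloadSelectionSketch`, the nested `Ledger` type and the `select` method are hypothetical names. The size check mirrors the condition quoted in the diff; the age check is an assumption, because that part of the condition is not visible in the quoted hunk.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Sketch only: plain Java stand-ins, not the real ManagedLedgerImpl types.
class OffloadSelectionSketch {

    // Hypothetical stand-in for LedgerInfo; closedAtMillis == 0 means "still open".
    static final class Ledger {
        final long id;
        final long sizeBytes;
        final long closedAtMillis;

        Ledger(long id, long sizeBytes, long closedAtMillis) {
            this.id = id;
            this.sizeBytes = sizeBytes;
            this.closedAtMillis = closedAtMillis;
        }
    }

    // Walks ledgers newest-first and returns the ids to offload, oldest-first.
    // A negative threshold disables that trigger.
    static List<Long> select(List<Ledger> newestFirst, long sizeThresholdBytes,
                             long timeThresholdMillis, long nowMillis) {
        Deque<Long> toOffload = new ArrayDeque<>();
        long sizeSummed = 0;
        for (Ledger ledger : newestFirst) {
            if (ledger.closedAtMillis == 0L) {
                continue; // an open ledger cannot be offloaded
            }
            sizeSummed += ledger.sizeBytes;
            // Same shape as the size condition quoted in the diff.
            boolean bySize = sizeThresholdBytes >= 0 && sizeSummed > sizeThresholdBytes;
            // ASSUMPTION: the age condition is not visible in the quoted hunk.
            boolean byAge = timeThresholdMillis >= 0
                    && nowMillis - ledger.closedAtMillis >= timeThresholdMillis;
            if (bySize || byAge) {
                toOffload.addFirst(ledger.id);
            }
        }
        return new ArrayList<>(toOffload);
    }

    public static void main(String[] args) {
        List<Ledger> newestFirst = Arrays.asList(
                new Ledger(3, 50, 0),      // still open
                new Ledger(2, 80, 1_000),
                new Ledger(1, 80, 500),
                new Ledger(0, 80, 100));
        // 100-byte size threshold, one-hour time threshold, "now" = 2000 ms.
        System.out.println(select(newestFirst, 100, 3_600_000, 2_000)); // prints [0, 1]
    }
}
```

With these inputs the time threshold never fires, so only the two oldest closed ledgers are selected by size, which is the "part of the ledgers" outcome the comment asks the test to cover.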



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadThresholdInSeconds] are null OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)

Review Comment:
   ```
   sizeSummed > offloadThresholdInBytes
   ```
   
   Interesting. I think this comparison is not correct: it makes ledgers offload earlier than intended. It was not introduced by this PR, but it looks like we need to fix it in a separate PR.
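
One possible reading of "offload earlier" can be sketched as follows. Because the ledger's own size is added to the running sum before the comparison, the ledger that crosses the threshold is itself selected, so fewer than `threshold` bytes can remain un-offloaded. The class `SizeThresholdSketch` and both methods are hypothetical, and the second method is only an illustrative alternative, not a fix proposed in this thread.

```java
// Sketch only: compares two ways of applying a size threshold while
// walking ledger sizes from newest to oldest.
class SizeThresholdSketch {

    // Mirrors the quoted comparison: the ledger's own size is added before the check.
    static long retainedInclusive(long[] sizesNewestFirst, long thresholdBytes) {
        long sizeSummed = 0;
        long retained = 0;
        for (long size : sizesNewestFirst) {
            sizeSummed += size;
            if (!(sizeSummed > thresholdBytes)) {
                retained += size; // this ledger is not selected for offload
            }
        }
        return retained;
    }

    // HYPOTHETICAL alternative: select a ledger only once the newer ledgers
    // alone already hold at least the threshold.
    static long retainedExclusive(long[] sizesNewestFirst, long thresholdBytes) {
        long newerBytes = 0;
        long retained = 0;
        for (long size : sizesNewestFirst) {
            if (!(newerBytes >= thresholdBytes)) {
                retained += size; // this ledger is not selected for offload
            }
            newerBytes += size;
        }
        return retained;
    }

    public static void main(String[] args) {
        long[] sizes = {60, 60, 60};
        System.out.println(retainedInclusive(sizes, 100)); // prints 60
        System.out.println(retainedExclusive(sizes, 100)); // prints 120
    }
}
```

With three 60-byte ledgers and a 100-byte threshold, the quoted comparison leaves 60 bytes un-offloaded, while the alternative leaves 120.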



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.

Review Comment:
   ```suggestion
           // Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are negative values.
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadThresholdInSeconds] are null OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;

Review Comment:
   This one is also interesting: if the ledger is already offloaded, why not break out of the loop? All the ledgers before an offloaded ledger should already be offloaded, no?
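
The trade-off behind this question can be sketched with a toy scan over "already offloaded" flags. The class `OffloadScanSketch` and both methods are hypothetical. Breaking early is only equivalent to a full scan under the assumption that every ledger older than an offloaded one is itself offloaded; whether that always holds in the managed ledger is not established in this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch only: index 0 is the newest ledger; true means "already offloaded".
class OffloadScanSketch {

    // Stops at the first offloaded ledger.
    // ASSUMPTION: everything older than an offloaded ledger is offloaded too.
    static List<Integer> pendingWithBreak(boolean[] offloadedNewestFirst) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 0; i < offloadedNewestFirst.length; i++) {
            if (offloadedNewestFirst[i]) {
                break;
            }
            pending.addFirst(i); // keep oldest-first order
        }
        return new ArrayList<>(pending);
    }

    // Scans every ledger, as the quoted loop does.
    static List<Integer> pendingFullScan(boolean[] offloadedNewestFirst) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 0; i < offloadedNewestFirst.length; i++) {
            if (offloadedNewestFirst[i]) {
                continue;
            }
            pending.addFirst(i);
        }
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        boolean[] contiguous = {false, false, true, true};
        boolean[] withGap = {false, true, false};
        System.out.println(pendingWithBreak(contiguous)); // prints [1, 0]
        System.out.println(pendingFullScan(contiguous));  // prints [1, 0]
        System.out.println(pendingWithBreak(withGap));    // prints [0]
        System.out.println(pendingFullScan(withGap));     // prints [2, 0]
    }
}
```

When the offloaded ledgers form a contiguous run at the old end, both scans agree. If a gap exists, the early break skips the older pending ledger. Note also that the quoted loop accumulates `sizeSummed` and `alreadyOffloadedSize` for logging, and an early break would change those totals.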



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.

Review Comment:
   I also think we don't need a comment here; the code is already clear.





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991831479


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadThresholdInSeconds] are null OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)

Review Comment:
   I'm also confused here, because `sizeSummed` also includes ledgers that have already been offloaded. Maybe we should fix that in a separate PR.
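
   To make the concern concrete, here is a minimal sketch of the selection loop, assuming a simplified stand-in for `LedgerInfo` (the `Ledger` class, `select`, and `sample` below are hypothetical names, not Pulsar APIs). It shows that an already-offloaded ledger still adds to `sizeSummed`, so an older ledger can cross the size threshold sooner than it would otherwise.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

final class OffloadSelectionSketch {
    // Hypothetical stand-in for LedgerInfo: only the fields the loop reads.
    static final class Ledger {
        final long id;
        final long size;
        final long closedAtMillis; // 0 means the ledger is still open
        final boolean offloaded;

        Ledger(long id, long size, long closedAtMillis, boolean offloaded) {
            this.id = id;
            this.size = size;
            this.closedAtMillis = closedAtMillis;
            this.offloaded = offloaded;
        }
    }

    // Returns the ids selected for offload, ordered oldest to newest.
    // A negative threshold disables the corresponding check.
    static List<Long> select(TreeMap<Long, Ledger> ledgers,
                             long thresholdBytes, long thresholdMillis, long now) {
        long sizeSummed = 0;
        Deque<Long> toOffload = new ArrayDeque<>();
        // Walk newest to oldest; addFirst keeps the result oldest to newest.
        for (Ledger info : ledgers.descendingMap().values()) {
            if (info.closedAtMillis == 0L) {
                continue; // still open, cannot be offloaded
            }
            sizeSummed += info.size; // counted even if already offloaded
            if (info.offloaded) {
                continue;
            }
            boolean overSize = thresholdBytes >= 0 && sizeSummed > thresholdBytes;
            boolean overAge = thresholdMillis >= 0
                    && now - info.closedAtMillis >= thresholdMillis;
            if (overSize || overAge) {
                toOffload.addFirst(info.id);
            }
        }
        return new ArrayList<>(toOffload);
    }

    // Four ledgers of 10 bytes each: 2 is already offloaded, 4 is still open.
    static TreeMap<Long, Ledger> sample() {
        TreeMap<Long, Ledger> m = new TreeMap<>();
        m.put(1L, new Ledger(1, 10, 1000, false));
        m.put(2L, new Ledger(2, 10, 2000, true));
        m.put(3L, new Ledger(3, 10, 3000, false));
        m.put(4L, new Ledger(4, 10, 0, false));
        return m;
    }

    public static void main(String[] args) {
        System.out.println(select(sample(), 25, -1, 5000));
        System.out.println(select(sample(), -1, 2000, 5000));
    }
}
```

   With a 25-byte size threshold, ledger 1 is selected only because the offloaded ledger 2 contributed its 10 bytes to the running sum; whether that is the intended behaviour is the open question here.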





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1006338342


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   Why would some tests fail?





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1005632249


##########
conf/broker.conf:
##########
@@ -1149,6 +1149,10 @@ managedLedgerOffloadDeletionLagMs=14400000
 # (default is -1, which is disabled)
 managedLedgerOffloadAutoTriggerSizeThresholdBytes=-1
 
+# The number of seconds before triggering automatic offload to long term storage
+# (default is -1, which is disabled)
+managedLedgerOffloadThresholdInSeconds=-1

Review Comment:
   OK, makes sense.
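
   For reference, a small sketch of how a seconds value with `-1` meaning "disabled" can be normalized to milliseconds, mirroring the `Optional` chain in the `ManagedLedgerImpl` diff. The class and method names are hypothetical and exist only for illustration.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;

final class OffloadThresholdSketch {
    // Maps null or any negative value to -1 (disabled); otherwise converts
    // the threshold from seconds to milliseconds.
    static long toMillisOrDisabled(Long seconds) {
        return Optional.ofNullable(seconds)
                .filter(v -> v >= 0)
                .map(TimeUnit.SECONDS::toMillis)
                .orElse(-1L);
    }

    public static void main(String[] args) {
        System.out.println(toMillisOrDisabled(null));
        System.out.println(toMillisOrDisabled(-1L));
        System.out.println(toMillisOrDisabled(3600L));
    }
}
```

   Note that a threshold of `0` stays enabled under this rule, so every closed ledger would qualify for offload immediately.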





[GitHub] [pulsar] zymap commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991212702


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);

Review Comment:
   If the offload is triggered here and nothing is offloaded, the broker still reports no error, which doesn't make sense.
   I don't think this case can happen, but we still shouldn't complete the promise successfully. It's better to make the promise fail.
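
   A rough sketch of what failing the promise could look like, under some loud assumptions: `Semaphore` stands in for `offloadMutex`, `String` stands in for `PositionImpl`, and a busy mutex fails immediately instead of rescheduling as the real method does. The sketch fails `unlockingPromise` rather than `finalPromise`, which is its own choice so that the stand-in mutex is always released; the diff above completes `finalPromise` directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

final class OffloadPromiseSketch {
    // Hypothetical stand-in for offloadMutex.
    private final Semaphore mutex = new Semaphore(1);

    CompletableFuture<String> maybeOffload(boolean offloaderConfigured) {
        CompletableFuture<String> finalPromise = new CompletableFuture<>();
        if (!mutex.tryAcquire()) {
            // Simplification: the real code reschedules instead of failing.
            finalPromise.completeExceptionally(
                    new IllegalStateException("offload already running"));
            return finalPromise;
        }

        // Completing this promise releases the mutex, then forwards the result.
        CompletableFuture<String> unlockingPromise = new CompletableFuture<>();
        unlockingPromise.whenComplete((res, ex) -> {
            mutex.release();
            if (ex != null) {
                finalPromise.completeExceptionally(ex);
            } else {
                finalPromise.complete(res);
            }
        });

        if (!offloaderConfigured) {
            // Fail loudly instead of reporting a successful no-op.
            unlockingPromise.completeExceptionally(new IllegalArgumentException(
                    "offloader or offload policies are not configured"));
            return finalPromise;
        }

        unlockingPromise.complete("LATEST");
        return finalPromise;
    }

    boolean mutexFree() {
        return mutex.availablePermits() == 1;
    }

    public static void main(String[] args) {
        OffloadPromiseSketch sketch = new OffloadPromiseSketch();
        System.out.println(sketch.maybeOffload(false).isCompletedExceptionally());
        System.out.println(sketch.mutexFree());
    }
}
```

   The caller then sees an exceptional completion for the misconfigured case, and a later call can still acquire the mutex.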
   



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -839,7 +887,9 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
         for (int i = 0; i < 25; i++) {
             ledger.addEntry(buildEntry(10, "entry-" + i));
         }
+        System.out.println("before");
         offloadRunning.await();
+        System.out.println("after");

Review Comment:
   Could you remove this leftover debug output?



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -839,7 +887,9 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
         for (int i = 0; i < 25; i++) {
             ledger.addEntry(buildEntry(10, "entry-" + i));
         }
+        System.out.println("before");

Review Comment:
   Could you remove this leftover debug output?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadThresholdInSeconds] are null OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)

Review Comment:
   Why does this make the ledger offload earlier? Doesn't `offloadThresholdInBytes` mean how much data I want to keep in BookKeeper?
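
   To make the size-threshold question concrete, here is a minimal sketch of the newest-to-oldest walk shown in the removed lines of the diff above. It is a simplification under stated assumptions, not the Pulsar implementation: `SizeThresholdSketch` and `selectBySize` are hypothetical names, and a plain map of ledger id to size stands in for `LedgerInfo`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Pulsar.
class SizeThresholdSketch {

    // Walk ledgers from newest to oldest, summing sizes. Once the running
    // sum exceeds the threshold, every ledger visited from then on is
    // selected, so roughly the newest `thresholdBytes` stay where they are.
    static List<Long> selectBySize(TreeMap<Long, Long> ledgerSizes, long thresholdBytes) {
        Deque<Long> toOffload = new ArrayDeque<>();
        long sizeSummed = 0;
        for (Map.Entry<Long, Long> e : ledgerSizes.descendingMap().entrySet()) {
            sizeSummed += e.getValue();
            if (sizeSummed > thresholdBytes) {
                // addFirst keeps the result in oldest-to-newest order
                toOffload.addFirst(e.getKey());
            }
        }
        return new ArrayList<>(toOffload);
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> sizes = new TreeMap<>();
        sizes.put(1L, 100L); // oldest
        sizes.put(2L, 100L);
        sizes.put(3L, 50L);  // newest
        System.out.println(selectBySize(sizes, 100L)); // prints [1, 2]
    }
}
```

   Note that in this sketch the ledger that straddles the threshold (ledger 2) is selected as well, so slightly less than the threshold can remain un-offloaded.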



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -787,33 +790,77 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testAutoTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {
+        return new Object[][]{
+                {null, 0L},
+                {100L, null},
+                {-1L, null},
+                {null, null},
+                {-1L, -1L},
+                {1L, 1L}
+        };
+    }
+
+    @Test(dataProvider = "testAutoTriggerOffload")
+    public void testAutoTriggerOffload(Long sizeThreshold, Long timeThreshold) throws Exception {
         MockLedgerOffloader offloader = new MockLedgerOffloader();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(sizeThreshold);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(timeThreshold);
         config.setLedgerOffloader(offloader);
 
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger" + UUID.randomUUID(), config);
 
         // Ledger will roll twice, offload will run on first ledger after second closed
         for (int i = 0; i < 25; i++) {
+            Thread.sleep(10);
             ledger.addEntry(buildEntry(10, "entry-" + i));
         }
 
         assertEquals(ledger.getLedgersInfoAsList().size(), 3);
 
-        // offload should eventually be triggered
-        assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
-        assertEquals(offloader.offloadedLedgers(),
-                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));
+        if (sizeThreshold == null && timeThreshold != null && timeThreshold == 0L) {
+            // All the inactive ledgers will be offloaded
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+            List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+            assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1)));
+        } else if (sizeThreshold != null && sizeThreshold == 100L && timeThreshold == null) {
+            // The last 2 ledgers won't be offloaded
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
+            List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+            assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0)));
+        } else if (sizeThreshold != null && sizeThreshold == -1L && timeThreshold == null) {
+            // Offloading is disabled, no ledgers will be offloaded.
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 0);
+            assertEquals(offloader.offloadedLedgers().size(), 0);
+        } else if (sizeThreshold == null && timeThreshold == null) {
+            // Offloading is disabled, no ledgers will be offloaded.
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 0);
+            assertEquals(offloader.offloadedLedgers().size(), 0);
+        } else if (sizeThreshold != null && sizeThreshold == 1L && timeThreshold != null && timeThreshold == 1L) {
+            // All the inactive ledgers will be offloaded
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+            List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+            assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1)));
+        }
     }
 
-    @Test
-    public void manualTriggerWhileAutoInProgress() throws Exception {
+    @DataProvider(name = "manualTriggerWhileAutoInProgress")
+    public Object[][] manualTriggerWhileAutoInProgressProvider() {

Review Comment:
   A manually triggered offload is not controlled by the size and time thresholds. The client calculates the message ID and then uses that message ID to offload.
   So this test is useless. We also need to support the time threshold on the client side, or support it through the related REST API.





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991832181


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadThresholdInSeconds] are null OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;

Review Comment:
   Yes, but it just records the number of bytes and prints it in the log.





[GitHub] [pulsar] zymap commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r964596306


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2358,60 +2359,82 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;

Review Comment:
   You need to complete the future to release the lock.
   It would also be better to restore the debug log.
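
   The point about releasing the lock can be sketched as follows. This is a simplified illustration under assumptions, not the Pulsar code: `UnlockOnCompleteSketch` is a hypothetical class, a one-permit `Semaphore` stands in for `offloadMutex`, and a contended lock fails fast here instead of being rescheduled. The idea is that every early exit after the lock is taken should complete `unlockingPromise`, because its callback is the only place the lock is released.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical illustration class; not part of Pulsar.
class UnlockOnCompleteSketch {
    // Stand-in for offloadMutex (assumption: a single-permit semaphore).
    private final Semaphore mutex = new Semaphore(1);

    CompletableFuture<String> maybeOffload(boolean offloaderConfigured) {
        CompletableFuture<String> finalPromise = new CompletableFuture<>();
        if (!mutex.tryAcquire()) {
            // Simplification: the real code reschedules the attempt instead.
            finalPromise.completeExceptionally(new IllegalStateException("offload in progress"));
            return finalPromise;
        }

        // The lock is released only when unlockingPromise completes.
        CompletableFuture<String> unlockingPromise = new CompletableFuture<>();
        unlockingPromise.whenComplete((res, ex) -> {
            mutex.release();
            if (ex != null) {
                finalPromise.completeExceptionally(ex);
            } else {
                finalPromise.complete(res);
            }
        });

        if (!offloaderConfigured) {
            // A bare `return` here would leave the lock held forever.
            unlockingPromise.complete("nothing-to-offload");
            return finalPromise;
        }

        unlockingPromise.complete("offloaded");
        return finalPromise;
    }

    boolean isLocked() {
        return mutex.availablePermits() == 0;
    }
}
```

   With this shape, a caller that hits the early exit still sees the future complete, and the next `maybeOffload` call can take the lock again.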



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2358,60 +2359,82 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        String threshold = null;
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        if (config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null) {

Review Comment:
   Shouldn't it be more like the retention policy? I think we can offload by time and size at the same time.
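
   One way to read "by time and size at the same time" is that a ledger qualifies as soon as either enabled threshold is exceeded. The sketch below illustrates that reading only. `TimeAndSizeSketch`, its `Ledger` holder and the `select` method are hypothetical, a negative threshold is assumed to mean "disabled", and the strict `>` comparison on age is an assumption rather than something the diff confirms.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Pulsar.
class TimeAndSizeSketch {

    // Minimal stand-in for LedgerInfo: size plus the time the ledger was closed.
    static final class Ledger {
        final long sizeBytes;
        final long closedAtMillis;

        Ledger(long sizeBytes, long closedAtMillis) {
            this.sizeBytes = sizeBytes;
            this.closedAtMillis = closedAtMillis;
        }
    }

    // A ledger is selected if either enabled threshold is exceeded.
    // A negative threshold disables that check (assumption).
    static List<Long> select(TreeMap<Long, Ledger> ledgers, long sizeThresholdBytes,
                             long timeThresholdMillis, long nowMillis) {
        Deque<Long> toOffload = new ArrayDeque<>();
        long sizeSummed = 0;
        for (Map.Entry<Long, Ledger> e : ledgers.descendingMap().entrySet()) {
            Ledger ledger = e.getValue();
            sizeSummed += ledger.sizeBytes;
            boolean overSize = sizeThresholdBytes >= 0 && sizeSummed > sizeThresholdBytes;
            boolean overAge = timeThresholdMillis >= 0
                    && nowMillis - ledger.closedAtMillis > timeThresholdMillis;
            if (overSize || overAge) {
                toOffload.addFirst(e.getKey()); // oldest-to-newest order
            }
        }
        return new ArrayList<>(toOffload);
    }

    public static void main(String[] args) {
        TreeMap<Long, Ledger> ledgers = new TreeMap<>();
        ledgers.put(1L, new Ledger(100, 1_000));
        ledgers.put(2L, new Ledger(100, 5_000));
        ledgers.put(3L, new Ledger(50, 9_500));
        // Size check disabled, age limit of 6 seconds, "now" fixed at 10 seconds.
        System.out.println(select(ledgers, -1, 6_000, 10_000)); // prints [1]
    }
}
```

   Passing the current time in as a parameter keeps the selection deterministic, which makes the behaviour easy to check in a unit test.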





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1006337606


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   ok





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1006598566


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   @hangc0276 
   Consider the test `public void testAutoTriggerOffload(Long sizeThreshold, Long timeThreshold)` in `OffloadPrefixTest`, run with `sizeThreshold = null` and `timeThreshold = 0`.
   
   Each ledger holds at most 10 entries, and the test writes 35 entries to the topic, so there are 4 ledgers; say their IDs are [1,2,3,4]. Ledgers [1,2,3] each reached the 10-entry limit and were closed, so all three should be offloaded. That is the expected behaviour.
   
   However, the actual result is nondeterministic: ledger [3] may or may not be offloaded.
   
   In the `ledgerClosed()` method, we set the `LedgerInfo` timestamp:
   ```
               LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setEntries(entriesInLedger)
                       .setSize(lh.getLength()).setTimestamp(clock.millis()).build();
               ledgers.put(lh.getId(), info);
   ```
   and then trigger the offload:
   ```
           maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
   ```
   
   `maybeOffloadInBackground` then goes to
   
   ```
       private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
                                 CompletableFuture<PositionImpl> finalPromise) {
           // ignore
   
           for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
               // ignore...
   
               final long timestamp = info.getTimestamp();
               final long now = System.currentTimeMillis();
               sizeSummed += size;
   
               final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
               if (alreadyOffloaded) {
                   alreadyOffloadedSize += size;
               } else {
                   if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
                           || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {
                       toOffloadSize += size;
                       toOffload.addFirst(info);
                   }
               }
           }
         
         // ignore
   ```
   
   Between
   ```
   LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setEntries(entriesInLedger)
                       .setSize(lh.getLength()).setTimestamp(clock.millis()).build()
   ```
   and
   ```
     if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
                           || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {
                       toOffloadSize += size;
                       toOffload.addFirst(info);
                   }
   ```
   more than 1 ms may or may not elapse. With a strict `>` comparison and a threshold of 0, the last closed ledger would therefore be offloaded only some of the time.
   To eliminate this uncertainty, I use `now - timestamp >= offloadTimeThresholdMillis` here.
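   
   As a minimal sketch of the timing issue described above (not Pulsar code; the class and method names are hypothetical and exist only for illustration), the two comparisons can be evaluated against fixed timestamps instead of the real clock. With a threshold of 0, the strict form depends on whether at least 1 ms has passed since the ledger was closed, while the inclusive form does not:
   
   ```java
   // Hypothetical illustration only: these names do not exist in Pulsar.
   final class OffloadTimeCheck {
       // Strict comparison: a ledger closed in the same millisecond is not yet due.
       static boolean dueStrict(long nowMillis, long closedAtMillis, long thresholdMillis) {
           return thresholdMillis >= 0 && nowMillis - closedAtMillis > thresholdMillis;
       }
   
       // Inclusive comparison: a ledger closed in the same millisecond is already due.
       static boolean dueInclusive(long nowMillis, long closedAtMillis, long thresholdMillis) {
           return thresholdMillis >= 0 && nowMillis - closedAtMillis >= thresholdMillis;
       }
   
       public static void main(String[] args) {
           long closedAt = 1_000L; // assumed close timestamp
           long threshold = 0L;    // timeThreshold = 0, as in the test
   
           // The check runs in the same millisecond as the close.
           System.out.println(dueStrict(closedAt, closedAt, threshold));    // false
           System.out.println(dueInclusive(closedAt, closedAt, threshold)); // true
   
           // The check runs one millisecond after the close.
           System.out.println(dueStrict(closedAt + 1, closedAt, threshold));    // true
           System.out.println(dueInclusive(closedAt + 1, closedAt, threshold)); // true
       }
   }
   ```
   
   Only the inclusive form gives the same answer in both cases, which is what makes the test outcome stable.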





[GitHub] [pulsar] codelipenghui merged pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17398:
URL: https://github.com/apache/pulsar/pull/17398




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r974163755


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -1919,6 +1919,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "The number of bytes before triggering automatic offload to long term storage"
     )
     private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "The threshold to triggering automatic offload to long term storage"
+    )
+    private long managedLedgerOffloadTimeThresholdInSeconds = -1L;

Review Comment:
   @tjiuming I see the config name is `managedLedgerOffloadTimeThresholdInSeconds`. It would be better to change it to `managedLedgerOffloadThresholdInSeconds`.
   
   In TopicPolicies, we have `managedLedgerOffloadThresholdInBytes`. Keeping the config names in the same pattern helps users find related settings by configuration name prefix.
   





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r994072402


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);

Review Comment:
   @zymap It looks like an offload triggered by the user never reaches this code; isn't it handled by `asyncOffloadPrefix`? Also, other places in this method already return `PositionImpl.LATEST` when no data needs to be offloaded. I think we should stay consistent with the current behavior.
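   
   To make the two options concrete, here is a rough sketch of how a caller observes each outcome. It is not Pulsar code: a plain `String` stands in for `PositionImpl.LATEST`, and the class and method names are hypothetical.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   
   // Hypothetical illustration only: "LATEST" stands in for PositionImpl.LATEST.
   final class NothingToOffload {
       static final String LATEST = "LATEST";
   
       // Nothing to offload is reported as a normal result.
       static CompletableFuture<String> completeWithSentinel() {
           CompletableFuture<String> promise = new CompletableFuture<>();
           promise.complete(LATEST);
           return promise;
       }
   
       // Nothing to offload is reported as a failure.
       static CompletableFuture<String> completeWithError() {
           CompletableFuture<String> promise = new CompletableFuture<>();
           promise.completeExceptionally(new IllegalArgumentException("nothing to offload"));
           return promise;
       }
   
       public static void main(String[] args) {
           // The sentinel flows through the normal completion path.
           System.out.println(completeWithSentinel().join());
           // The error forces every caller to handle an exceptional completion.
           System.out.println(completeWithError().isCompletedExceptionally());
       }
   }
   ```
   
   With the sentinel, existing callers that only look at the returned position keep working unchanged, which is the consistency argument made above.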





[GitHub] [pulsar] tjiuming commented on pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1234136805

   I'll create a separate PR for updating the offload policy via the cmd/REST API.




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r970672795


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -1919,6 +1919,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "The number of bytes before triggering automatic offload to long term storage"
     )
     private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "The threshold to triggering automatic offload to long term storage"
+    )
+    private long managedLedgerOffloadTimeThresholdInSeconds = -1L;

Review Comment:
   Please also add this setting to `broker.conf`.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2358,60 +2359,68 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final Long offloadThresholdInBytes = policies.getManagedLedgerOffloadThresholdInBytes();
+        final Long offloadTimeThreshold = policies.getManagedLedgerOffloadTimeThresholdInSeconds();
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes != null && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThreshold != null && now - timestamp > offloadTimeThreshold)) {

Review Comment:
   I think we should add this check to the method `maybeOffloadInBackground`.
   If both `thresholdInBytes` and `thresholdInSeconds` are null or negative, we should skip the `maybeOffload` call.
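   
   Something along these lines, as a standalone sketch rather than the actual change (the class and method names are hypothetical; only the null-or-negative rule comes from the comment above). A null threshold is treated as -1, and the offload task is scheduled only if at least one threshold is non-negative:
   
   ```java
   import java.util.Optional;
   
   // Hypothetical illustration only: these names do not exist in Pulsar.
   final class OffloadGuard {
       // Treat an unset (null) threshold as disabled, i.e. -1.
       static long normalize(Long threshold) {
           return Optional.ofNullable(threshold).orElse(-1L);
       }
   
       // True if at least one threshold is enabled, so maybeOffload is worth running.
       static boolean shouldRunMaybeOffload(Long thresholdInBytes, Long thresholdInSeconds) {
           return normalize(thresholdInBytes) >= 0 || normalize(thresholdInSeconds) >= 0;
       }
   
       public static void main(String[] args) {
           System.out.println(shouldRunMaybeOffload(null, null)); // false: both unset
           System.out.println(shouldRunMaybeOffload(-1L, -1L));   // false: both disabled
           System.out.println(shouldRunMaybeOffload(null, 0L));   // true: time threshold set
           System.out.println(shouldRunMaybeOffload(1024L, null)); // true: size threshold set
       }
   }
   ```
   
   Doing the check up front avoids taking the offload mutex and walking the ledger list when neither policy is configured.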



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -1919,6 +1919,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "The number of bytes before triggering automatic offload to long term storage"
     )
     private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "The threshold to triggering automatic offload to long term storage"
+    )
+    private long managedLedgerOffloadTimeThresholdInSeconds = -1L;

Review Comment:
   ```suggestion
       private long managedLedgerOffloadThresholdInSeconds = -1L;
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2358,60 +2359,68 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final Long offloadThresholdInBytes = policies.getManagedLedgerOffloadThresholdInBytes();
+        final Long offloadTimeThreshold = policies.getManagedLedgerOffloadTimeThresholdInSeconds();
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes != null && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThreshold != null && now - timestamp > offloadTimeThreshold)) {

Review Comment:
   What does `offloadTimeThreshold = -1` mean? As written, if a user provides a negative value or zero, will we offload all ledgers?
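
To make the question concrete, here is a minimal sketch of the time comparison exactly as it appears in the hunk above. `TimeThresholdCheck` and `exceeds` are hypothetical names and the values are made up. The sketch also assumes that `info.getTimestamp()` is in milliseconds, in which case the hunk compares a millisecond age against a threshold whose getter name says seconds; no conversion is done here, matching the hunk.

```java
/** Hypothetical illustration of the comparison in the hunk above; not Pulsar code. */
final class TimeThresholdCheck {
    private TimeThresholdCheck() {}

    /** Mirrors: offloadTimeThreshold != null && now - timestamp > offloadTimeThreshold. */
    static boolean exceeds(Long offloadTimeThreshold, long now, long timestamp) {
        return offloadTimeThreshold != null && now - timestamp > offloadTimeThreshold;
    }

    public static void main(String[] args) {
        long now = 10_000L; // made-up clock value
        // A ledger stamped "now" has age 0, and 0 > -1 holds, so -1 matches every ledger.
        System.out.println(exceeds(-1L, now, now));
        // With a threshold of 0, a ledger of age 0 is not matched, but any older one is.
        System.out.println(exceeds(0L, now, now));
        System.out.println(exceeds(0L, now, now - 1));
        // A null threshold never matches.
        System.out.println(exceeds(null, now, now - 1));
    }
}
```

Under these assumptions a negative value selects every ledger that is not yet offloaded, which is probably not what "disabled" should mean.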



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -1919,6 +1919,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "The number of bytes before triggering automatic offload to long term storage"
     )
     private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "The threshold to triggering automatic offload to long term storage"
+    )
+    private long managedLedgerOffloadTimeThresholdInSeconds = -1L;

Review Comment:
   Please check all usages of this name.
   I noticed that `managedLedgerOffloadThresholdInSeconds` is used here:
   
   https://github.com/apache/pulsar/pull/17398/files#diff-a9946a41d139c80164fa362288fdf2ec6a5af5a901213881238e47517ddc0f3bR329





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r994140864


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);

Review Comment:
   Yes, it should be simpler.





[GitHub] [pulsar] zymap commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r974793855


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -739,14 +740,23 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {

Review Comment:
   Please add a test to cover the case where both size and time are set.



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -739,14 +740,23 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {
+        return new Object[][]{
+                {null, 0L},

Review Comment:
   We need to add a test that covers a non-zero time threshold.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2358,60 +2359,81 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();

Review Comment:
   Policies may be null. Please add a null check.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2358,60 +2359,81 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadTimeThresholdInSeconds())
+                        .filter(v -> v >= 0).map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadTimeThresholdInSeconds] are null OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp > offloadTimeThresholdMillis)) {

Review Comment:
   ```suggestion
                   if ((sizeSummed > offloadThresholdInBytes)
                           || (now - timestamp > offloadTimeThresholdMillis)) {
   ```
   
   I think we don't need to check `offloadThresholdInBytes >= 0` and `offloadTimeThresholdMillis >= 0` here, because we already check them at line 2396.
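
One thing that may be worth double-checking about the suggestion above: the earlier check only returns when both thresholds are negative. If just one of them is enabled, the other is still normalized to -1, and an unguarded `sizeSummed > -1` is then true for any ledger. A minimal sketch of the difference, using hypothetical names (`ThresholdGuards`, `guarded`, `unguarded`) and made-up values:

```java
/** Hypothetical comparison of the two conditions; not Pulsar code. */
final class ThresholdGuards {
    private ThresholdGuards() {}

    /** Condition as in the diff: each threshold only counts when it is non-negative. */
    static boolean guarded(long sizeSummed, long ageMillis, long bytesThreshold, long timeThresholdMillis) {
        return (bytesThreshold >= 0 && sizeSummed > bytesThreshold)
                || (timeThresholdMillis >= 0 && ageMillis > timeThresholdMillis);
    }

    /** Condition as in the suggestion: no per-threshold guard. */
    static boolean unguarded(long sizeSummed, long ageMillis, long bytesThreshold, long timeThresholdMillis) {
        return sizeSummed > bytesThreshold || ageMillis > timeThresholdMillis;
    }

    public static void main(String[] args) {
        long sizeSummed = 10;          // bytes seen so far
        long ageMillis = 5;            // ledger is 5 ms old
        long bytesThreshold = -1;      // size-based offload disabled
        long timeThreshold = 3_600_000; // one hour, in milliseconds

        // The ledger is far younger than one hour, so it should not be selected.
        System.out.println(guarded(sizeSummed, ageMillis, bytesThreshold, timeThreshold));
        // Without the guard, 10 > -1 selects it anyway.
        System.out.println(unguarded(sizeSummed, ageMillis, bytesThreshold, timeThreshold));
    }
}
```

If that reading is right, the guards are only redundant when both thresholds are enabled at the same time.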





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991838423


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -787,33 +790,77 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testAutoTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {
+        return new Object[][]{
+                {null, 0L},
+                {100L, null},
+                {-1L, null},
+                {null, null},
+                {-1L, -1L},
+                {1L, 1L}
+        };
+    }
+
+    @Test(dataProvider = "testAutoTriggerOffload")
+    public void testAutoTriggerOffload(Long sizeThreshold, Long timeThreshold) throws Exception {
         MockLedgerOffloader offloader = new MockLedgerOffloader();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(sizeThreshold);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(timeThreshold);
         config.setLedgerOffloader(offloader);
 
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger" + UUID.randomUUID(), config);
 
         // Ledger will roll twice, offload will run on first ledger after second closed
         for (int i = 0; i < 25; i++) {
+            Thread.sleep(10);
             ledger.addEntry(buildEntry(10, "entry-" + i));
         }
 
         assertEquals(ledger.getLedgersInfoAsList().size(), 3);
 
-        // offload should eventually be triggered
-        assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
-        assertEquals(offloader.offloadedLedgers(),
-                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));
+        if (sizeThreshold == null && timeThreshold != null && timeThreshold == 0L) {
+            // All the inactive ledgers will be offloaded
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+            List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+            assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1)));
+        } else if (sizeThreshold != null && sizeThreshold == 100L && timeThreshold == null) {
+            // The last 2 ledgers won't be offloaded
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
+            List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+            assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0)));
+        } else if (sizeThreshold != null && sizeThreshold == -1L && timeThreshold == null) {
+            // Offloading is disabled, no ledgers will be offloaded.
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 0);
+            assertEquals(offloader.offloadedLedgers().size(), 0);
+        } else if (sizeThreshold == null && timeThreshold == null) {
+            // Offloading is disabled, no ledgers will be offloaded.
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 0);
+            assertEquals(offloader.offloadedLedgers().size(), 0);
+        } else if (sizeThreshold != null && sizeThreshold == 1L && timeThreshold != null && timeThreshold == 1L) {
+            // All the inactive ledgers will be offloaded
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+            List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+            assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1)));
+        }
     }
 
-    @Test
-    public void manualTriggerWhileAutoInProgress() throws Exception {
+    @DataProvider(name = "manualTriggerWhileAutoInProgress")
+    public Object[][] manualTriggerWhileAutoInProgressProvider() {

Review Comment:
   Yes, but the test needs to auto-trigger an offload first; the data provider is for the auto-trigger part.





[GitHub] [pulsar] tjiuming commented on pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1277176355

   @codelipenghui @zymap please help review again




[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1005611715


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE

Review Comment:
   Yes, it should be better.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   I thought about this. Unlike `offloadThresholdInBytes`, `offloadTimeThresholdMillis` involves some randomness: between `ledgerClosed` and this check, 1 ms may or may not have elapsed. If we change `now - timestamp >= offloadTimeThresholdMillis` to `now - timestamp > offloadTimeThresholdMillis`, some tests will fail. Alternatively, we could add a 1 ms delay in `maybeOffloadInBackground`:
   ```
       private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
           if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
                   || config.getLedgerOffloader().getOffloadPolicies() == null) {
               return;
           }
   
           final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
           final long offloadThresholdInBytes =
                   Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
           final long offloadThresholdInSeconds =
                   Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
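           if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
               // delay 1ms here (sketch only: Thread.sleep throws the checked
               // InterruptedException, which real code would have to handle)
               Thread.sleep(1);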
               executor.executeOrdered(name,
                       safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
           }
       }
   ```
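
   The boundary case described above can be reproduced without sleeping by injecting a clock. The following is only a minimal sketch under stated assumptions: `OffloadAgeCheck`, `isOldEnough` and `isOldEnoughStrict` are hypothetical names that do not exist in `ManagedLedgerImpl`, and the code in this PR reads `System.currentTimeMillis()` directly rather than a `java.time.Clock`.

   ```java
   import java.time.Clock;
   import java.time.Instant;
   import java.time.ZoneOffset;

   // Hypothetical helper (not part of Pulsar): isolates the age comparison so the
   // "same millisecond" case can be exercised with a fixed clock.
   final class OffloadAgeCheck {
       private final Clock clock;

       OffloadAgeCheck(Clock clock) {
           this.clock = clock;
       }

       // Mirrors `now - timestamp >= offloadTimeThresholdMillis` from the diff.
       boolean isOldEnough(long closedAtMillis, long thresholdMillis) {
           return thresholdMillis >= 0 && clock.millis() - closedAtMillis >= thresholdMillis;
       }

       // The strict variant discussed in the review, shown for contrast only.
       boolean isOldEnoughStrict(long closedAtMillis, long thresholdMillis) {
           return thresholdMillis >= 0 && clock.millis() - closedAtMillis > thresholdMillis;
       }

       public static void main(String[] args) {
           long closedAt = 1_000L;
           // The check runs in the same millisecond in which the ledger was closed.
           Clock sameMillis = Clock.fixed(Instant.ofEpochMilli(closedAt), ZoneOffset.UTC);
           OffloadAgeCheck check = new OffloadAgeCheck(sameMillis);

           System.out.println(check.isOldEnough(closedAt, 0L));       // true
           System.out.println(check.isOldEnoughStrict(closedAt, 0L)); // false
       }
   }
   ```

   With a threshold of 0, the inclusive comparison selects the ledger even when no time has elapsed, while the strict one depends on whether at least 1 ms has passed. That is the source of the test flakiness mentioned above.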



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   No. The earlier checks only establish `offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0` (that is, they reject `offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0`), so one of the two may still be less than 0.
   The `offloadThresholdInBytes >= 0` and `offloadTimeThresholdMillis >= 0` checks are therefore necessary.
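
   A small stand-alone sketch of why the guards matter when only one policy is enabled. `OffloadSelection` and both method names are hypothetical and exist only for illustration; the condition itself is copied from the diff above.

   ```java
   // Hypothetical stand-alone version of the selection condition (not Pulsar code).
   final class OffloadSelection {

       // Guarded form from the diff: a negative threshold means "policy disabled".
       static boolean shouldOffload(long sizeSummed, long ageMillis,
                                    long thresholdBytes, long thresholdMillis) {
           return (thresholdBytes >= 0 && sizeSummed > thresholdBytes)
                   || (thresholdMillis >= 0 && ageMillis >= thresholdMillis);
       }

       // Unguarded form, for contrast only.
       static boolean shouldOffloadUnguarded(long sizeSummed, long ageMillis,
                                             long thresholdBytes, long thresholdMillis) {
           return sizeSummed > thresholdBytes || ageMillis >= thresholdMillis;
       }

       public static void main(String[] args) {
           // Size policy disabled (-1), time policy set to one hour,
           // and a ledger that was closed one second ago.
           long sizeSummed = 1024L;
           long ageMillis = 1_000L;
           long thresholdBytes = -1L;
           long thresholdMillis = 3_600_000L;

           System.out.println(shouldOffload(
                   sizeSummed, ageMillis, thresholdBytes, thresholdMillis));          // false
           System.out.println(shouldOffloadUnguarded(
                   sizeSummed, ageMillis, thresholdBytes, thresholdMillis));          // true
       }
   }
   ```

   Without the guard, `sizeSummed > -1` holds for every ledger, so a disabled size policy would select everything. The same applies in the other direction: `TimeUnit.SECONDS.toMillis(-1)` is `-1000`, and any non-negative age is `>= -1000`.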



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));

Review Comment:
   https://github.com/apache/pulsar/pull/17398#discussion_r991212702



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));

Review Comment:
   1. https://github.com/apache/pulsar/pull/17398#discussion_r991212702
   2. Yes, it's a bug, and I've fixed it.





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r977903204


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2358,60 +2359,81 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadTimeThresholdInSeconds())
+                        .filter(v -> v >= 0).map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadTimeThresholdInSeconds] are null OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp > offloadTimeThresholdMillis)) {

Review Comment:
   The earlier check only covers `offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0`, but just one of `offloadThresholdInBytes` and `offloadTimeThresholdMillis` might be < 0, so each threshold is checked again here.
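
The point of the comment above can be sketched in isolation. This is a minimal illustration, not the code from the PR: the class and method names are hypothetical, and it assumes (as the diff suggests) that a negative threshold means "disabled". Each threshold is guarded on its own, so a disabled size threshold of -1 cannot make `sizeSummed > threshold` trivially true.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; names are hypothetical and not part of Pulsar. */
final class OffloadThresholdCheck {

    /**
     * Returns true when a not-yet-offloaded ledger should be queued for offload.
     * A negative threshold is treated as "disabled", so each one is guarded separately.
     */
    static boolean exceedsThreshold(long sizeSummed, long ledgerAgeMillis,
                                    long offloadThresholdInBytes, long offloadTimeThresholdMillis) {
        boolean overSize = offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes;
        boolean overAge = offloadTimeThresholdMillis >= 0 && ledgerAgeMillis > offloadTimeThresholdMillis;
        return overSize || overAge;
    }

    public static void main(String[] args) {
        long oneHour = TimeUnit.HOURS.toMillis(1);
        // Size threshold disabled (-1), time threshold of 10 seconds: the ledger age decides.
        System.out.println(exceedsThreshold(500, oneHour, -1, TimeUnit.SECONDS.toMillis(10)));
        // Size threshold disabled, ledger younger than the time threshold: nothing to offload.
        System.out.println(exceedsThreshold(500, 0, -1, oneHour));
    }
}
```

Without the `>= 0` guards, the second call would select the ledger because 500 > -1, which is presumably the case the extra checks are meant to rule out.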





[GitHub] [pulsar] tjiuming commented on pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1249123433

   /pulsarbot run-failure-checks




[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991838595


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -787,33 +790,77 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testAutoTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {
+        return new Object[][]{
+                {null, 0L},
+                {100L, null},
+                {-1L, null},
+                {null, null},
+                {-1L, -1L},
+                {1L, 1L}
+        };
+    }
+
+    @Test(dataProvider = "testAutoTriggerOffload")
+    public void testAutoTriggerOffload(Long sizeThreshold, Long timeThreshold) throws Exception {
         MockLedgerOffloader offloader = new MockLedgerOffloader();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(sizeThreshold);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(timeThreshold);
         config.setLedgerOffloader(offloader);
 
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger" + UUID.randomUUID(), config);
 
         // Ledger will roll twice, offload will run on first ledger after second closed
         for (int i = 0; i < 25; i++) {
+            Thread.sleep(10);
             ledger.addEntry(buildEntry(10, "entry-" + i));
         }
 
         assertEquals(ledger.getLedgersInfoAsList().size(), 3);
 
-        // offload should eventually be triggered
-        assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
-        assertEquals(offloader.offloadedLedgers(),
-                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));
+        if (sizeThreshold == null && timeThreshold != null && timeThreshold == 0L) {
+            // All the inactive ledgers will be offloaded
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+            List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+            assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1)));
+        } else if (sizeThreshold != null && sizeThreshold == 100L && timeThreshold == null) {
+            // The last 2 ledgers won't be offloaded
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
+            List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+            assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0)));
+        } else if (sizeThreshold != null && sizeThreshold == -1L && timeThreshold == null) {
+            // Offloading is disabled, no ledgers will be offloaded.
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 0);
+            assertEquals(offloader.offloadedLedgers().size(), 0);
+        } else if (sizeThreshold == null && timeThreshold == null) {
+            // Offloading is disabled, no ledgers will be offloaded.
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 0);
+            assertEquals(offloader.offloadedLedgers().size(), 0);
+        } else if (sizeThreshold != null && sizeThreshold == 1L && timeThreshold != null && timeThreshold == 1L) {
+            // All the inactive ledgers will be offloaded
+            assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+            List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+            assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1)));
+        }
     }
 
-    @Test
-    public void manualTriggerWhileAutoInProgress() throws Exception {
+    @DataProvider(name = "manualTriggerWhileAutoInProgress")
+    public Object[][] manualTriggerWhileAutoInProgressProvider() {

Review Comment:
   For the CLI and REST API, I'll create a separate PR.





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991831955


##########
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java:
##########
@@ -178,7 +182,9 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu
                                              String role, String roleSessionName,
                                              String credentialId, String credentialSecret,
                                              Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
-                                             Long offloadThresholdInBytes, Long offloadDeletionLagInMillis,
+                                             Long offloadThresholdInBytes,
+                                             //Long offloadTimeThreshold, //TODO

Review Comment:
   It's for the CLI and REST API; I'll remove it here.



##########
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java:
##########
@@ -178,7 +182,9 @@ public static OffloadPoliciesImpl create(String driver, String region, String bu
                                              String role, String roleSessionName,
                                              String credentialId, String credentialSecret,
                                              Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
-                                             Long offloadThresholdInBytes, Long offloadDeletionLagInMillis,
+                                             Long offloadThresholdInBytes,
+                                             //Long offloadTimeThreshold, //TODO

Review Comment:
   It's for the CLI and REST API; I'll remove it.





[GitHub] [pulsar] zymap commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r994079456


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);

Review Comment:
   AsyncOffload isn't triggered here; this path is executed from maybeOffload.
   Actually, this case won't happen, because we have already checked the conditions before triggering the offload. But if we do check here, I think it must throw an exception: the values were non-null when checked earlier and have become null here, which doesn't make sense. @codelipenghui
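
The suggestion above (fail the promise instead of completing it normally when a precondition that was already checked no longer holds) could look roughly like the following. This is a hedged sketch under stated assumptions: `PreconditionSketch`, its `maybeOffload` signature, and the `"LATEST"` string standing in for `PositionImpl.LATEST` are all hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch only; not the ManagedLedgerImpl code. */
final class PreconditionSketch {

    /**
     * offloadPolicies is a hypothetical stand-in for the offloader/policies lookup;
     * null means "not configured".
     */
    static CompletableFuture<String> maybeOffload(Object offloadPolicies, String name) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (offloadPolicies == null) {
            // The caller is expected to have checked this already, so reaching
            // this branch indicates a bug and is reported as a failure.
            promise.completeExceptionally(new IllegalArgumentException(
                    String.format("[%s] offloader or offloadPolicies is null", name)));
            return promise;
        }
        promise.complete("LATEST"); // placeholder for PositionImpl.LATEST
        return promise;
    }
}
```

Completing exceptionally makes the inconsistent state visible to whoever waits on the promise, whereas completing with a normal value would hide it.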





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r995276900


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2370,11 +2370,13 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
         final long offloadThresholdInSeconds =
                 Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
         if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {

Review Comment:
   Should we check that offloadThresholdInBytes > 0 and offloadThresholdInSeconds > 0? Negative values are unexpected here: callers of this method should not pass negative numbers for these two parameters.
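
One way to read this request is sketched below, assuming that a null policy value is normalized to -1 by the caller (the `Optional.ofNullable(...).orElse(-1L)` pattern from the diff) and that the callee rejects the case where neither threshold is enabled. The class name, method names, and exception message are hypothetical; the exact validation rule (`> 0` versus `>= 0`, and whether both or either must hold) is left open by the comment.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch only; names are hypothetical and not part of Pulsar. */
final class ThresholdValidationSketch {

    /** Maps an unset (null) policy value to -1, which is treated as "disabled". */
    static long normalize(Long configured) {
        return Optional.ofNullable(configured).orElse(-1L);
    }

    /** Fails the promise when neither threshold is enabled; otherwise completes it. */
    static CompletableFuture<Void> maybeOffload(long thresholdInBytes, long thresholdInSeconds) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        if (thresholdInBytes < 0 && thresholdInSeconds < 0) {
            promise.completeExceptionally(
                    new IllegalArgumentException("both offload thresholds are negative"));
        } else {
            promise.complete(null); // a real implementation would select ledgers here
        }
        return promise;
    }
}
```

For example, `maybeOffload(normalize(null), normalize(null))` would fail, while `maybeOffload(normalize(null), normalize(0L))` would proceed.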





[GitHub] [pulsar] codelipenghui commented on pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1283947611

   @tjiuming Please check the checkstyle issue.




[GitHub] [pulsar] tjiuming commented on pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1257258336

   @codelipenghui @zymap PTAL




[GitHub] [pulsar] momo-jun commented on pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
momo-jun commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1292840198

   Hi @tjiuming, besides the configurations (already included in this PR), do you plan to add more information about this feature to the Pulsar docs?




[GitHub] [pulsar] tjiuming commented on pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1237702940

   @hangc0276 @codelipenghui PTAL




[GitHub] [pulsar] zymap commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r994088378


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);

Review Comment:
   LGTM





[GitHub] [pulsar] codelipenghui closed pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
codelipenghui closed pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy
URL: https://github.com/apache/pulsar/pull/17398




[GitHub] [pulsar] codecov-commenter commented on pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1284961604

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/17398?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > :exclamation: No coverage uploaded for pull request base (`master@7ac8e3e`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #17398   +/-   ##
   =========================================
     Coverage          ?   48.65%           
     Complexity        ?     6835           
   =========================================
     Files             ?      393           
     Lines             ?    43484           
     Branches          ?     4471           
   =========================================
     Hits              ?    21156           
     Misses            ?    19921           
     Partials          ?     2407           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `48.65% <0.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1005541075


##########
conf/broker.conf:
##########
@@ -1149,6 +1149,10 @@ managedLedgerOffloadDeletionLagMs=14400000
 # (default is -1, which is disabled)
 managedLedgerOffloadAutoTriggerSizeThresholdBytes=-1
 
+# The number of seconds before triggering automatic offload to long term storage
+# (default is -1, which is disabled)
+managedLedgerOffloadThresholdInSeconds=-1

Review Comment:
   Should this be `managedLedgerOffloadAutoTriggerThresholdInSeconds`, to stay consistent with `managedLedgerOffloadAutoTriggerSizeThresholdBytes`?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));

Review Comment:
   The same as above.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE

Review Comment:
   Should we move the following two checks before `offloadMutex.tryLock()`? That would avoid acquiring the mutex when there is nothing to offload.
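
A minimal sketch of the ordering suggested here, under stated assumptions: `OffloadGuardSketch`, its `lockAttempts` counter, and the string results are hypothetical stand-ins for illustration, and a plain `ReentrantLock` replaces the real `offloadMutex`. It only shows that running the cheap policy checks first means the lock is never touched when offload is disabled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the managed ledger; not Pulsar's actual classes.
class OffloadGuardSketch {
    private final ReentrantLock offloadMutex = new ReentrantLock();
    int lockAttempts = 0; // counts how often we reach tryLock()

    // Completes with "skipped", "busy" or "offloaded".
    CompletableFuture<String> maybeOffload(boolean offloaderConfigured,
                                           long thresholdBytes,
                                           long thresholdSeconds) {
        CompletableFuture<String> promise = new CompletableFuture<>();

        // Lock-free checks first: nothing to do if offload is not enabled.
        if (!offloaderConfigured || (thresholdBytes < 0 && thresholdSeconds < 0)) {
            promise.complete("skipped");
            return promise;
        }

        lockAttempts++;
        if (!offloadMutex.tryLock()) {
            // The code under review reschedules itself here; the sketch just reports it.
            promise.complete("busy");
            return promise;
        }
        try {
            promise.complete("offloaded");
        } finally {
            offloadMutex.unlock();
        }
        return promise;
    }

    public static void main(String[] args) {
        OffloadGuardSketch sketch = new OffloadGuardSketch();
        System.out.println(sketch.maybeOffload(false, 100, -1).join());
        System.out.println(sketch.maybeOffload(true, -1, -1).join());
        System.out.println(sketch.maybeOffload(true, 100, -1).join());
        System.out.println("lock attempts: " + sketch.lockAttempts);
    }
}
```

In this sketch only the third call reaches `tryLock()`; whether the same reordering is safe in `ManagedLedgerImpl` depends on how the promise is expected to be completed there.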



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));

Review Comment:
   If this check passes, offload is simply not enabled. I'm not sure it is OK to complete the promise exceptionally in that case.
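
To make the concern concrete, here is a small sketch of the two ways the promise could be completed when offload is disabled. The class name, the method names, and the `LATEST` string (standing in for `PositionImpl.LATEST`) are assumptions for illustration only; the `CompletableFuture` calls are the standard JDK ones.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; "LATEST" stands in for PositionImpl.LATEST.
class DisabledOffloadCompletionSketch {
    static final String LATEST = "LATEST";

    // Treat "offload not enabled" as a no-op: the caller sees a normal result.
    static CompletableFuture<String> completeAsNoOp() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        promise.complete(LATEST);
        return promise;
    }

    // Treat "offload not enabled" as a failure: the caller sees an exception.
    static CompletableFuture<String> completeAsError() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        promise.completeExceptionally(
                new IllegalArgumentException("offloader or offloadPolicies is null"));
        return promise;
    }

    public static void main(String[] args) {
        System.out.println(completeAsNoOp().isCompletedExceptionally());  // false
        System.out.println(completeAsError().isCompletedExceptionally()); // true
        // Callers of the failing variant need an explicit recovery path.
        System.out.println(completeAsError().exceptionally(ex -> "fallback").join()); // fallback
    }
}
```

With the exceptional variant, every caller that chains on the promise has to handle a failure for what is an ordinary configuration state, which is the trade-off being questioned.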



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   We already checked `offloadTimeThresholdMillis >= 0` and `offloadThresholdInBytes >= 0` earlier, so I think we can drop these conditions here.
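
One way to test whether the per-threshold guards are redundant is the sketch below. It is an assumption-laden illustration, not the PR's code: `ThresholdGuardSketch` and its two methods are hypothetical, and the sample values are made up. It relies on the earlier check quoted in the diff rejecting only the case where both thresholds are negative, so a single threshold may still be `-1` when the loop runs.

```java
// Hypothetical illustration of the selection condition with and without guards.
class ThresholdGuardSketch {
    // Mirrors the shape of the condition in the diff: each threshold is
    // applied only when it is enabled (>= 0).
    static boolean withGuards(long thresholdBytes, long thresholdMillis,
                              long sizeSummed, long ageMillis) {
        return (thresholdBytes >= 0 && sizeSummed > thresholdBytes)
                || (thresholdMillis >= 0 && ageMillis >= thresholdMillis);
    }

    // Same comparison with the ">= 0" guards removed.
    static boolean withoutGuards(long thresholdBytes, long thresholdMillis,
                                 long sizeSummed, long ageMillis) {
        return sizeSummed > thresholdBytes || ageMillis >= thresholdMillis;
    }

    public static void main(String[] args) {
        long bytesDisabled = -1L;      // size-based offload turned off
        long oneHour = 3_600_000L;     // time threshold of one hour, in ms
        long sizeSummed = 10L;         // made-up ledger size in bytes
        long oneMinuteOld = 60_000L;   // made-up ledger age in ms

        System.out.println(withGuards(bytesDisabled, oneHour, sizeSummed, oneMinuteOld));    // false
        System.out.println(withoutGuards(bytesDisabled, oneHour, sizeSummed, oneMinuteOld)); // true
    }
}
```

In this sketch the unguarded form selects a one-minute-old ledger because `10 > -1`, which suggests the guards may still matter whenever only one of the two thresholds is enabled.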



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));

Review Comment:
   Should we complete `unlockingPromise` here instead of `finalPromise`? If we complete `finalPromise` directly, `offloadMutex` is never unlocked, which will cause a deadlock.
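
A small sketch of the concern above, under stated assumptions: a one-permit `Semaphore` stands in for `offloadMutex`, results are plain strings, and the class and method names are hypothetical. It shows that only completion through `unlockingPromise` runs the callback that releases the mutex.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical model of the early-return path in maybeOffload; not Pulsar code.
final class UnlockingPromiseSketch {

    /**
     * Takes the mutex, wires up the unlocking promise, then fails on an early-exit path.
     * Returns true if the mutex is free again afterwards.
     */
    static boolean mutexFreeAfterEarlyExit(boolean completeViaUnlockingPromise) {
        // Assumption: a one-permit semaphore models the offload mutex.
        final Semaphore mutex = new Semaphore(1);
        final CompletableFuture<String> finalPromise = new CompletableFuture<>();

        if (!mutex.tryAcquire()) {
            throw new IllegalStateException("fresh mutex should be available");
        }

        CompletableFuture<String> unlockingPromise = new CompletableFuture<>();
        unlockingPromise.whenComplete((res, ex) -> {
            mutex.release(); // the only place where the mutex is released
            if (ex != null) {
                finalPromise.completeExceptionally(ex);
            } else {
                finalPromise.complete(res);
            }
        });

        IllegalArgumentException failure = new IllegalArgumentException("nothing to offload");
        if (completeViaUnlockingPromise) {
            unlockingPromise.completeExceptionally(failure); // releases, then fails finalPromise
        } else {
            finalPromise.completeExceptionally(failure);     // callback never runs, mutex stays held
        }
        return mutex.availablePermits() == 1;
    }

    public static void main(String[] args) {
        System.out.println(mutexFreeAfterEarlyExit(false)); // prints false
        System.out.println(mutexFreeAfterEarlyExit(true));  // prints true
    }
}
```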



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   The check condition is different from the one for offloadByBytes. I suggest keeping them the same.
   ```
   // offload by size
   sizeSummed > offloadThresholdInBytes
   
   // offload by time
   now - timestamp >= offloadTimeThresholdMillis
   ```
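
To make the difference concrete, here is a minimal sketch of the two comparisons evaluated exactly at the threshold. The class and method names are hypothetical; only the two comparison expressions come from the diff.

```java
// Hypothetical helper that isolates the two comparisons from the diff; not Pulsar code.
final class ThresholdComparisonSketch {

    /** Size rule as written in the diff: strictly greater than the threshold. */
    static boolean overSizeThreshold(long sizeSummed, long offloadThresholdInBytes) {
        return sizeSummed > offloadThresholdInBytes;
    }

    /** Time rule as written in the diff: greater than or equal to the threshold. */
    static boolean overTimeThreshold(long now, long timestamp, long offloadTimeThresholdMillis) {
        return now - timestamp >= offloadTimeThresholdMillis;
    }

    public static void main(String[] args) {
        // Exactly at the threshold, the two rules disagree.
        System.out.println(overSizeThreshold(100L, 100L));           // prints false
        System.out.println(overTimeThreshold(5_000L, 4_000L, 1_000L)); // prints true
    }
}
```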





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1005622725


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   I thought about this. Unlike `offloadThresholdInBytes`, `offloadTimeThresholdMillis` involves some randomness: the time from `ledgerClosed` to this point may or may not reach 1 ms. If we change `now - timestamp >= offloadTimeThresholdMillis` to `now - timestamp > offloadTimeThresholdMillis`, some tests will fail. Alternatively, we could add a 1 ms delay in `maybeOffloadInBackground`:
   ```
       private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
           if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
                   || config.getLedgerOffloader().getOffloadPolicies() == null) {
               return;
           }
   
           final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
           final long offloadThresholdInBytes =
                   Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
           final long offloadThresholdInSeconds =
                   Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
           if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
               // delay 1ms here
               Thread.sleep(1);
               executor.executeOrdered(name,
                       safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
           }
       }
   ```
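
As a possible alternative to `Thread.sleep(1)`, the delay could be expressed by scheduling the task, in the same way the diff already reschedules with `scheduledExecutor.schedule(...)` when the mutex is busy. The sketch below is only an illustration under that assumption: the class name, the helper method, and the use of a dedicated single-thread scheduler are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a scheduled delay; not Pulsar code.
final class DelayedTriggerSketch {

    /**
     * Schedules a task after delayMillis without putting the scheduling thread to sleep,
     * and returns how many milliseconds passed before the task started.
     */
    static long runDelayed(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        long[] ranAtNanos = new long[1];
        long startNanos = System.nanoTime();
        try {
            scheduler.schedule(() -> {
                ranAtNanos[0] = System.nanoTime(); // stands in for the offload check
                done.countDown();
            }, delayMillis, TimeUnit.MILLISECONDS);
            // The wait below exists only so the sketch can report the elapsed time.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("scheduled task did not run");
            }
            return TimeUnit.NANOSECONDS.toMillis(ranAtNanos[0] - startNanos);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task started after " + runDelayed(20) + " ms");
    }
}
```

The design point is that `schedule` defers the work without blocking the calling thread, whereas `Thread.sleep` would stall whichever thread calls `maybeOffloadInBackground`.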





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991833200


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -787,33 +790,77 @@ private static byte[] buildEntry(int size, String pattern) {
         return entry;
     }
 
-    @Test
-    public void testAutoTriggerOffload() throws Exception {
+    @DataProvider(name = "testAutoTriggerOffload")
+    public Object[][] testAutoTriggerOffloadProvider() {
+        return new Object[][]{
+                {null, 0L},
+                {100L, null},
+                {-1L, null},
+                {null, null},
+                {-1L, -1L},
+                {1L, 1L}
+        };
+    }
+
+    @Test(dataProvider = "testAutoTriggerOffload")
+    public void testAutoTriggerOffload(Long sizeThreshold, Long timeThreshold) throws Exception {
         MockLedgerOffloader offloader = new MockLedgerOffloader();
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
-        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(100L);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(sizeThreshold);
+        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(timeThreshold);
         config.setLedgerOffloader(offloader);
 
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger" + UUID.randomUUID(), config);
 
         // Ledger will roll twice, offload will run on first ledger after second closed
         for (int i = 0; i < 25; i++) {
+            Thread.sleep(10);

Review Comment:
   Removed. I wanted to delay triggering the offload for a while, but this was not the right way to do it.





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991834957


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java:
##########
@@ -839,7 +887,9 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
         for (int i = 0; i < 25; i++) {
             ledger.addEntry(buildEntry(10, "entry-" + i));
         }
+        System.out.println("before");

Review Comment:
   Sorry, I forgot to remove this.





[GitHub] [pulsar] codelipenghui commented on pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1246648467

   @tjiuming Please also open a new issue to track support for this feature in namespace-level and topic-level policies. Thanks.




[GitHub] [pulsar] tjiuming commented on pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1248044828

   @codelipenghui @zymap PTAL




[GitHub] [pulsar] codelipenghui commented on pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1288351382

   Great Job @tjiuming 




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1006336855


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));

Review Comment:
   Makes sense.
   





[GitHub] [pulsar] tjiuming commented on pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#issuecomment-1293234130

   > Hi @tjiuming, besides configurations (already included in this PR), did you plan to add more information to Pulsar docs about this feature?
   
   @momo-jun I'll open a doc PR after the issue is fixed and the REST API/CLI PR is merged.




[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1005611375


##########
conf/broker.conf:
##########
@@ -1149,6 +1149,10 @@ managedLedgerOffloadDeletionLagMs=14400000
 # (default is -1, which is disabled)
 managedLedgerOffloadAutoTriggerSizeThresholdBytes=-1
 
+# The number of seconds before triggering automatic offload to long term storage
+# (default is -1, which is disabled)
+managedLedgerOffloadThresholdInSeconds=-1

Review Comment:
   There is a method `compatibleWithBrokerConfigFile` in `OffloadPoliciesImpl.java`. 
   The `managedLedgerOffloadAutoTriggerThresholdInBytes` setting has different names in `ServiceConfiguration` and `OffloadPoliciesImpl.java`. I did not align the new key with `managedLedgerOffloadAutoTriggerSizeThresholdBytes` here because I wanted it to stay aligned with `OffloadPoliciesImpl.java`.





[GitHub] [pulsar] tjiuming commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r994277166


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadThresholdInSeconds] are null OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)

Review Comment:
   @codelipenghui It seems OK, and it won't cause a ledger to be offloaded earlier, because the ledgers are traversed in reverse order, from the newest ledger to the oldest (5, 4, 3, 2, 1).
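
To illustrate the reversed traversal mentioned above, here is a minimal sketch, not the code from the PR. The `Ledger` class, the `select` and `sample` helpers, and the exact form of the time check (age of the closed ledger compared against the threshold) are assumptions made for illustration; the quoted hunk is cut off before the time condition, so the real check may differ. As in the quoted hunk, a ledger whose timestamp is 0 is treated as still open and is skipped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

public class TimeThresholdSketch {
    // Hypothetical stand-in for LedgerInfo: id, size, and close timestamp (0 = still open).
    static final class Ledger {
        final long id;
        final long sizeBytes;
        final long closedAtMillis;

        Ledger(long id, long sizeBytes, long closedAtMillis) {
            this.id = id;
            this.sizeBytes = sizeBytes;
            this.closedAtMillis = closedAtMillis;
        }
    }

    // Walks ledgers newest to oldest and returns the ids to offload, oldest first.
    // A negative threshold disables the corresponding check.
    static List<Long> select(TreeMap<Long, Ledger> ledgers, long sizeThresholdBytes,
                             long timeThresholdMillis, long nowMillis) {
        Deque<Long> toOffload = new ArrayDeque<>();
        long sizeSummed = 0;
        for (Ledger l : ledgers.descendingMap().values()) {
            if (l.closedAtMillis == 0L) {
                continue; // the open ledger cannot be offloaded
            }
            sizeSummed += l.sizeBytes;
            boolean bySize = sizeThresholdBytes >= 0 && sizeSummed > sizeThresholdBytes;
            // Assumed time check: the ledger has been closed for longer than the threshold.
            boolean byTime = timeThresholdMillis >= 0
                    && nowMillis - l.closedAtMillis > timeThresholdMillis;
            if (bySize || byTime) {
                toOffload.addFirst(l.id); // addFirst restores oldest-to-newest order
            }
        }
        return new ArrayList<>(toOffload);
    }

    // Five ledgers: 1..4 closed 40s, 30s, 20s, and 10s before nowMillis; 5 is still open.
    static TreeMap<Long, Ledger> sample(long nowMillis) {
        TreeMap<Long, Ledger> ledgers = new TreeMap<>();
        ledgers.put(1L, new Ledger(1L, 10L, nowMillis - 40_000L));
        ledgers.put(2L, new Ledger(2L, 10L, nowMillis - 30_000L));
        ledgers.put(3L, new Ledger(3L, 10L, nowMillis - 20_000L));
        ledgers.put(4L, new Ledger(4L, 10L, nowMillis - 10_000L));
        ledgers.put(5L, new Ledger(5L, 10L, 0L));
        return ledgers;
    }

    public static void main(String[] args) {
        long now = 100_000L;
        // Size check disabled (-1), time threshold of 25 seconds.
        System.out.println(select(sample(now), -1L, 25_000L, now)); // prints [1, 2]
    }
}
```

With the size check disabled, only ledgers 1 and 2 exceed the 25-second age limit, so newer ledgers are not picked up early by the reversed walk.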





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r991760746


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadTimeThresholdMillis =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
+                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
+
+        //Skip the following steps if `offloadTimeThreshold` and `offloadThresholdInBytes` are null OR negative values.
+        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
+            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
+                    + "and [managedLedgerOffloadThresholdInSeconds] are null OR negative values.", name);
+            unlockingPromise.complete(PositionImpl.LATEST);
+            return;
+        }
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)

Review Comment:
   @zymap Suppose you have 3 ledgers of 100MB each, and the threshold is 120MB.
   In this case, will the second ledger be offloaded? If so, the non-offloaded data will be 100MB, and only the first ledger will not be offloaded.
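
The arithmetic in this scenario can be checked with a small sketch of the size-based selection loop. This is an illustration only: the class and method names are hypothetical, sizes are in MB, and ledger ids 1 to 3 are assumed to run from oldest to newest, so the "first" ledger in the comment (first in the newest-to-oldest walk) is id 3 here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SizeThresholdSketch {
    // Hypothetical helper: maps ledger id -> size in MB and returns the ids to offload,
    // oldest first. Mirrors the cumulative-size walk from newest to oldest.
    static List<Long> selectForOffload(TreeMap<Long, Long> sizesById, long thresholdMb) {
        Deque<Long> toOffload = new ArrayDeque<>();
        long sizeSummed = 0;
        for (Map.Entry<Long, Long> e : sizesById.descendingMap().entrySet()) {
            sizeSummed += e.getValue();
            // Once the running total exceeds the threshold, this ledger and all older ones qualify.
            if (sizeSummed > thresholdMb) {
                toOffload.addFirst(e.getKey());
            }
        }
        return new ArrayList<>(toOffload);
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> ledgers = new TreeMap<>();
        ledgers.put(1L, 100L); // oldest
        ledgers.put(2L, 100L);
        ledgers.put(3L, 100L); // newest
        // Running totals are 100, 200, 300; the 120MB threshold is crossed at the second ledger.
        System.out.println(selectForOffload(ledgers, 120L)); // prints [1, 2]
    }
}
```

Under these assumptions only the newest ledger stays on BookKeeper, so 100MB remains non-offloaded even though the threshold is 120MB.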





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17398: [feat][tiered-storage] ADD offload_time_threshold policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r994087613


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2354,72 +2355,105 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
             executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
         }
     }
 
     private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            log.debug("[{}] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.complete(PositionImpl.LATEST);

Review Comment:
   @zymap Got your point.
   
   I tried this out on my laptop, and it looks simpler. @zymap @tjiuming Could you please take a look?
   
   ```diff
   --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
   +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
   @@ -2366,11 +2366,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
            final long offloadThresholdInSeconds =
                    Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
            if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
   -            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
   +            executor.executeOrdered(name, safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
            }
        }
    
   -    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
   +    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
   +                              CompletableFuture<PositionImpl> finalPromise) {
   +        checkArgument(offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0);
            if (!offloadMutex.tryLock()) {
                scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
                        100, TimeUnit.MILLISECONDS);
   @@ -2397,22 +2399,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
            long sizeSummed = 0;
            long toOffloadSize = 0;
            long alreadyOffloadedSize = 0;
   +        long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
            ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
   -
   -        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
   -        final long offloadThresholdInBytes =
   -                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
   -        final long offloadTimeThresholdMillis =
   -                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).filter(v -> v >= 0)
   -                        .map(TimeUnit.SECONDS::toMillis).orElse(-1L);
   -
   -        if (offloadThresholdInBytes < 0 && offloadTimeThresholdMillis < 0) {
   -            log.debug("[{}] Nothing to offload due to [managedLedgerOffloadAutoTriggerSizeThresholdBytes] "
   -                    + "and [managedLedgerOffloadThresholdInSeconds] are null OR negative values.", name);
   -            unlockingPromise.complete(PositionImpl.LATEST);
   -            return;
   -        }
   -
            for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
   ```
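
One detail of the suggested diff is that the seconds value is converted with `TimeUnit.SECONDS.toMillis` without first filtering out negative values. A minimal sketch, assuming a hypothetical helper `toTimeThresholdMillis` and using a plain `IllegalArgumentException` in place of the `checkArgument` precondition, shows that a disabled (negative) time threshold stays negative after conversion, so a later `>= 0` check would still treat it as disabled.

```java
import java.util.concurrent.TimeUnit;

public class ThresholdGuardSketch {
    // Hypothetical helper: rejects the case where both thresholds are disabled,
    // then converts the time threshold from seconds to milliseconds.
    static long toTimeThresholdMillis(long offloadThresholdInBytes, long offloadThresholdInSeconds) {
        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
            throw new IllegalArgumentException("at least one offload threshold must be >= 0");
        }
        // A negative input (for example -1) converts to a negative result (-1000).
        return TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
    }

    public static void main(String[] args) {
        System.out.println(toTimeThresholdMillis(-1L, 30L));   // prints 30000
        System.out.println(toTimeThresholdMillis(1024L, -1L)); // prints -1000
    }
}
```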


