Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/26 11:44:29 UTC

[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17398: [feat][offload] ADD offload_time_threshold policy (time-based offloading)

hangc0276 commented on code in PR #17398:
URL: https://github.com/apache/pulsar/pull/17398#discussion_r1005541075


##########
conf/broker.conf:
##########
@@ -1149,6 +1149,10 @@ managedLedgerOffloadDeletionLagMs=14400000
 # (default is -1, which is disabled)
 managedLedgerOffloadAutoTriggerSizeThresholdBytes=-1
 
+# The number of seconds before triggering automatic offload to long term storage
+# (default is -1, which is disabled)
+managedLedgerOffloadThresholdInSeconds=-1

Review Comment:
   Should this be named `managedLedgerOffloadAutoTriggerThresholdInSeconds`, to stay consistent with `managedLedgerOffloadAutoTriggerSizeThresholdBytes`?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));

Review Comment:
   The same as above.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE

Review Comment:
   Should we move the following two checks before `offloadMutex.tryLock()`? That way we skip acquiring the mutex when there is nothing to offload.
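
A minimal sketch of the ordering suggested in the comment above, not the Pulsar implementation. `PrecheckSketch`, `Outcome`, and the `offloaderConfigured` flag are hypothetical names, and a `Semaphore` with one permit stands in for `offloadMutex` as an assumption:

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: run the cheap configuration checks before touching the mutex.
class PrecheckSketch {
    enum Outcome { SKIPPED, BUSY, STARTED }

    static Outcome maybeOffload(Semaphore offloadMutex, boolean offloaderConfigured,
                                long thresholdInBytes, long thresholdInSeconds) {
        // These checks read only the arguments, so they need no lock.
        if (!offloaderConfigured) {
            return Outcome.SKIPPED;
        }
        if (thresholdInBytes < 0 && thresholdInSeconds < 0) {
            return Outcome.SKIPPED;
        }
        // Only now try to take the mutex; the PR code reschedules after 100 ms when it is busy.
        if (!offloadMutex.tryAcquire()) {
            return Outcome.BUSY;
        }
        // The caller is responsible for releasing the permit once offloading finishes.
        return Outcome.STARTED;
    }
}
```

With this ordering, a disabled configuration returns before the mutex is ever acquired, so no early-return path has to remember to release it.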



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));

Review Comment:
   If this check passes, it means offload is not enabled. I'm not sure it is OK to complete the promise exceptionally in that case.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   We already checked `offloadTimeThresholdMillis >= 0` and `offloadThresholdInBytes >= 0` earlier, so I think these conditions can be skipped here.
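
One thing that may be worth verifying before dropping the guards: the earlier check in the diff only returns when both thresholds are negative, so one of them can still be -1 at this point. The sketch below compares the selection condition with and without the guards; `ThresholdGuardSketch` and its method names are hypothetical, and the ledger age is passed in directly instead of being computed from `System.currentTimeMillis()`:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the per-ledger selection condition from the diff.
class ThresholdGuardSketch {
    // Condition as written in the PR: each threshold only counts when it is enabled (>= 0).
    static boolean selectedWithGuards(long thresholdInBytes, long thresholdInSeconds,
                                      long sizeSummed, long ageMillis) {
        long thresholdMillis = TimeUnit.SECONDS.toMillis(thresholdInSeconds);
        return (thresholdInBytes >= 0 && sizeSummed > thresholdInBytes)
                || (thresholdMillis >= 0 && ageMillis >= thresholdMillis);
    }

    // Same condition with the ">= 0" guards removed.
    static boolean selectedWithoutGuards(long thresholdInBytes, long thresholdInSeconds,
                                         long sizeSummed, long ageMillis) {
        long thresholdMillis = TimeUnit.SECONDS.toMillis(thresholdInSeconds);
        return sizeSummed > thresholdInBytes || ageMillis >= thresholdMillis;
    }
}
```

For example, with the size threshold disabled (-1) and a one-hour time threshold, a ledger that is one second old is not selected by the guarded condition, but the unguarded one selects it because `sizeSummed > -1` holds for any non-negative size.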



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));

Review Comment:
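   Should we complete `unlockingPromise` here instead of `finalPromise`? The unlock happens in the `whenComplete` callback on `unlockingPromise`, so completing `finalPromise` directly leaves `offloadMutex` locked and causes a deadlock: every later `tryLock()` fails and the offload is rescheduled indefinitely.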
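
   A minimal sketch of the concern, not Pulsar code: it assumes a plain `ReentrantLock` standing in for `offloadMutex`, `String` results instead of `PositionImpl`, and a hypothetical `mutexReleased` helper that mimics the early-return path.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.locks.ReentrantLock;

   // Hypothetical stand-in for the early-return paths in maybeOffload.
   public class UnlockingPromiseSketch {

       // Returns true if the mutex is free again after the early-return path ran.
       public static boolean mutexReleased(boolean completeViaUnlockingPromise) {
           ReentrantLock offloadMutex = new ReentrantLock(); // assumption: simple lock
           CompletableFuture<String> finalPromise = new CompletableFuture<>();

           if (!offloadMutex.tryLock()) {
               throw new IllegalStateException("a fresh lock should be available");
           }

           // Same wiring as in the diff: unlock first, then forward the result.
           CompletableFuture<String> unlockingPromise = new CompletableFuture<>();
           unlockingPromise.whenComplete((res, ex) -> {
               offloadMutex.unlock();
               if (ex != null) {
                   finalPromise.completeExceptionally(ex);
               } else {
                   finalPromise.complete(res);
               }
           });

           IllegalArgumentException err = new IllegalArgumentException("nothing to offload");
           if (completeViaUnlockingPromise) {
               // Runs the callback above, so the mutex is released.
               unlockingPromise.completeExceptionally(err);
           } else {
               // Bypasses the callback, so the mutex stays held.
               finalPromise.completeExceptionally(err);
           }
           return !offloadMutex.isLocked();
       }

       public static void main(String[] args) {
           System.out.println(mutexReleased(true));  // prints true
           System.out.println(mutexReleased(false)); // prints false
       }
   }
   ```

   In both cases the caller sees `finalPromise` fail with the same exception; only the route through `unlockingPromise` also releases the lock.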



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2404,72 +2405,100 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
     }
 
     private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
-        if (config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
-                && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
-            executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            return;
+        }
+
+        final OffloadPoliciesImpl policies = config.getLedgerOffloader().getOffloadPolicies();
+        final long offloadThresholdInBytes =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
+        final long offloadThresholdInSeconds =
+                Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
+        if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
+            executor.executeOrdered(name,
+                    safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
         }
     }
 
-    private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
+    private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
+                              CompletableFuture<PositionImpl> finalPromise) {
         if (!offloadMutex.tryLock()) {
             scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
-                                       100, TimeUnit.MILLISECONDS);
-        } else {
-            CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
-            unlockingPromise.whenComplete((res, ex) -> {
-                    offloadMutex.unlock();
-                    if (ex != null) {
-                        finalPromise.completeExceptionally(ex);
-                    } else {
-                        finalPromise.complete(res);
-                    }
-                });
+                    100, TimeUnit.MILLISECONDS);
+            return;
+        }
 
-            if (config.getLedgerOffloader() != null
-                    && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                    && config.getLedgerOffloader().getOffloadPolicies() != null
-                    && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes()
-                    != null) {
-                long threshold = config.getLedgerOffloader().getOffloadPolicies()
-                        .getManagedLedgerOffloadThresholdInBytes();
-
-                long sizeSummed = 0;
-                long alreadyOffloadedSize = 0;
-                long toOffloadSize = 0;
-
-                ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
-
-                // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
-                for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
-                    long size = e.getValue().getSize();
-                    sizeSummed += size;
-                    boolean alreadyOffloaded = e.getValue().hasOffloadContext()
-                            && e.getValue().getOffloadContext().getComplete();
-                    if (alreadyOffloaded) {
-                        alreadyOffloadedSize += size;
-                    } else if (sizeSummed > threshold) {
-                        toOffloadSize += size;
-                        toOffload.addFirst(e.getValue());
-                    }
-                }
+        CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
+        unlockingPromise.whenComplete((res, ex) -> {
+            offloadMutex.unlock();
+            if (ex != null) {
+                finalPromise.completeExceptionally(ex);
+            } else {
+                finalPromise.complete(res);
+            }
+        });
 
-                if (toOffload.size() > 0) {
-                    log.info("[{}] Going to automatically offload ledgers {}"
-                                    + ", total size = {}, already offloaded = {}, to offload = {}",
-                            name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
-                            sizeSummed, alreadyOffloadedSize, toOffloadSize);
-                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
-                } else {
-                    // offloadLoop will complete immediately with an empty list to offload
-                    log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
-                            name, sizeSummed, alreadyOffloadedSize, threshold);
-                    unlockingPromise.complete(PositionImpl.LATEST);
+        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
+                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+            String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
+            String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
+                    + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
+            finalPromise.completeExceptionally(new IllegalArgumentException(msg));
+            return;
+        }
+
+        long sizeSummed = 0;
+        long toOffloadSize = 0;
+        long alreadyOffloadedSize = 0;
+        ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
+        final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
+
+        for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
+            final LedgerInfo info = e.getValue();
+            // Skip current active ledger, an active ledger can't be offloaded.
+            // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
+            if (info.getTimestamp() == 0L) {
+                continue;
+            }
+
+            final long size = info.getSize();
+            final long timestamp = info.getTimestamp();
+            final long now = System.currentTimeMillis();
+            sizeSummed += size;
+
+            final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
+            if (alreadyOffloaded) {
+                alreadyOffloadedSize += size;
+            } else {
+                if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
+                        || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {

Review Comment:
   The time check uses a different comparison from the offload-by-bytes check (`>=` vs. `>`). I suggest keeping the two consistent.
   ```
   // offload by size
   sizeSummed > offloadThresholdInBytes
   
   // offload by time
   now - timestamp >= offloadTimeThresholdMillis
   ```
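
   A small sketch of where the two checks diverge, assuming hypothetical helper names (`overSizeThreshold`, `overTimeThreshold`) that only mirror the conditions in the diff. The difference shows up exactly at the boundary value.

   ```java
   // Hypothetical helpers mirroring the two conditions from the diff.
   public class ThresholdBoundarySketch {

       // Size check as written in the diff: strict comparison.
       public static boolean overSizeThreshold(long sizeSummed, long thresholdBytes) {
           return thresholdBytes >= 0 && sizeSummed > thresholdBytes;
       }

       // Time check as written in the diff: inclusive comparison.
       public static boolean overTimeThreshold(long now, long timestamp, long thresholdMillis) {
           return thresholdMillis >= 0 && now - timestamp >= thresholdMillis;
       }

       public static void main(String[] args) {
           // Exactly at the threshold the two checks disagree.
           System.out.println(overSizeThreshold(100, 100));        // prints false
           System.out.println(overTimeThreshold(1100, 100, 1000)); // prints true
       }
   }
   ```

   Which operator to standardize on is a design choice for the PR; the sketch only shows that a value equal to the threshold triggers offload by time but not by size.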


