You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/11 03:46:33 UTC

[pulsar] branch master updated: Avoid redundant calls for getting the offload policies from the offloader (#11629)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new af9b800  Avoid redundant calls for getting the offload policies from the offloader (#11629)
af9b800 is described below

commit af9b800e3a5fda2c3dfc76f930ddb146e443a141
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Aug 11 11:45:47 2021 +0800

    Avoid redundant calls for getting the offload policies from the offloader (#11629)
    
    * Avoid redundant calls for getting the offload policies from the offloader
    
    If a managed ledger has many ledgers, then when checking whether an offloaded ledger needs to be deleted from the bookies,
    getOffloadPolicies() is called on the Offloader once for every ledger. The BlobStoreManagedLedgerOffloader
    regenerates the offload policies from the properties on each get operation (another PR may be needed to find a way to optimize this part).
    This leads to high CPU usage.
    
    Stack:
    
    ```
    "bookkeeper-ml-workers-OrderedExecutor-4-0" #68 prio=5 os_prio=0 tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000]
       java.lang.Thread.State: RUNNABLE
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119)
    	at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194)
    	at org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265)
    	at org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown Source)
    	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
    	at org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261)
    	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303)
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091)
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176)
    	- locked <0x00000006a3f2c000> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997)
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown Source)
    	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
    	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
    	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    	at java.lang.Thread.run(Thread.java:748)
    ```
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 24 +++++++--------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 34 ++++++++++++++++++++++
 .../mledger/impl/OffloadLedgerDeleteTest.java      | 19 +++++-------
 3 files changed, 53 insertions(+), 24 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 285b5d8..d9d11c0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -129,6 +129,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -2305,19 +2306,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte;
     }
 
-    private boolean isOffloadedNeedsDelete(OffloadContext offload) {
+    boolean isOffloadedNeedsDelete(OffloadContext offload, Optional<OffloadPolicies> offloadPolicies) {
         long elapsedMs = clock.millis() - offload.getTimestamp();
-
-        if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                && config.getLedgerOffloader().getOffloadPolicies() != null
-                && config.getLedgerOffloader().getOffloadPolicies()
-                    .getManagedLedgerOffloadDeletionLagInMillis() != null) {
-            return offload.getComplete() && !offload.getBookkeeperDeleted()
-                    && elapsedMs > config.getLedgerOffloader()
-                    .getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis();
-        } else {
-            return false;
-        }
+        return offloadPolicies.filter(policies -> offload.getComplete() && !offload.getBookkeeperDeleted()
+                && policies.getManagedLedgerOffloadDeletionLagInMillis() != null
+                && elapsedMs > policies.getManagedLedgerOffloadDeletionLagInMillis()).isPresent();
     }
 
     /**
@@ -2338,6 +2331,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
         List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
+        Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
+                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
+                ? config.getLedgerOffloader().getOffloadPolicies()
+                : null);
         synchronized (this) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
@@ -2421,7 +2418,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }
 
             for (LedgerInfo ls : ledgers.values()) {
-                if (isOffloadedNeedsDelete(ls.getOffloadContext()) && !ledgersToDelete.contains(ls)) {
+                if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies)
+                        && !ledgersToDelete.contains(ls)) {
                     log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name,
                             ls.getLedgerId());
                     offloadedLedgersToDelete.add(ls);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index cd80537..ee9ff41 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -26,6 +26,8 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -87,6 +89,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -105,6 +108,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -3173,4 +3177,34 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         cursor3.close();
         ledger.close();
     }
+
+    @Test
+    public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+        config.setMaxSizePerLedgerMb(1);
+        LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
+        OffloadPoliciesImpl offloadPolicies = mock(OffloadPoliciesImpl.class);
+        when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+        when(ledgerOffloader.getOffloadDriverName()).thenReturn("s3");
+        config.setLedgerOffloader(ledgerOffloader);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open(
+                "testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", config);
+
+        // Retain the data.
+        ledger.openCursor("test-cursor");
+        final int entries = 10;
+        byte[] data = new byte[1024 * 1024];
+        for (int i = 0; i < entries; i++) {
+            ledger.addEntry(data);
+        }
+        assertEquals(ledger.ledgers.size(), 10);
+
+        // Set a new offloader to cleanup the execution times of getOffloadPolicies()
+        ledgerOffloader = mock(NullLedgerOffloader.class);
+        config.setLedgerOffloader(ledgerOffloader);
+
+        ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
+        verify(ledgerOffloader, times(1)).getOffloadPolicies();
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
index 258987c..f25332e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
@@ -20,14 +20,13 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
 
-import java.lang.reflect.Method;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.util.MockClock;
@@ -219,25 +218,23 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
         config.setLedgerOffloader(ledgerOffloader);
         config.setClock(clock);
 
-        ManagedLedger managedLedger = factory.open("isOffloadedNeedsDeleteTest", config);
-        Class<ManagedLedgerImpl> clazz = ManagedLedgerImpl.class;
-        Method method = clazz.getDeclaredMethod("isOffloadedNeedsDelete", MLDataFormats.OffloadContext.class);
-        method.setAccessible(true);
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("isOffloadedNeedsDeleteTest", config);
 
         MLDataFormats.OffloadContext offloadContext = MLDataFormats.OffloadContext.newBuilder()
                 .setTimestamp(config.getClock().millis() - 1000)
                 .setComplete(true)
                 .setBookkeeperDeleted(false)
                 .build();
-        Boolean needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+
+        boolean needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
         Assert.assertFalse(needsDelete);
 
         offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(500L);
-        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
         Assert.assertTrue(needsDelete);
 
         offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(1000L * 2);
-        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
         Assert.assertFalse(needsDelete);
 
         offloadContext = MLDataFormats.OffloadContext.newBuilder()
@@ -245,7 +242,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
                 .setComplete(false)
                 .setBookkeeperDeleted(false)
                 .build();
-        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
         Assert.assertFalse(needsDelete);
 
         offloadContext = MLDataFormats.OffloadContext.newBuilder()
@@ -253,7 +250,7 @@ public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
                 .setComplete(true)
                 .setBookkeeperDeleted(true)
                 .build();
-        needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
+        needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
         Assert.assertFalse(needsDelete);
 
     }