Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/04/14 02:26:26 UTC

[pulsar] 02/10: [Tiered Storage] Prevent Class Loader Leak; Restore Offloader Directory Override (#9878)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5a2f9ea0384f5e2e4b579443d157336853b450f9
Author: Michael Marshall <47...@users.noreply.github.com>
AuthorDate: Thu Apr 1 13:07:32 2021 -0600

    [Tiered Storage] Prevent Class Loader Leak; Restore Offloader Directory Override (#9878)
    
    In Pulsar 2.7.0, there is a class loader leak. It looks like https://github.com/apache/pulsar/pull/8739 fixed the leak by only loading the offloader classes from the directory configured in `broker.conf`. However, that solution ignores the fact that an offload policy can override the offloaders directory. As such, there could be a regression in 2.7.1 for users who configure multiple offloaders directories.
    
    This PR restores that functionality without reintroducing the class loader leak.
    
    Add an `OffloadersCache` class that maps directory strings to `Offloaders`, and update the `PulsarService` and `PulsarConnectorCache` classes to load offloaders through it.
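    
    A minimal sketch of this directory-keyed cache, written without Pulsar dependencies so it runs on its own. `Loaded` and `Loader` are hypothetical stand-ins for `Offloaders` and `OffloaderUtils.searchForOffloaders`, and the sketch wraps the checked `IOException` in `UncheckedIOException` instead of the plain `RuntimeException` used in the patch.
    
    ```java
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class DirectoryCacheSketch {
    
        // Hypothetical stand-in for Offloaders: remembers the directory it was loaded from.
        static final class Loaded {
            final String directory;
    
            Loaded(String directory) {
                this.directory = directory;
            }
        }
    
        // Hypothetical stand-in for OffloaderUtils.searchForOffloaders, which throws a checked IOException.
        interface Loader {
            Loaded load(String directory) throws IOException;
        }
    
        private final Map<String, Loaded> cache = new ConcurrentHashMap<>();
        private final Loader loader;
    
        DirectoryCacheSketch(Loader loader) {
            this.loader = loader;
        }
    
        Loaded getOrLoad(String directory) {
            // computeIfAbsent runs the loader at most once per key, atomically.
            return cache.computeIfAbsent(directory, dir -> {
                try {
                    return loader.load(dir);
                } catch (IOException e) {
                    // The mapping function cannot throw checked exceptions, so wrap it.
                    // No mapping is recorded when the function throws.
                    throw new UncheckedIOException(e);
                }
            });
        }
    
        public static void main(String[] args) {
            AtomicInteger loads = new AtomicInteger();
            DirectoryCacheSketch cache = new DirectoryCacheSketch(dir -> {
                loads.incrementAndGet();
                return new Loaded(dir);
            });
            Loaded a = cache.getOrLoad("./offloaders");
            Loaded b = cache.getOrLoad("./offloaders");   // served from the cache
            Loaded c = cache.getOrLoad("/opt/offloaders"); // new key, loaded separately
            System.out.println((a == b) + " " + (a == c) + " " + loads.get()); // true false 2
        }
    }
    ```
    
    `ConcurrentHashMap.computeIfAbsent` applies the mapping function at most once per key, and if the function throws, no entry is recorded, so a failed load is retried on the next call instead of being cached. Other updates to the map may block while a load is in progress, which seems acceptable for a cache that is only written when a new directory is first seen.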
    
    The new `Map` has keys of type `String`. We could instead use keys of type `Path` and normalize them so that `./offloaders` and `offloaders` share a single class loader. However, the javadoc for `Path.normalize` warns that the normalized path can locate a different file when a symbolic link is involved. As such, I went with the basic `String` approach, which might load the same classes more than once if one directory is configured under different spellings. Below is the javadoc for `normalize`, in case that helps for a design decision.
    
    ```java
        /**
         * Returns a path that is this path with redundant name elements eliminated.
         *
         * <p> The precise definition of this method is implementation dependent but
         * in general it derives from this path, a path that does not contain
         * <em>redundant</em> name elements. In many file systems, the "{@code .}"
         * and "{@code ..}" are special names used to indicate the current directory
         * and parent directory. In such file systems all occurrences of "{@code .}"
         * are considered redundant. If a "{@code ..}" is preceded by a
         * non-"{@code ..}" name then both names are considered redundant (the
         * process to identify such names is repeated until it is no longer
         * applicable).
         *
         * <p> This method does not access the file system; the path may not locate
         * a file that exists. Eliminating "{@code ..}" and a preceding name from a
         * path may result in the path that locates a different file than the original
         * path. This can arise when the preceding name is a symbolic link.
         *
         * @return  the resulting path or this path if it does not contain
         *          redundant name elements; an empty path is returned if this path
         *          does have a root component and all name elements are redundant
         *
         * @see #getParent
         * @see #toRealPath
         */
        Path normalize();
    ```
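    
    To make the trade-off concrete, a small sketch using only the JDK (`normalizedKey` is a hypothetical helper, not part of this patch) of what a normalized `Path` key would and would not merge:
    
    ```java
    import java.nio.file.Path;
    import java.nio.file.Paths;
    
    public class PathKeySketch {
    
        // Hypothetical alternative key: resolve against the working directory and
        // drop redundant "." and ".." elements. This is purely lexical.
        static Path normalizedKey(String configured) {
            return Paths.get(configured).toAbsolutePath().normalize();
        }
    
        public static void main(String[] args) {
            // String keys: two cache entries for the same directory.
            System.out.println("./offloaders".equals("offloaders")); // false
            // Normalized Path keys: both spellings collapse into one entry.
            System.out.println(normalizedKey("./offloaders").equals(normalizedKey("offloaders"))); // true
            // normalize() drops "link/.." even if "link" is a symbolic link, so the
            // result can name a different directory than the OS would resolve.
            System.out.println(Paths.get("link/../offloaders").normalize()); // offloaders
        }
    }
    ```
    
    `Path.toRealPath()` would resolve symbolic links, but it accesses the file system and throws an `IOException` if the directory does not exist.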
    
    Apart from the new `OffloadersCacheTest`, which verifies that an offloaders directory is only loaded once, this change relies on existing tests for coverage. If required, I can create more tests.
    
    (cherry picked from commit 6c3ebbb01415cdfe094650ae0eeeea6dcc224e87)
---
 .../bookkeeper/mledger/offload/OffloaderUtils.java |  4 +-
 .../mledger/offload/OffloadersCache.java           | 68 ++++++++++++++++++++++
 .../mledger/offload/OffloadersCacheTest.java       | 62 ++++++++++++++++++++
 .../org/apache/pulsar/broker/PulsarService.java    | 13 +++--
 .../pulsar/sql/presto/PulsarConnectorCache.java    | 11 ++--
 5 files changed, 144 insertions(+), 14 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
index 5243691..bc747d7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -114,8 +114,8 @@ public class OffloaderUtils {
         }
     }
 
-    public static Offloaders searchForOffloaders(String connectorsDirectory, String narExtractionDirectory) throws IOException {
-        Path path = Paths.get(connectorsDirectory).toAbsolutePath();
+    public static Offloaders searchForOffloaders(String offloadersPath, String narExtractionDirectory) throws IOException {
+        Path path = Paths.get(offloadersPath).toAbsolutePath();
         log.info("Searching for offloaders in {}", path);
 
         Offloaders offloaders = new Offloaders();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
new file mode 100644
index 0000000..e80c75b
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Cache of {@link Offloaders}, keyed by offloaders directory. The main purpose of this class is to
+ * ensure that each offloaders directory is loaded only once.
+ */
+@Slf4j
+public class OffloadersCache implements AutoCloseable {
+
+    private Map<String, Offloaders> loadedOffloaders = new ConcurrentHashMap<>();
+
+    /**
+     * Method to load an Offloaders directory or to get an already loaded Offloaders directory.
+     *
+     * @param offloadersPath - the directory to search for offloader NAR files
+     * @param narExtractionDirectory - the directory to use for extraction
+     * @return the loaded offloaders
+     * @throws RuntimeException wrapping an {@link IOException} if the offloaders cannot be loaded
+     */
+    public Offloaders getOrLoadOffloaders(String offloadersPath, String narExtractionDirectory) {
+        return loadedOffloaders.computeIfAbsent(offloadersPath,
+                (directory) -> {
+                    try {
+                        return OffloaderUtils.searchForOffloaders(directory, narExtractionDirectory);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    @Override
+    public void close() {
+        loadedOffloaders.values().forEach(offloaders -> {
+            try {
+                offloaders.close();
+            } catch (Exception e) {
+                log.error("Error while closing offloader.", e);
+                // Even if the offloader fails to close, the graceful shutdown process continues
+            }
+        });
+        // Don't want to hold on to references to closed offloaders
+        loadedOffloaders.clear();
+    }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java
new file mode 100644
index 0000000..1c2cd85
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.testng.Assert.assertSame;
+
+@PrepareForTest({OffloaderUtils.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.pulsar.common.nar.*"})
+public class OffloadersCacheTest {
+
+    // Necessary to make PowerMockito.mockStatic work with TestNG.
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testLoadsOnlyOnce() throws Exception {
+        Offloaders expectedOffloaders = new Offloaders();
+
+        PowerMockito.mockStatic(OffloaderUtils.class);
+        PowerMockito.when(OffloaderUtils.searchForOffloaders(eq("./offloaders"), eq("/tmp")))
+                .thenReturn(expectedOffloaders);
+
+        OffloadersCache cache = new OffloadersCache();
+
+        // Call a first time to load the offloader
+        Offloaders offloaders1 = cache.getOrLoadOffloaders("./offloaders", "/tmp");
+
+        assertSame(offloaders1, expectedOffloaders, "The offloaders should be the mocked one.");
+
+        // Call a second time to get the stored offloaders
+        Offloaders offloaders2 = cache.getOrLoadOffloaders("./offloaders", "/tmp");
+
+        assertSame(offloaders2, expectedOffloaders, "The offloaders should be the mocked one.");
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index fcabff2..ef8df6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -66,9 +66,9 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.mledger.offload.OffloadersCache;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -183,7 +183,7 @@ public class PulsarService implements AutoCloseable {
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledExecutorService compactorExecutor;
     private OrderedScheduler offloaderScheduler;
-    private Offloaders offloaderManager = new Offloaders();
+    private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
     private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
     private ScheduledFuture<?> loadReportTask = null;
@@ -370,7 +370,7 @@ public class PulsarService implements AutoCloseable {
                 schemaRegistryService.close();
             }
 
-            offloaderManager.close();
+            offloadersCache.close();
 
             if (protocolHandlers != null) {
                 protocolHandlers.close();
@@ -482,8 +482,6 @@ public class PulsarService implements AutoCloseable {
             schemaRegistryService = SchemaRegistryService.create(
                     schemaStorage, config.getSchemaRegistryCompatibilityCheckers());
 
-            this.offloaderManager = OffloaderUtils.searchForOffloaders(
-                    config.getOffloadersDirectory(), config.getNarExtractionDirectory());
             this.defaultOffloader = createManagedLedgerOffloader(
                     OffloadPolicies.create(this.getConfiguration().getProperties()));
             this.brokerInterceptor = BrokerInterceptors.load(config);
@@ -932,7 +930,10 @@ public class PulsarService implements AutoCloseable {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
+                Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
+                        offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
+
+                LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
                         offloadPolicies.getManagedLedgerOffloadDriver());
                 try {
                     return offloaderFactory.create(
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index c10312a..757995a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -35,8 +35,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
+import org.apache.bookkeeper.mledger.offload.OffloadersCache;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
@@ -57,7 +57,7 @@ public class PulsarConnectorCache {
 
     private final StatsProvider statsProvider;
     private OrderedScheduler offloaderScheduler;
-    private Offloaders offloaderManager;
+    private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
     private Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
 
@@ -155,9 +155,9 @@ public class PulsarConnectorCache {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                         "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(),
+                Offloaders offloaders = offloadersCache.getOrLoadOffloaders(offloadPolicies.getOffloadersDirectory(),
                         pulsarConnectorConfig.getNarExtractionDirectory());
-                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
+                LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
                         offloadPolicies.getManagedLedgerOffloadDriver());
 
                 try {
@@ -195,8 +195,7 @@ public class PulsarConnectorCache {
                 instance.statsProvider.stop();
                 instance.managedLedgerFactory.shutdown();
                 instance.offloaderScheduler.shutdown();
-                instance.offloaderManager.close();
-                instance = null;
+                instance.offloadersCache.close();
             }
         }
     }
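
The shutdown semantics of `OffloadersCache.close()` can be checked in isolation with the following sketch. It treats each cached value as a plain `AutoCloseable` and collects failures in a list where the real class logs them (the class and method names here are hypothetical): every entry is closed even when another one throws, and the map is cleared afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CloseAllSketch implements AutoCloseable {

    private final Map<String, AutoCloseable> loaded = new ConcurrentHashMap<>();
    // Collected instead of logged so the behaviour is observable in this sketch.
    final List<Exception> failures = new ArrayList<>();

    void put(String key, AutoCloseable value) {
        loaded.put(key, value);
    }

    int size() {
        return loaded.size();
    }

    @Override
    public void close() {
        loaded.values().forEach(value -> {
            try {
                value.close();
            } catch (Exception e) {
                // Record the failure and keep closing the remaining entries.
                failures.add(e);
            }
        });
        // Drop references so closed instances can be garbage collected.
        loaded.clear();
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        CloseAllSketch cache = new CloseAllSketch();
        cache.put("a", () -> { throw new IllegalStateException("close failed"); });
        cache.put("b", () -> closed.add("b"));
        cache.close();
        System.out.println(closed + " " + cache.failures.size() + " " + cache.size()); // [b] 1 0
    }
}
```

Clearing the map after closing means a second `close()` call is a no-op rather than an attempt to close the same instances twice.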