You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/05 10:58:59 UTC

[pulsar] branch master updated: [ML] Make ManagedLedger storage configurable (#9397)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ddc3813  [ML] Make ManagedLedger storage configurable (#9397)
ddc3813 is described below

commit ddc381363ec175d86c7b8086ecc81cfeda43438e
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri Feb 5 02:57:35 2021 -0800

    [ML] Make ManagedLedger storage configurable (#9397)
    
    *Motivation*
    
    This is the first step toward supporting different storage implementations for Pulsar.
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  5 ++
 .../pulsar/broker/ManagedLedgerClientFactory.java  | 26 +++---
 .../org/apache/pulsar/broker/PulsarService.java    |  9 +-
 .../broker/storage/ManagedLedgerStorage.java       | 95 ++++++++++++++++++++++
 .../apache/pulsar/broker/storage/package-info.java | 22 +++++
 .../broker/service/BrokerBookieIsolationTest.java  |  6 +-
 .../client/impl/SequenceIdWithErrorTest.java       |  4 +-
 7 files changed, 150 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 44381bd..34746f6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1349,6 +1349,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private int defaultNumPartitions = 1;
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
+        doc = "The class of the managed ledger storage"
+    )
+    private String managedLedgerStorageClassName = "org.apache.pulsar.broker.ManagedLedgerClientFactory";
+    @FieldContext(
+        category = CATEGORY_STORAGE_ML,
         doc = "Number of threads to be used for managed ledger tasks dispatching"
     )
     private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index c86b7de..f174433 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
@@ -36,23 +35,24 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ManagedLedgerClientFactory implements Closeable {
+public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
 
-    private final ManagedLedgerFactory managedLedgerFactory;
-    private final BookKeeper defaultBkClient;
+    private ManagedLedgerFactory managedLedgerFactory;
+    private BookKeeper defaultBkClient;
     private final Map<EnsemblePlacementPolicyConfig, BookKeeper>
             bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
     private StatsProvider statsProvider = new NullStatsProvider();
 
-    public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
-                                      BookKeeperClientFactory bookkeeperProvider) throws Exception {
+    public void initialize(ServiceConfiguration conf, ZooKeeper zkClient,
+                           BookKeeperClientFactory bookkeeperProvider) throws Exception {
         ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
         managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
         managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
@@ -125,12 +125,18 @@ public class ManagedLedgerClientFactory implements Closeable {
     @Override
     public void close() throws IOException {
         try {
-            managedLedgerFactory.shutdown();
-            log.info("Closed managed ledger factory");
+            if (null != managedLedgerFactory) {
+                managedLedgerFactory.shutdown();
+                log.info("Closed managed ledger factory");
+            }
 
-            statsProvider.stop();
+            if (null != statsProvider) {
+                statsProvider.stop();
+            }
             try {
-                defaultBkClient.close();
+                if (null != defaultBkClient) {
+                    defaultBkClient.close();
+                }
             } catch (RejectedExecutionException ree) {
                 // when closing bookkeeper client, it will error outs all pending metadata operations.
                 // those callbacks of those operations will be triggered, and submitted to the scheduler
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aa893d3..919d38e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -92,6 +92,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.validator.MultipleListenerValidator;
@@ -158,7 +159,7 @@ public class PulsarService implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
     private ServiceConfiguration config = null;
     private NamespaceService nsService = null;
-    private ManagedLedgerClientFactory managedLedgerClientFactory = null;
+    private ManagedLedgerStorage managedLedgerClientFactory = null;
     private LeaderElectionService leaderElectionService = null;
     private BrokerService brokerService = null;
     private WebService webService = null;
@@ -516,7 +517,9 @@ public class PulsarService implements AutoCloseable {
             this.startZkCacheService();
 
             this.bkClientFactory = newBookKeeperClientFactory();
-            managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);
+            managedLedgerClientFactory = ManagedLedgerStorage.create(
+                config, getZkClient(), bkClientFactory
+            );
 
             this.brokerService = new BrokerService(this);
 
@@ -975,7 +978,7 @@ public class PulsarService implements AutoCloseable {
         return managedLedgerClientFactory.getManagedLedgerFactory();
     }
 
-    public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
+    public ManagedLedgerStorage getManagedLedgerClientFactory() {
         return managedLedgerClientFactory;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
new file mode 100644
index 0000000..2b28517
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.storage;
+
+import java.io.IOException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.classification.InterfaceAudience.Private;
+import org.apache.pulsar.common.classification.InterfaceStability.Unstable;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Pluggable storage used to access {@link org.apache.bookkeeper.mledger.ManagedLedger}s.
+ */
+@Private
+@Unstable
+public interface ManagedLedgerStorage extends AutoCloseable {
+
+    /**
+     * Initialize the managed ledger storage; {@link #create} calls this on a newly constructed instance.
+     *
+     * @param conf broker service configuration
+     * @param zkClient ZooKeeper client
+     * @param bookkeperProvider BookKeeper client factory
+     * @throws Exception if the storage cannot be initialized
+     */
+    void initialize(ServiceConfiguration conf,
+                    ZooKeeper zkClient,
+                    BookKeeperClientFactory bookkeperProvider) throws Exception;
+
+    /**
+     * Return the {@link ManagedLedgerFactory} provided by this storage.
+     *
+     * @return the managed ledger factory.
+     */
+    ManagedLedgerFactory getManagedLedgerFactory();
+
+    /**
+     * Return the stats provider to expose the stats of the storage implementation.
+     *
+     * @return the stats provider.
+     */
+    StatsProvider getStatsProvider();
+
+    /**
+     * Return the default bookkeeper client.
+     *
+     * @return the default bookkeeper client.
+     */
+    BookKeeper getBookKeeperClient();
+
+    /**
+     * Close the storage.
+     *
+     * @throws IOException if the storage fails to close
+     */
+    void close() throws IOException;
+
+    /**
+     * Instantiate the storage class named by {@code conf.getManagedLedgerStorageClassName()} and initialize it.
+     *
+     * @param conf service config; also supplies the implementation class name
+     * @param zkClient zookeeper client
+     * @param bkProvider bookkeeper client provider
+     * @return the initialized managed ledger storage.
+     */
+    static ManagedLedgerStorage create(ServiceConfiguration conf,
+                                       ZooKeeper zkClient,
+                                       BookKeeperClientFactory bkProvider) throws Exception {
+        final Class<?> storageClass = Class.forName(conf.getManagedLedgerStorageClassName());
+        final ManagedLedgerStorage storage = (ManagedLedgerStorage) storageClass.newInstance();
+        storage.initialize(conf, zkClient, bkProvider);
+        return storage;
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/package-info.java
new file mode 100644
index 0000000..1e5ad0c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The storage layer for Apache Pulsar.
+ */
+package org.apache.pulsar.broker.storage;
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 9662f3e..4f5c886 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -231,7 +231,8 @@ public class BrokerBookieIsolationTest {
         // validate ledgers' ensemble with affinity bookies
         assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
 
-        ManagedLedgerClientFactory mlFactory = pulsarService.getManagedLedgerClientFactory();
+        ManagedLedgerClientFactory mlFactory =
+            (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
         Map<EnsemblePlacementPolicyConfig, BookKeeper> bkPlacementPolicyToBkClientMap = mlFactory
                 .getBkEnsemblePolicyToBookKeeperMap();
 
@@ -364,7 +365,8 @@ public class BrokerBookieIsolationTest {
         // validate ledgers' ensemble with affinity bookies
         assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
 
-        ManagedLedgerClientFactory mlFactory = pulsarService.getManagedLedgerClientFactory();
+        ManagedLedgerClientFactory mlFactory =
+            (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
         Map<EnsemblePlacementPolicyConfig, BookKeeper> bkPlacementPolicyToBkClientMap = mlFactory
                 .getBkEnsemblePolicyToBookKeeperMap();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
index b604505..62a4ff0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -54,8 +54,8 @@ public class SequenceIdWithErrorTest extends BrokerBkEnsemblesTests {
 
         // Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will cause the
         // broker to fail subsequent send operation and it will trigger a recover
-        ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(pulsar.getConfiguration(),
-                pulsar.getZkClient(), pulsar.getBookKeeperClientFactory());
+        ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory();
+        clientFactory.initialize(pulsar.getConfiguration(), pulsar.getZkClient(), pulsar.getBookKeeperClientFactory());
         ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory();
         ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
         ml.close();