You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/05 10:58:59 UTC
[pulsar] branch master updated: [ML] Make ManagedLedger storage
configurable (#9397)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ddc3813 [ML] Make ManagedLedger storage configurable (#9397)
ddc3813 is described below
commit ddc381363ec175d86c7b8086ecc81cfeda43438e
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri Feb 5 02:57:35 2021 -0800
[ML] Make ManagedLedger storage configurable (#9397)
*Motivation*
This is the first step to allow supporting different storage implementations for Pulsar
---
.../apache/pulsar/broker/ServiceConfiguration.java | 5 ++
.../pulsar/broker/ManagedLedgerClientFactory.java | 26 +++---
.../org/apache/pulsar/broker/PulsarService.java | 9 +-
.../broker/storage/ManagedLedgerStorage.java | 95 ++++++++++++++++++++++
.../apache/pulsar/broker/storage/package-info.java | 22 +++++
.../broker/service/BrokerBookieIsolationTest.java | 6 +-
.../client/impl/SequenceIdWithErrorTest.java | 4 +-
7 files changed, 150 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 44381bd..34746f6 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1349,6 +1349,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int defaultNumPartitions = 1;
@FieldContext(
category = CATEGORY_STORAGE_ML,
+ doc = "The class of the managed ledger storage"
+ )
+ private String managedLedgerStorageClassName = "org.apache.pulsar.broker.ManagedLedgerClientFactory";
+ @FieldContext(
+ category = CATEGORY_STORAGE_ML,
doc = "Number of threads to be used for managed ledger tasks dispatching"
)
private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index c86b7de..f174433 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
@@ -36,23 +35,24 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ManagedLedgerClientFactory implements Closeable {
+public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
- private final ManagedLedgerFactory managedLedgerFactory;
- private final BookKeeper defaultBkClient;
+ private ManagedLedgerFactory managedLedgerFactory;
+ private BookKeeper defaultBkClient;
private final Map<EnsemblePlacementPolicyConfig, BookKeeper>
bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
private StatsProvider statsProvider = new NullStatsProvider();
- public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
- BookKeeperClientFactory bookkeeperProvider) throws Exception {
+ public void initialize(ServiceConfiguration conf, ZooKeeper zkClient,
+ BookKeeperClientFactory bookkeeperProvider) throws Exception {
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
@@ -125,12 +125,18 @@ public class ManagedLedgerClientFactory implements Closeable {
@Override
public void close() throws IOException {
try {
- managedLedgerFactory.shutdown();
- log.info("Closed managed ledger factory");
+ if (null != managedLedgerFactory) {
+ managedLedgerFactory.shutdown();
+ log.info("Closed managed ledger factory");
+ }
- statsProvider.stop();
+ if (null != statsProvider) {
+ statsProvider.stop();
+ }
try {
- defaultBkClient.close();
+ if (null != defaultBkClient) {
+ defaultBkClient.close();
+ }
} catch (RejectedExecutionException ree) {
// when closing bookkeeper client, it will error outs all pending metadata operations.
// those callbacks of those operations will be triggered, and submitted to the scheduler
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aa893d3..919d38e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -92,6 +92,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
@@ -158,7 +159,7 @@ public class PulsarService implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
private ServiceConfiguration config = null;
private NamespaceService nsService = null;
- private ManagedLedgerClientFactory managedLedgerClientFactory = null;
+ private ManagedLedgerStorage managedLedgerClientFactory = null;
private LeaderElectionService leaderElectionService = null;
private BrokerService brokerService = null;
private WebService webService = null;
@@ -516,7 +517,9 @@ public class PulsarService implements AutoCloseable {
this.startZkCacheService();
this.bkClientFactory = newBookKeeperClientFactory();
- managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);
+ managedLedgerClientFactory = ManagedLedgerStorage.create(
+ config, getZkClient(), bkClientFactory
+ );
this.brokerService = new BrokerService(this);
@@ -975,7 +978,7 @@ public class PulsarService implements AutoCloseable {
return managedLedgerClientFactory.getManagedLedgerFactory();
}
- public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
+ public ManagedLedgerStorage getManagedLedgerClientFactory() {
return managedLedgerClientFactory;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
new file mode 100644
index 0000000..2b28517
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.storage;
+
+import java.io.IOException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.classification.InterfaceAudience.Private;
+import org.apache.pulsar.common.classification.InterfaceStability.Unstable;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Storage to access {@link org.apache.bookkeeper.mledger.ManagedLedger}s.
+ */
+@Private
+@Unstable
+public interface ManagedLedgerStorage extends AutoCloseable {
+
+ /**
+ * Initialize the managed ledger storage.
+ *
+ * @param conf service config
+ * @param zkClient zk client
+ * @param bookkeperProvider bookkeeper provider
+ * @throws Exception
+ */
+ void initialize(ServiceConfiguration conf,
+ ZooKeeper zkClient,
+ BookKeeperClientFactory bookkeperProvider) throws Exception;
+
+ /**
+ * Return the factory to create {@link ManagedLedgerFactory}.
+ *
+ * @return the factory to create {@link ManagedLedgerFactory}.
+ */
+ ManagedLedgerFactory getManagedLedgerFactory();
+
+ /**
+ * Return the stats provider to expose the stats of the storage implementation.
+ *
+ * @return the stats provider.
+ */
+ StatsProvider getStatsProvider();
+
+ /**
+ * Return the default bookkeeper client.
+ *
+ * @return the default bookkeeper client.
+ */
+ BookKeeper getBookKeeperClient();
+
+ /**
+ * Close the storage.
+ *
+ * @throws IOException
+ */
+ void close() throws IOException;
+
+ /**
+ * Initialize the {@link ManagedLedgerStorage} from the provided resources.
+ *
+ * @param conf service config
+ * @param zkClient zookeeper client
+ * @param bkProvider bookkeeper client provider
+ * @return the initialized managed ledger storage.
+ */
+ static ManagedLedgerStorage create(ServiceConfiguration conf,
+ ZooKeeper zkClient,
+ BookKeeperClientFactory bkProvider) throws Exception {
+ final Class<?> storageClass = Class.forName(conf.getManagedLedgerStorageClassName());
+ final ManagedLedgerStorage storage = (ManagedLedgerStorage) storageClass.newInstance();
+ storage.initialize(conf, zkClient, bkProvider);
+ return storage;
+ }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/package-info.java
new file mode 100644
index 0000000..1e5ad0c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The storage layer for Apache Pulsar.
+ */
+package org.apache.pulsar.broker.storage;
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 9662f3e..4f5c886 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -231,7 +231,8 @@ public class BrokerBookieIsolationTest {
// validate ledgers' ensemble with affinity bookies
assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
- ManagedLedgerClientFactory mlFactory = pulsarService.getManagedLedgerClientFactory();
+ ManagedLedgerClientFactory mlFactory =
+ (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
Map<EnsemblePlacementPolicyConfig, BookKeeper> bkPlacementPolicyToBkClientMap = mlFactory
.getBkEnsemblePolicyToBookKeeperMap();
@@ -364,7 +365,8 @@ public class BrokerBookieIsolationTest {
// validate ledgers' ensemble with affinity bookies
assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
- ManagedLedgerClientFactory mlFactory = pulsarService.getManagedLedgerClientFactory();
+ ManagedLedgerClientFactory mlFactory =
+ (ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory();
Map<EnsemblePlacementPolicyConfig, BookKeeper> bkPlacementPolicyToBkClientMap = mlFactory
.getBkEnsemblePolicyToBookKeeperMap();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
index b604505..62a4ff0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -54,8 +54,8 @@ public class SequenceIdWithErrorTest extends BrokerBkEnsemblesTests {
// Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will cause the
// broker to fail subsequent send operation and it will trigger a recover
- ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(pulsar.getConfiguration(),
- pulsar.getZkClient(), pulsar.getBookKeeperClientFactory());
+ ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory();
+ clientFactory.initialize(pulsar.getConfiguration(), pulsar.getZkClient(), pulsar.getBookKeeperClientFactory());
ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory();
ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
ml.close();