You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/18 04:57:48 UTC
[pulsar] branch master updated: Fix failed initialized METADATA_STORE_SCHEME (#14708)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0a38fce6 Fix failed initialized METADATA_STORE_SCHEME (#14708)
0a38fce6 is described below
commit 0a38fce64f9caba68eb9d7e6828f9833815f5adc
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Fri Mar 18 12:56:01 2022 +0800
Fix failed initialized METADATA_STORE_SCHEME (#14708)
Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
---
.../rackawareness/BookieRackAffinityMapping.java | 66 +++++++++++++++++-----
.../IsolatedBookieEnsemblePlacementPolicy.java | 20 +++----
.../bookkeeper/AbstractMetadataDriver.java | 2 +-
3 files changed, 59 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index c302a01..ec4b7da 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.bookie.rackawareness;
+import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METADATA_STORE_SCHEME;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -31,6 +32,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
+import org.apache.bookkeeper.meta.exceptions.Code;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
@@ -42,7 +45,10 @@ import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,24 +69,54 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
+ public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
+ MetadataStore store;
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
- if (storeProperty == null) {
- throw new RuntimeException(METADATA_STORE_INSTANCE + " configuration was not set in the BK client "
- + "configuration");
- }
-
- if (!(storeProperty instanceof MetadataStore)) {
- throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
+ if (storeProperty != null) {
+ if (!(storeProperty instanceof MetadataStore)) {
+ throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
+ }
+ store = (MetadataStore) storeProperty;
+ } else {
+ String url;
+ String metadataServiceUri = (String) conf.getProperty("metadataServiceUri");
+ if (StringUtils.isNotBlank(metadataServiceUri)) {
+ try {
+ url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "")
+ .replace(";", ",");
+ } catch (Exception e) {
+ throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
+ }
+ } else {
+ String zkServers = (String) conf.getProperty("zkServers");
+ if (StringUtils.isBlank(zkServers)) {
+ String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor "
+ + "metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE);
+ throw new RuntimeException(errorMsg);
+ }
+ url = zkServers;
+ }
+ try {
+ int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout"));
+ store = MetadataStoreExtended.create(url,
+ MetadataStoreConfig.builder()
+ .sessionTimeoutMillis(zkTimeout)
+ .build());
+ } catch (MetadataStoreException e) {
+ throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
+ }
}
+ return store;
+ }
- MetadataStore store = (MetadataStore) storeProperty;
-
- bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
- bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ MetadataStore store;
try {
+ store = createMetadataStore(conf);
+ bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
+ bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
for (Map<String, BookieInfo> bookieMapping : bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.map(Map::values).orElse(Collections.emptyList())) {
for (String address : bookieMapping.keySet()) {
@@ -91,7 +127,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
bookieAddressListLastTime);
}
}
- } catch (InterruptedException | ExecutionException e) {
+ } catch (InterruptedException | ExecutionException | MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}
store.registerListener(this::handleUpdates);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index fed8943..c086e2c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.bookie.rackawareness;
+import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -35,6 +36,7 @@ import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.proto.BookieAddressResolver;
@@ -68,20 +70,12 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
-
- Object storeProperty = conf.getProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE);
- if (storeProperty == null) {
- throw new RuntimeException(BookieRackAffinityMapping.METADATA_STORE_INSTANCE
- + " configuration was not set in the BK client configuration");
- }
-
- if (!(storeProperty instanceof MetadataStore)) {
- throw new RuntimeException(
- BookieRackAffinityMapping.METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
+ MetadataStore store;
+ try {
+ store = BookieRackAffinityMapping.createMetadataStore(conf);
+ } catch (MetadataException e) {
+ throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized");
}
-
- MetadataStore store = (MetadataStore) storeProperty;
-
Set<String> primaryIsolationGroups = new HashSet<>();
Set<String> secondaryIsolationGroups = new HashSet<>();
if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
index dceeb21..1a088e9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
public abstract class AbstractMetadataDriver implements Closeable {
- protected static final String METADATA_STORE_SCHEME = "metadata-store";
+ public static final String METADATA_STORE_SCHEME = "metadata-store";
public static final String METADATA_STORE_INSTANCE = "metadata-store-instance";