You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/18 04:57:48 UTC

[pulsar] branch master updated: Fix failed initialized METADATA_STORE_SCHEME (#14708)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a38fce6 Fix failed initialized METADATA_STORE_SCHEME (#14708)
0a38fce6 is described below

commit 0a38fce64f9caba68eb9d7e6828f9833815f5adc
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Fri Mar 18 12:56:01 2022 +0800

    Fix failed initialized METADATA_STORE_SCHEME (#14708)
    
    Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
---
 .../rackawareness/BookieRackAffinityMapping.java   | 66 +++++++++++++++++-----
 .../IsolatedBookieEnsemblePlacementPolicy.java     | 20 +++----
 .../bookkeeper/AbstractMetadataDriver.java         |  2 +-
 3 files changed, 59 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index c302a01..ec4b7da 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.bookie.rackawareness;
 
+import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METADATA_STORE_SCHEME;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,6 +32,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackChangeNotifier;
+import org.apache.bookkeeper.meta.exceptions.Code;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieNode;
@@ -42,7 +45,10 @@ import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,24 +69,54 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
     private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
     private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
 
-    @Override
-    public void setConf(Configuration conf) {
-        super.setConf(conf);
+    public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
+        MetadataStore store;
         Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
-        if (storeProperty == null) {
-            throw new RuntimeException(METADATA_STORE_INSTANCE + " configuration was not set in the BK client "
-                    + "configuration");
-        }
-
-        if (!(storeProperty instanceof MetadataStore)) {
-            throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
+        if (storeProperty != null) {
+            if (!(storeProperty instanceof MetadataStore)) {
+                throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
+            }
+            store = (MetadataStore) storeProperty;
+        } else {
+            String url;
+            String metadataServiceUri = (String) conf.getProperty("metadataServiceUri");
+            if (StringUtils.isNotBlank(metadataServiceUri)) {
+                try {
+                    url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "")
+                            .replace(";", ",");
+                } catch (Exception e) {
+                    throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
+                }
+            } else {
+                String zkServers = (String) conf.getProperty("zkServers");
+                if (StringUtils.isBlank(zkServers)) {
+                    String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor "
+                            + "metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE);
+                    throw new RuntimeException(errorMsg);
+                }
+                url = zkServers;
+            }
+            try {
+                int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout"));
+                store = MetadataStoreExtended.create(url,
+                        MetadataStoreConfig.builder()
+                                .sessionTimeoutMillis(zkTimeout)
+                                .build());
+            } catch (MetadataStoreException e) {
+                throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
+            }
         }
+        return store;
+    }
 
-        MetadataStore store = (MetadataStore) storeProperty;
-
-        bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
-        bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
+    @Override
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+        MetadataStore store;
         try {
+            store = createMetadataStore(conf);
+            bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
+            bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
             for (Map<String, BookieInfo> bookieMapping : bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
                     .map(Map::values).orElse(Collections.emptyList())) {
                 for (String address : bookieMapping.keySet()) {
@@ -91,7 +127,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
                             bookieAddressListLastTime);
                 }
             }
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException | MetadataException e) {
             throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
         }
         store.registerListener(this::handleUpdates);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index fed8943..c086e2c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.bookie.rackawareness;
 
+import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
 import io.netty.util.HashedWheelTimer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,6 +36,7 @@ import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.proto.BookieAddressResolver;
@@ -68,20 +70,12 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
     public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
             Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
             StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
-
-        Object storeProperty = conf.getProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE);
-        if (storeProperty == null) {
-            throw new RuntimeException(BookieRackAffinityMapping.METADATA_STORE_INSTANCE
-                    + " configuration was not set in the BK client configuration");
-        }
-
-        if (!(storeProperty instanceof MetadataStore)) {
-            throw new RuntimeException(
-                    BookieRackAffinityMapping.METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
+        MetadataStore store;
+        try {
+            store = BookieRackAffinityMapping.createMetadataStore(conf);
+        } catch (MetadataException e) {
+            throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized");
         }
-
-        MetadataStore store = (MetadataStore) storeProperty;
-
         Set<String> primaryIsolationGroups = new HashSet<>();
         Set<String> secondaryIsolationGroups = new HashSet<>();
         if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
index dceeb21..1a088e9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 public abstract class AbstractMetadataDriver implements Closeable {
 
-    protected static final String METADATA_STORE_SCHEME = "metadata-store";
+    public static final String METADATA_STORE_SCHEME = "metadata-store";
 
     public static final String METADATA_STORE_INSTANCE = "metadata-store-instance";