You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/13 05:35:47 UTC

[pulsar] branch branch-2.10 updated: Revert "Revert "[Branch-2.10] [fix] [broker] Fix bookeeper packages npe (#17291)""

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 1df5734a389 Revert "Revert "[Branch-2.10] [fix] [broker] Fix bookeeper packages npe (#17291)""
1df5734a389 is described below

commit 1df5734a3898d371a9d75d921ecb6d73b2847a71
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Sep 13 13:24:54 2022 +0800

    Revert "Revert "[Branch-2.10] [fix] [broker] Fix bookeeper packages npe (#17291)""
    
    This reverts commit 7b75cf62f1904bfcd9ea6243741105905dd6769f.
---
 conf/standalone.conf                               | 20 +++++++++++++----
 .../org/apache/pulsar/PulsarStandaloneBuilder.java |  7 ++++--
 .../org/apache/pulsar/PulsarStandaloneStarter.java |  8 +++++--
 .../common/conf/InternalConfigurationData.java     | 26 +++++++++++-----------
 .../functions/worker/PulsarWorkerService.java      | 13 ++++++-----
 .../pulsar/functions/worker/WorkerUtils.java       |  3 ++-
 .../bookkeeper/BookKeeperPackagesStorage.java      | 13 ++++++++---
 .../BookKeeperPackagesStorageConfiguration.java    |  4 ++++
 .../bookkeeper/BookKeeperPackagesStorageTest.java  |  8 +++----
 9 files changed, 68 insertions(+), 34 deletions(-)

diff --git a/conf/standalone.conf b/conf/standalone.conf
index 5f90ea1ce3f..8eb25a2aa65 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -19,11 +19,15 @@
 
 ### --- General broker settings --- ###
 
-# Zookeeper quorum connection string
-zookeeperServers=
+# The metadata store URL
+# Examples:
+# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
+# * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the scheme is not specified)
+# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path)
+metadataStoreUrl=
 
-# Configuration Store connection string
-configurationStoreServers=
+# The metadata store URL for the configuration data. If empty, it falls back to metadataStoreUrl
+configurationMetadataStoreUrl=
 
 brokerServicePort=6650
 
@@ -1087,3 +1091,11 @@ zooKeeperOperationTimeoutSeconds=-1
 # ZooKeeper cache expiry time in seconds
 # Deprecated: use metadataStoreCacheExpirySeconds
 zooKeeperCacheExpirySeconds=-1
+
+# Zookeeper quorum connection string
+# Deprecated: use metadataStoreUrl
+zookeeperServers=
+
+# Configuration Store connection string
+# Deprecated: use configurationMetadataStoreUrl
+configurationStoreServers=
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
index 581db63bb0e..70d457dbb46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
@@ -21,6 +21,7 @@ package org.apache.pulsar;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 
 public final class PulsarStandaloneBuilder {
 
@@ -114,8 +115,10 @@ public final class PulsarStandaloneBuilder {
         }
 
         // Set ZK server's host to localhost
-        pulsarStandalone.getConfig().setZookeeperServers(zkServers + ":" + pulsarStandalone.getZkPort());
-        pulsarStandalone.getConfig().setConfigurationStoreServers(zkServers + ":" + pulsarStandalone.getZkPort());
+        final String metadataStoreUrl =
+                ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + pulsarStandalone.getZkPort();
+        pulsarStandalone.getConfig().setMetadataStoreUrl(metadataStoreUrl);
+        pulsarStandalone.getConfig().setConfigurationMetadataStoreUrl(metadataStoreUrl);
         pulsarStandalone.getConfig().setRunningStandalone(true);
         return pulsarStandalone;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 92b3e3e64ee..b8ec534b893 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -28,6 +28,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,8 +101,11 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
                 }
             }
         }
-        config.setZookeeperServers(zkServers + ":" + this.getZkPort());
-        config.setConfigurationStoreServers(zkServers + ":" + this.getZkPort());
+
+        final String metadataStoreUrl =
+                ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + this.getZkPort();
+        config.setMetadataStoreUrl(metadataStoreUrl);
+        config.setConfigurationMetadataStoreUrl(metadataStoreUrl);
 
         config.setRunningStandalone(true);
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
index 80da3caa4ba..2927aad8afe 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
@@ -27,8 +27,8 @@ import lombok.ToString;
 @ToString
 public class InternalConfigurationData {
 
-    private String zookeeperServers;
-    private String configurationStoreServers;
+    private String metadataStoreUrl;
+    private String configurationMetadataStoreUrl;
     @Deprecated
     private String ledgersRootPath;
     private String bookkeeperMetadataServiceUri;
@@ -38,23 +38,23 @@ public class InternalConfigurationData {
     }
 
     public InternalConfigurationData(String zookeeperServers,
-                                     String configurationStoreServers,
+                                     String configurationMetadataStoreUrl,
                                      String ledgersRootPath,
                                      String bookkeeperMetadataServiceUri,
                                      String stateStorageServiceUrl) {
-        this.zookeeperServers = zookeeperServers;
-        this.configurationStoreServers = configurationStoreServers;
+        this.metadataStoreUrl = zookeeperServers;
+        this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
         this.ledgersRootPath = ledgersRootPath;
         this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
     }
 
-    public String getZookeeperServers() {
-        return zookeeperServers;
+    public String getMetadataStoreUrl() {
+        return metadataStoreUrl;
     }
 
-    public String getConfigurationStoreServers() {
-        return configurationStoreServers;
+    public String getConfigurationMetadataStoreUrl() {
+        return configurationMetadataStoreUrl;
     }
 
     /** @deprecated */
@@ -77,8 +77,8 @@ public class InternalConfigurationData {
             return false;
         }
         InternalConfigurationData other = (InternalConfigurationData) obj;
-        return Objects.equals(zookeeperServers, other.zookeeperServers)
-            && Objects.equals(configurationStoreServers, other.configurationStoreServers)
+        return Objects.equals(metadataStoreUrl, other.metadataStoreUrl)
+            && Objects.equals(configurationMetadataStoreUrl, other.configurationMetadataStoreUrl)
             && Objects.equals(ledgersRootPath, other.ledgersRootPath)
             && Objects.equals(bookkeeperMetadataServiceUri, other.bookkeeperMetadataServiceUri)
             && Objects.equals(stateStorageServiceUrl, other.stateStorageServiceUrl);
@@ -86,8 +86,8 @@ public class InternalConfigurationData {
 
     @Override
     public int hashCode() {
-        return Objects.hash(zookeeperServers,
-                configurationStoreServers,
+        return Objects.hash(metadataStoreUrl,
+                configurationMetadataStoreUrl,
                 ledgersRootPath,
                 bookkeeperMetadataServiceUri,
                 stateStorageServiceUrl);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 6dbdb592eaf..10710641b0d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
+import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -274,13 +275,14 @@ public class PulsarWorkerService implements WorkerService {
         URI dlogURI;
         try {
             if (workerConfig.isInitializedDlogMetadata()) {
-                dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
+                String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
+                dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
             } else {
                 dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
             }
         } catch (IOException ioe) {
             log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing "
-                            + "function packages", internalConf.getZookeeperServers(),
+                            + "function packages", internalConf.getMetadataStoreUrl(),
                     internalConf.getBookkeeperMetadataServiceUri(), ioe);
             throw ioe;
         }
@@ -356,15 +358,16 @@ public class PulsarWorkerService implements WorkerService {
         URI dlogURI;
         try {
             // initializing dlog namespace for function worker
-            if (workerConfig.isInitializedDlogMetadata()){
-                dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
+            if (workerConfig.isInitializedDlogMetadata()) {
+                String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
+                dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
             } else {
                 dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
             }
         } catch (IOException ioe) {
             LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for "
                             + "storing function packages",
-                    internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
+                    internalConf.getMetadataStoreUrl(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
             throw ioe;
         }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 828b4e16516..5de762a1d7c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker;
 
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -184,7 +185,7 @@ public final class WorkerUtils {
         // for BC purposes
         if (internalConf.getBookkeeperMetadataServiceUri() == null) {
             ledgersRootPath = internalConf.getLedgersRootPath();
-            ledgersStoreServers = internalConf.getZookeeperServers();
+            ledgersStoreServers = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
             chrootPath = "";
         } else {
             URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
index 1571fd504ad..dbb29e416d7 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
@@ -50,6 +50,7 @@ import org.apache.zookeeper.KeeperException;
 public class BookKeeperPackagesStorage implements PackagesStorage {
 
     private static final String NS_CLIENT_ID = "packages-management";
+    public static final String ZK_SCHEME_IDENTIFIER = "zk:";
     final BookKeeperPackagesStorageConfiguration configuration;
     private Namespace namespace;
 
@@ -100,12 +101,18 @@ public class BookKeeperPackagesStorage implements PackagesStorage {
             ledgersRootPath = metadataServiceUri.getPath();
         } else {
             ledgersRootPath = configuration.getPackagesManagementLedgerRootPath();
-            ledgersStoreServers = configuration.getZookeeperServers();
+            if (StringUtils.isNotBlank(configuration.getMetadataStoreUrl())) {
+                ledgersStoreServers = configuration.getMetadataStoreUrl();
+                if (ledgersStoreServers.startsWith(ZK_SCHEME_IDENTIFIER)) {
+                    ledgersStoreServers = ledgersStoreServers.substring(ZK_SCHEME_IDENTIFIER.length());
+                }
+            } else {
+                ledgersStoreServers = configuration.getZookeeperServers();
+            }
         }
         BKDLConfig bkdlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
         DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
-        URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages",
-            configuration.getZookeeperServers()));
+        URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", ledgersStoreServers));
         try {
             dlMetadata.create(dlogURI);
         } catch (ZKException e) {
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
index ce6acecdd51..1d6758b248a 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
@@ -42,6 +42,10 @@ public class BookKeeperPackagesStorageConfiguration implements PackagesStorageCo
         return getProperty("zookeeperServers");
     }
 
+    String getMetadataStoreUrl() {
+        return getProperty("metadataStoreUrl");
+    }
+
     String getPackagesManagementLedgerRootPath() {
         return getProperty("packagesManagementLedgerRootPath");
     }
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
index 69312076410..90458f96a24 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
@@ -50,7 +50,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
         PackagesStorageProvider provider = PackagesStorageProvider
             .newProvider(BookKeeperPackagesStorageProvider.class.getName());
         DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
-        configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
+        configuration.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString());
         configuration.setProperty("packagesReplicas", "1");
         configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
         storage = provider.getStorage(configuration);
@@ -68,7 +68,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
     public void testConfiguration() {
         assertTrue(storage instanceof BookKeeperPackagesStorage);
         BookKeeperPackagesStorage bkStorage = (BookKeeperPackagesStorage) storage;
-        assertEquals(bkStorage.configuration.getZookeeperServers(), zkUtil.getZooKeeperConnectString());
+        assertEquals(bkStorage.configuration.getMetadataStoreUrl(), zkUtil.getZooKeeperConnectString());
         assertEquals(bkStorage.configuration.getPackagesReplicas(), 1);
         assertEquals(bkStorage.configuration.getPackagesManagementLedgerRootPath(), "/ledgers");
     }
@@ -198,7 +198,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
                 .newProvider(BookKeeperPackagesStorageProvider.class.getName());
         DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
         // set the unavailable bk cluster with mock zookeeper path
-        configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString() + "/mock");
+        configuration.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString() + "/mock");
         configuration.setProperty("packagesReplicas", "1");
         configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
         PackagesStorage storage1 = provider.getStorage(configuration);
@@ -221,7 +221,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
         // set the available bk cluster with bookkeeperMetadataServiceUri using actual zookeeper path
         String bookkeeperMetadataServiceUri = String.format("zk+null://%s/ledgers", zkUtil.getZooKeeperConnectString());
         DefaultPackagesStorageConfiguration configuration2 = new DefaultPackagesStorageConfiguration();
-        configuration2.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
+        configuration2.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString());
         configuration2.setProperty("bookkeeperMetadataServiceUri", bookkeeperMetadataServiceUri);
         configuration2.setProperty("packagesReplicas", "1");
         PackagesStorage storage2 = provider.getStorage(configuration2);