You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/13 05:35:47 UTC
[pulsar] branch branch-2.10 updated: Revert "Revert "[Branch-2.10] [fix] [broker] Fix bookeeper packages npe (#17291)""
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 1df5734a389 Revert "Revert "[Branch-2.10] [fix] [broker] Fix bookeeper packages npe (#17291)""
1df5734a389 is described below
commit 1df5734a3898d371a9d75d921ecb6d73b2847a71
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Sep 13 13:24:54 2022 +0800
Revert "Revert "[Branch-2.10] [fix] [broker] Fix bookeeper packages npe (#17291)""
This reverts commit 7b75cf62f1904bfcd9ea6243741105905dd6769f.
---
conf/standalone.conf | 20 +++++++++++++----
.../org/apache/pulsar/PulsarStandaloneBuilder.java | 7 ++++--
.../org/apache/pulsar/PulsarStandaloneStarter.java | 8 +++++--
.../common/conf/InternalConfigurationData.java | 26 +++++++++++-----------
.../functions/worker/PulsarWorkerService.java | 13 ++++++-----
.../pulsar/functions/worker/WorkerUtils.java | 3 ++-
.../bookkeeper/BookKeeperPackagesStorage.java | 13 ++++++++---
.../BookKeeperPackagesStorageConfiguration.java | 4 ++++
.../bookkeeper/BookKeeperPackagesStorageTest.java | 8 +++----
9 files changed, 68 insertions(+), 34 deletions(-)
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 5f90ea1ce3f..8eb25a2aa65 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -19,11 +19,15 @@
### --- General broker settings --- ###
-# Zookeeper quorum connection string
-zookeeperServers=
+# The metadata store URL
+# Examples:
+# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
+# * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not specified)
+# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path)
+metadataStoreUrl=
-# Configuration Store connection string
-configurationStoreServers=
+# The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl
+configurationMetadataStoreUrl=
brokerServicePort=6650
@@ -1087,3 +1091,11 @@ zooKeeperOperationTimeoutSeconds=-1
# ZooKeeper cache expiry time in seconds
# Deprecated: use metadataStoreCacheExpirySeconds
zooKeeperCacheExpirySeconds=-1
+
+# Zookeeper quorum connection string
+# Deprecated: use metadataStoreUrl
+zookeeperServers=
+
+# Configuration Store connection string
+# Deprecated: use configurationMetadataStoreUrl
+configurationStoreServers=
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
index 581db63bb0e..70d457dbb46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
@@ -21,6 +21,7 @@ package org.apache.pulsar;
import static org.apache.commons.lang3.StringUtils.isBlank;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
public final class PulsarStandaloneBuilder {
@@ -114,8 +115,10 @@ public final class PulsarStandaloneBuilder {
}
// Set ZK server's host to localhost
- pulsarStandalone.getConfig().setZookeeperServers(zkServers + ":" + pulsarStandalone.getZkPort());
- pulsarStandalone.getConfig().setConfigurationStoreServers(zkServers + ":" + pulsarStandalone.getZkPort());
+ final String metadataStoreUrl =
+ ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + pulsarStandalone.getZkPort();
+ pulsarStandalone.getConfig().setMetadataStoreUrl(metadataStoreUrl);
+ pulsarStandalone.getConfig().setConfigurationMetadataStoreUrl(metadataStoreUrl);
pulsarStandalone.getConfig().setRunningStandalone(true);
return pulsarStandalone;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 92b3e3e64ee..b8ec534b893 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -28,6 +28,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,8 +101,11 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
}
}
}
- config.setZookeeperServers(zkServers + ":" + this.getZkPort());
- config.setConfigurationStoreServers(zkServers + ":" + this.getZkPort());
+
+ final String metadataStoreUrl =
+ ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + this.getZkPort();
+ config.setMetadataStoreUrl(metadataStoreUrl);
+ config.setConfigurationMetadataStoreUrl(metadataStoreUrl);
config.setRunningStandalone(true);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
index 80da3caa4ba..2927aad8afe 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
@@ -27,8 +27,8 @@ import lombok.ToString;
@ToString
public class InternalConfigurationData {
- private String zookeeperServers;
- private String configurationStoreServers;
+ private String metadataStoreUrl;
+ private String configurationMetadataStoreUrl;
@Deprecated
private String ledgersRootPath;
private String bookkeeperMetadataServiceUri;
@@ -38,23 +38,23 @@ public class InternalConfigurationData {
}
public InternalConfigurationData(String zookeeperServers,
- String configurationStoreServers,
+ String configurationMetadataStoreUrl,
String ledgersRootPath,
String bookkeeperMetadataServiceUri,
String stateStorageServiceUrl) {
- this.zookeeperServers = zookeeperServers;
- this.configurationStoreServers = configurationStoreServers;
+ this.metadataStoreUrl = zookeeperServers;
+ this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
this.ledgersRootPath = ledgersRootPath;
this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
this.stateStorageServiceUrl = stateStorageServiceUrl;
}
- public String getZookeeperServers() {
- return zookeeperServers;
+ public String getMetadataStoreUrl() {
+ return metadataStoreUrl;
}
- public String getConfigurationStoreServers() {
- return configurationStoreServers;
+ public String getConfigurationMetadataStoreUrl() {
+ return configurationMetadataStoreUrl;
}
/** @deprecated */
@@ -77,8 +77,8 @@ public class InternalConfigurationData {
return false;
}
InternalConfigurationData other = (InternalConfigurationData) obj;
- return Objects.equals(zookeeperServers, other.zookeeperServers)
- && Objects.equals(configurationStoreServers, other.configurationStoreServers)
+ return Objects.equals(metadataStoreUrl, other.metadataStoreUrl)
+ && Objects.equals(configurationMetadataStoreUrl, other.configurationMetadataStoreUrl)
&& Objects.equals(ledgersRootPath, other.ledgersRootPath)
&& Objects.equals(bookkeeperMetadataServiceUri, other.bookkeeperMetadataServiceUri)
&& Objects.equals(stateStorageServiceUrl, other.stateStorageServiceUrl);
@@ -86,8 +86,8 @@ public class InternalConfigurationData {
@Override
public int hashCode() {
- return Objects.hash(zookeeperServers,
- configurationStoreServers,
+ return Objects.hash(metadataStoreUrl,
+ configurationMetadataStoreUrl,
ledgersRootPath,
bookkeeperMetadataServiceUri,
stateStorageServiceUrl);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 6dbdb592eaf..10710641b0d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
+import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -274,13 +275,14 @@ public class PulsarWorkerService implements WorkerService {
URI dlogURI;
try {
if (workerConfig.isInitializedDlogMetadata()) {
- dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
+ String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
+ dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing "
- + "function packages", internalConf.getZookeeperServers(),
+ + "function packages", internalConf.getMetadataStoreUrl(),
internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
@@ -356,15 +358,16 @@ public class PulsarWorkerService implements WorkerService {
URI dlogURI;
try {
// initializing dlog namespace for function worker
- if (workerConfig.isInitializedDlogMetadata()){
- dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
+ if (workerConfig.isInitializedDlogMetadata()) {
+ String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
+ dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for "
+ "storing function packages",
- internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
+ internalConf.getMetadataStoreUrl(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 828b4e16516..5de762a1d7c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -184,7 +185,7 @@ public final class WorkerUtils {
// for BC purposes
if (internalConf.getBookkeeperMetadataServiceUri() == null) {
ledgersRootPath = internalConf.getLedgersRootPath();
- ledgersStoreServers = internalConf.getZookeeperServers();
+ ledgersStoreServers = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
chrootPath = "";
} else {
URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
index 1571fd504ad..dbb29e416d7 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
@@ -50,6 +50,7 @@ import org.apache.zookeeper.KeeperException;
public class BookKeeperPackagesStorage implements PackagesStorage {
private static final String NS_CLIENT_ID = "packages-management";
+ public static final String ZK_SCHEME_IDENTIFIER = "zk:";
final BookKeeperPackagesStorageConfiguration configuration;
private Namespace namespace;
@@ -100,12 +101,18 @@ public class BookKeeperPackagesStorage implements PackagesStorage {
ledgersRootPath = metadataServiceUri.getPath();
} else {
ledgersRootPath = configuration.getPackagesManagementLedgerRootPath();
- ledgersStoreServers = configuration.getZookeeperServers();
+ if (StringUtils.isNotBlank(configuration.getMetadataStoreUrl())) {
+ ledgersStoreServers = configuration.getMetadataStoreUrl();
+ if (ledgersStoreServers.startsWith(ZK_SCHEME_IDENTIFIER)) {
+ ledgersStoreServers = ledgersStoreServers.substring(ZK_SCHEME_IDENTIFIER.length());
+ }
+ } else {
+ ledgersStoreServers = configuration.getZookeeperServers();
+ }
}
BKDLConfig bkdlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
- URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages",
- configuration.getZookeeperServers()));
+ URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", ledgersStoreServers));
try {
dlMetadata.create(dlogURI);
} catch (ZKException e) {
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
index ce6acecdd51..1d6758b248a 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
@@ -42,6 +42,10 @@ public class BookKeeperPackagesStorageConfiguration implements PackagesStorageCo
return getProperty("zookeeperServers");
}
+ String getMetadataStoreUrl() {
+ return getProperty("metadataStoreUrl");
+ }
+
String getPackagesManagementLedgerRootPath() {
return getProperty("packagesManagementLedgerRootPath");
}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
index 69312076410..90458f96a24 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
@@ -50,7 +50,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
PackagesStorageProvider provider = PackagesStorageProvider
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
- configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
+ configuration.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString());
configuration.setProperty("packagesReplicas", "1");
configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
storage = provider.getStorage(configuration);
@@ -68,7 +68,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
public void testConfiguration() {
assertTrue(storage instanceof BookKeeperPackagesStorage);
BookKeeperPackagesStorage bkStorage = (BookKeeperPackagesStorage) storage;
- assertEquals(bkStorage.configuration.getZookeeperServers(), zkUtil.getZooKeeperConnectString());
+ assertEquals(bkStorage.configuration.getMetadataStoreUrl(), zkUtil.getZooKeeperConnectString());
assertEquals(bkStorage.configuration.getPackagesReplicas(), 1);
assertEquals(bkStorage.configuration.getPackagesManagementLedgerRootPath(), "/ledgers");
}
@@ -198,7 +198,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
// set the unavailable bk cluster with mock zookeeper path
- configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString() + "/mock");
+ configuration.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString() + "/mock");
configuration.setProperty("packagesReplicas", "1");
configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
PackagesStorage storage1 = provider.getStorage(configuration);
@@ -221,7 +221,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
// set the available bk cluster with bookkeeperMetadataServiceUri using actual zookeeper path
String bookkeeperMetadataServiceUri = String.format("zk+null://%s/ledgers", zkUtil.getZooKeeperConnectString());
DefaultPackagesStorageConfiguration configuration2 = new DefaultPackagesStorageConfiguration();
- configuration2.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
+ configuration2.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString());
configuration2.setProperty("bookkeeperMetadataServiceUri", bookkeeperMetadataServiceUri);
configuration2.setProperty("packagesReplicas", "1");
PackagesStorage storage2 = provider.getStorage(configuration2);