You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/13 02:58:48 UTC
[pulsar] branch branch-2.10 updated: Revert "[Branch-2.10] [fix] [broker] Fix bookeeper packages npe (#17291)"
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 7b75cf62f19 Revert "[Branch-2.10] [fix] [broker] Fix bookeeper packages npe (#17291)"
7b75cf62f19 is described below
commit 7b75cf62f1904bfcd9ea6243741105905dd6769f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Sep 13 10:58:30 2022 +0800
Revert "[Branch-2.10] [fix] [broker] Fix bookeeper packages npe (#17291)"
This reverts commit a7b277254aef7ed7c32f5c57482a10413617bc60 to fix cpp
tests.
---
conf/standalone.conf | 20 ++++-------------
.../org/apache/pulsar/PulsarStandaloneBuilder.java | 7 ++----
.../org/apache/pulsar/PulsarStandaloneStarter.java | 8 ++-----
.../common/conf/InternalConfigurationData.java | 26 +++++++++++-----------
.../functions/worker/PulsarWorkerService.java | 13 +++++------
.../pulsar/functions/worker/WorkerUtils.java | 3 +--
.../bookkeeper/BookKeeperPackagesStorage.java | 13 +++--------
.../BookKeeperPackagesStorageConfiguration.java | 4 ----
.../bookkeeper/BookKeeperPackagesStorageTest.java | 8 +++----
9 files changed, 34 insertions(+), 68 deletions(-)
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 8eb25a2aa65..5f90ea1ce3f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -19,15 +19,11 @@
### --- General broker settings --- ###
-# The metadata store URL
-# Examples:
-# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
-# * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not specified)
-# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path)
-metadataStoreUrl=
+# Zookeeper quorum connection string
+zookeeperServers=
-# The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl
-configurationMetadataStoreUrl=
+# Configuration Store connection string
+configurationStoreServers=
brokerServicePort=6650
@@ -1091,11 +1087,3 @@ zooKeeperOperationTimeoutSeconds=-1
# ZooKeeper cache expiry time in seconds
# Deprecated: use metadataStoreCacheExpirySeconds
zooKeeperCacheExpirySeconds=-1
-
-# Zookeeper quorum connection string
-# Deprecated: use metadataStoreUrl
-zookeeperServers=
-
-# Configuration Store connection string
-# Deprecated: use configurationMetadataStoreUrl
-configurationStoreServers=
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
index 70d457dbb46..581db63bb0e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneBuilder.java
@@ -21,7 +21,6 @@ package org.apache.pulsar;
import static org.apache.commons.lang3.StringUtils.isBlank;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
public final class PulsarStandaloneBuilder {
@@ -115,10 +114,8 @@ public final class PulsarStandaloneBuilder {
}
// Set ZK server's host to localhost
- final String metadataStoreUrl =
- ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + pulsarStandalone.getZkPort();
- pulsarStandalone.getConfig().setMetadataStoreUrl(metadataStoreUrl);
- pulsarStandalone.getConfig().setConfigurationMetadataStoreUrl(metadataStoreUrl);
+ pulsarStandalone.getConfig().setZookeeperServers(zkServers + ":" + pulsarStandalone.getZkPort());
+ pulsarStandalone.getConfig().setConfigurationStoreServers(zkServers + ":" + pulsarStandalone.getZkPort());
pulsarStandalone.getConfig().setRunningStandalone(true);
return pulsarStandalone;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index b8ec534b893..92b3e3e64ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -28,7 +28,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,11 +100,8 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
}
}
}
-
- final String metadataStoreUrl =
- ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + this.getZkPort();
- config.setMetadataStoreUrl(metadataStoreUrl);
- config.setConfigurationMetadataStoreUrl(metadataStoreUrl);
+ config.setZookeeperServers(zkServers + ":" + this.getZkPort());
+ config.setConfigurationStoreServers(zkServers + ":" + this.getZkPort());
config.setRunningStandalone(true);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
index 2927aad8afe..80da3caa4ba 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
@@ -27,8 +27,8 @@ import lombok.ToString;
@ToString
public class InternalConfigurationData {
- private String metadataStoreUrl;
- private String configurationMetadataStoreUrl;
+ private String zookeeperServers;
+ private String configurationStoreServers;
@Deprecated
private String ledgersRootPath;
private String bookkeeperMetadataServiceUri;
@@ -38,23 +38,23 @@ public class InternalConfigurationData {
}
public InternalConfigurationData(String zookeeperServers,
- String configurationMetadataStoreUrl,
+ String configurationStoreServers,
String ledgersRootPath,
String bookkeeperMetadataServiceUri,
String stateStorageServiceUrl) {
- this.metadataStoreUrl = zookeeperServers;
- this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
+ this.zookeeperServers = zookeeperServers;
+ this.configurationStoreServers = configurationStoreServers;
this.ledgersRootPath = ledgersRootPath;
this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
this.stateStorageServiceUrl = stateStorageServiceUrl;
}
- public String getMetadataStoreUrl() {
- return metadataStoreUrl;
+ public String getZookeeperServers() {
+ return zookeeperServers;
}
- public String getConfigurationMetadataStoreUrl() {
- return configurationMetadataStoreUrl;
+ public String getConfigurationStoreServers() {
+ return configurationStoreServers;
}
/** @deprecated */
@@ -77,8 +77,8 @@ public class InternalConfigurationData {
return false;
}
InternalConfigurationData other = (InternalConfigurationData) obj;
- return Objects.equals(metadataStoreUrl, other.metadataStoreUrl)
- && Objects.equals(configurationMetadataStoreUrl, other.configurationMetadataStoreUrl)
+ return Objects.equals(zookeeperServers, other.zookeeperServers)
+ && Objects.equals(configurationStoreServers, other.configurationStoreServers)
&& Objects.equals(ledgersRootPath, other.ledgersRootPath)
&& Objects.equals(bookkeeperMetadataServiceUri, other.bookkeeperMetadataServiceUri)
&& Objects.equals(stateStorageServiceUrl, other.stateStorageServiceUrl);
@@ -86,8 +86,8 @@ public class InternalConfigurationData {
@Override
public int hashCode() {
- return Objects.hash(metadataStoreUrl,
- configurationMetadataStoreUrl,
+ return Objects.hash(zookeeperServers,
+ configurationStoreServers,
ledgersRootPath,
bookkeeperMetadataServiceUri,
stateStorageServiceUrl);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 10710641b0d..6dbdb592eaf 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
-import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -275,14 +274,13 @@ public class PulsarWorkerService implements WorkerService {
URI dlogURI;
try {
if (workerConfig.isInitializedDlogMetadata()) {
- String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
- dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
+ dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing "
- + "function packages", internalConf.getMetadataStoreUrl(),
+ + "function packages", internalConf.getZookeeperServers(),
internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
@@ -358,16 +356,15 @@ public class PulsarWorkerService implements WorkerService {
URI dlogURI;
try {
// initializing dlog namespace for function worker
- if (workerConfig.isInitializedDlogMetadata()) {
- String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
- dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
+ if (workerConfig.isInitializedDlogMetadata()){
+ dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for "
+ "storing function packages",
- internalConf.getMetadataStoreUrl(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
+ internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 5de762a1d7c..828b4e16516 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.functions.worker;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -185,7 +184,7 @@ public final class WorkerUtils {
// for BC purposes
if (internalConf.getBookkeeperMetadataServiceUri() == null) {
ledgersRootPath = internalConf.getLedgersRootPath();
- ledgersStoreServers = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
+ ledgersStoreServers = internalConf.getZookeeperServers();
chrootPath = "";
} else {
URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
index dbb29e416d7..1571fd504ad 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
@@ -50,7 +50,6 @@ import org.apache.zookeeper.KeeperException;
public class BookKeeperPackagesStorage implements PackagesStorage {
private static final String NS_CLIENT_ID = "packages-management";
- public static final String ZK_SCHEME_IDENTIFIER = "zk:";
final BookKeeperPackagesStorageConfiguration configuration;
private Namespace namespace;
@@ -101,18 +100,12 @@ public class BookKeeperPackagesStorage implements PackagesStorage {
ledgersRootPath = metadataServiceUri.getPath();
} else {
ledgersRootPath = configuration.getPackagesManagementLedgerRootPath();
- if (StringUtils.isNotBlank(configuration.getMetadataStoreUrl())) {
- ledgersStoreServers = configuration.getMetadataStoreUrl();
- if (ledgersStoreServers.startsWith(ZK_SCHEME_IDENTIFIER)) {
- ledgersStoreServers = ledgersStoreServers.substring(ZK_SCHEME_IDENTIFIER.length());
- }
- } else {
- ledgersStoreServers = configuration.getZookeeperServers();
- }
+ ledgersStoreServers = configuration.getZookeeperServers();
}
BKDLConfig bkdlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
- URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", ledgersStoreServers));
+ URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages",
+ configuration.getZookeeperServers()));
try {
dlMetadata.create(dlogURI);
} catch (ZKException e) {
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
index 1d6758b248a..ce6acecdd51 100644
--- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
@@ -42,10 +42,6 @@ public class BookKeeperPackagesStorageConfiguration implements PackagesStorageCo
return getProperty("zookeeperServers");
}
- String getMetadataStoreUrl() {
- return getProperty("metadataStoreUrl");
- }
-
String getPackagesManagementLedgerRootPath() {
return getProperty("packagesManagementLedgerRootPath");
}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
index 90458f96a24..69312076410 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
@@ -50,7 +50,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
PackagesStorageProvider provider = PackagesStorageProvider
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
- configuration.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString());
+ configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
configuration.setProperty("packagesReplicas", "1");
configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
storage = provider.getStorage(configuration);
@@ -68,7 +68,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
public void testConfiguration() {
assertTrue(storage instanceof BookKeeperPackagesStorage);
BookKeeperPackagesStorage bkStorage = (BookKeeperPackagesStorage) storage;
- assertEquals(bkStorage.configuration.getMetadataStoreUrl(), zkUtil.getZooKeeperConnectString());
+ assertEquals(bkStorage.configuration.getZookeeperServers(), zkUtil.getZooKeeperConnectString());
assertEquals(bkStorage.configuration.getPackagesReplicas(), 1);
assertEquals(bkStorage.configuration.getPackagesManagementLedgerRootPath(), "/ledgers");
}
@@ -198,7 +198,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
.newProvider(BookKeeperPackagesStorageProvider.class.getName());
DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
// set the unavailable bk cluster with mock zookeeper path
- configuration.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString() + "/mock");
+ configuration.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString() + "/mock");
configuration.setProperty("packagesReplicas", "1");
configuration.setProperty("packagesManagementLedgerRootPath", "/ledgers");
PackagesStorage storage1 = provider.getStorage(configuration);
@@ -221,7 +221,7 @@ public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
// set the available bk cluster with bookkeeperMetadataServiceUri using actual zookeeper path
String bookkeeperMetadataServiceUri = String.format("zk+null://%s/ledgers", zkUtil.getZooKeeperConnectString());
DefaultPackagesStorageConfiguration configuration2 = new DefaultPackagesStorageConfiguration();
- configuration2.setProperty("metadataStoreUrl", zkUtil.getZooKeeperConnectString());
+ configuration2.setProperty("zookeeperServers", zkUtil.getZooKeeperConnectString());
configuration2.setProperty("bookkeeperMetadataServiceUri", bookkeeperMetadataServiceUri);
configuration2.setProperty("packagesReplicas", "1");
PackagesStorage storage2 = provider.getStorage(configuration2);