You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/06/14 01:28:52 UTC
[pulsar] branch master updated: [Issue 4480] Support setting zk chroot path when initialize cluster metadata (#4502)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e19e5ba [Issue 4480] Support setting zk chroot path when initialize cluster metadata (#4502)
e19e5ba is described below
commit e19e5ba2f059383c7ac2b4ffca75e251b94f5b1e
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Fri Jun 14 09:28:47 2019 +0800
[Issue 4480] Support setting zk chroot path when initialize cluster metadata (#4502)
Fixes #4480
Modifications
Create the ZooKeeper chroot path first if it does not already exist.
* Support setting the ZooKeeper chroot path when initializing cluster metadata.
* Add a unit test for initZk
---
.../apache/pulsar/PulsarClusterMetadataSetup.java | 36 +++++++++++++++++-----
.../zookeeper/ZooKeeperClientAspectJTest.java | 26 ++++++++++++++++
2 files changed, 54 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index deff5e0..49af097 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -31,7 +31,6 @@ import java.util.List;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.util.ZkUtils;
@@ -93,7 +92,7 @@ public class PulsarClusterMetadataSetup {
private String globalZookeeper;
@Parameter(names = { "-cs",
- "--configuration-store" }, description = "Configuration Store connection string", required = false)
+ "--configuration-store" }, description = "Configuration Store connection string", required = true)
private String configurationStore;
@Parameter(names = {
@@ -136,13 +135,11 @@ public class PulsarClusterMetadataSetup {
arguments.configurationStore = arguments.globalZookeeper;
}
- log.info("Setting up cluster {} with zk={} configuration-store ={}", arguments.cluster, arguments.zookeeper,
+ log.info("Setting up cluster {} with zk={} configuration-store={}", arguments.cluster, arguments.zookeeper,
arguments.configurationStore);
- ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
- ZooKeeper localZk = zkfactory.create(
- arguments.zookeeper, SessionType.ReadWrite, arguments.zkSessionTimeoutMillis).get();
- ZooKeeper configStoreZk = zkfactory.create(
- arguments.configurationStore, SessionType.ReadWrite, arguments.zkSessionTimeoutMillis).get();
+
+ ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
+ ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
// Format BookKeeper ledger storage metadata
ServerConfiguration bkConf = new ServerConfiguration();
@@ -273,6 +270,29 @@ public class PulsarClusterMetadataSetup {
log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
}
+ public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exception {
+ ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
+ int chrootIndex = connection.indexOf("/");
+ if (chrootIndex > 0) {
+ String chrootPath = connection.substring(chrootIndex);
+ String zkConnectForChrootCreation = connection.substring(0, chrootIndex);
+ ZooKeeper chrootZk = zkfactory.create(
+ zkConnectForChrootCreation, SessionType.ReadWrite, sessionTimeout).get();
+ if (chrootZk.exists(chrootPath, false) == null) {
+ try {
+ ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (NodeExistsException e) {
+ // Ignore
+ }
+ log.info("Created zookeeper chroot path {} successfully", chrootPath);
+ }
+ chrootZk.close();
+ }
+ ZooKeeper zkConnect = zkfactory.create(connection, SessionType.ReadWrite, sessionTimeout).get();
+ return zkConnect;
+ }
+
private static BundlesData getBundles(int numBundles) {
Long maxVal = ((long) 1) << 32;
Long segSize = maxVal / numBundles;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java
index 99ac805..d0927e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -46,6 +47,7 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
@@ -97,6 +99,30 @@ public class ZooKeeperClientAspectJTest {
}
}
+ @Test
+ public void testInitZk() throws Exception {
+ try {
+ ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
+ CompletableFuture<ZooKeeper> zkFuture = zkfactory.create("127.0.0.1:" + LOCAL_ZOOKEEPER_PORT, SessionType.ReadWrite,
+ (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
+ localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ assertTrue(localZkc.getState().isConnected());
+ assertNotEquals(localZkc.getState(), States.CONNECTEDREADONLY);
+
+ String connection = "127.0.0.1:" + LOCAL_ZOOKEEPER_PORT + "/prefix";
+ ZooKeeper chrootZkc = PulsarClusterMetadataSetup.initZk(connection, (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
+ assertTrue(chrootZkc.getState().isConnected());
+ assertNotEquals(chrootZkc.getState(), States.CONNECTEDREADONLY);
+ chrootZkc.close();
+
+ assertTrue(localZkc.exists("/prefix", false) != null);
+ } finally {
+ if (localZkc != null) {
+ localZkc.close();
+ }
+ }
+ }
+
@BeforeMethod
void setup() throws Exception {
localZkS = new ZookeeperServerTest(LOCAL_ZOOKEEPER_PORT);