You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/06/14 01:28:52 UTC

[pulsar] branch master updated: [Issue 4480] Support setting zk chroot path when initialize cluster metadata (#4502)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e19e5ba  [Issue 4480] Support setting zk chroot path when initialize cluster metadata (#4502)
e19e5ba is described below

commit e19e5ba2f059383c7ac2b4ffca75e251b94f5b1e
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Fri Jun 14 09:28:47 2019 +0800

    [Issue 4480] Support setting zk chroot path when initialize cluster metadata (#4502)
    
    Fixes #4480
    Modifications
    Create the zookeeper chroot path at first if it doesn't exist.
    
    * Support setting zk chroot path when initialize cluster metadata.
    
    * Add a unit test for initZk
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java  | 36 +++++++++++++++++-----
 .../zookeeper/ZooKeeperClientAspectJTest.java      | 26 ++++++++++++++++
 2 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index deff5e0..49af097 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -31,7 +31,6 @@ import java.util.List;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
 import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -93,7 +92,7 @@ public class PulsarClusterMetadataSetup {
         private String globalZookeeper;
 
         @Parameter(names = { "-cs",
-            "--configuration-store" }, description = "Configuration Store connection string", required = false)
+            "--configuration-store" }, description = "Configuration Store connection string", required = true)
         private String configurationStore;
 
         @Parameter(names = {
@@ -136,13 +135,11 @@ public class PulsarClusterMetadataSetup {
             arguments.configurationStore = arguments.globalZookeeper;
         }
 
-        log.info("Setting up cluster {} with zk={} configuration-store ={}", arguments.cluster, arguments.zookeeper,
+        log.info("Setting up cluster {} with zk={} configuration-store={}", arguments.cluster, arguments.zookeeper,
                 arguments.configurationStore);
-        ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
-        ZooKeeper localZk = zkfactory.create(
-            arguments.zookeeper, SessionType.ReadWrite, arguments.zkSessionTimeoutMillis).get();
-        ZooKeeper configStoreZk = zkfactory.create(
-            arguments.configurationStore, SessionType.ReadWrite, arguments.zkSessionTimeoutMillis).get();
+
+        ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
+        ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
 
         // Format BookKeeper ledger storage metadata
         ServerConfiguration bkConf = new ServerConfiguration();
@@ -273,6 +270,29 @@ public class PulsarClusterMetadataSetup {
         log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
     }
 
+    public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exception {
+        ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
+        int chrootIndex = connection.indexOf("/");
+        if (chrootIndex > 0) {
+            String chrootPath = connection.substring(chrootIndex);
+            String zkConnectForChrootCreation = connection.substring(0, chrootIndex);
+            ZooKeeper chrootZk = zkfactory.create(
+                zkConnectForChrootCreation, SessionType.ReadWrite, sessionTimeout).get();
+            if (chrootZk.exists(chrootPath, false) == null) {
+                try {
+                    ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+                } catch (NodeExistsException e) {
+                    // Ignore
+                }
+                log.info("Created zookeeper chroot path {} successfully", chrootPath);
+            }
+            chrootZk.close();
+        }
+        ZooKeeper zkConnect = zkfactory.create(connection, SessionType.ReadWrite, sessionTimeout).get();
+        return zkConnect;
+    }
+
     private static BundlesData getBundles(int numBundles) {
         Long maxVal = ((long) 1) << 32;
         Long segSize = maxVal / numBundles;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java
index 99ac805..d0927e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.PulsarClusterMetadataSetup;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -46,6 +47,7 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
 import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
@@ -97,6 +99,30 @@ public class ZooKeeperClientAspectJTest {
         }
     }
 
+    @Test
+    public void testInitZk() throws Exception {
+        try {
+            ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
+            CompletableFuture<ZooKeeper> zkFuture = zkfactory.create("127.0.0.1:" + LOCAL_ZOOKEEPER_PORT, SessionType.ReadWrite,
+                (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
+            localZkc = zkFuture.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            assertTrue(localZkc.getState().isConnected());
+            assertNotEquals(localZkc.getState(), States.CONNECTEDREADONLY);
+
+            String connection = "127.0.0.1:" + LOCAL_ZOOKEEPER_PORT + "/prefix";
+            ZooKeeper chrootZkc = PulsarClusterMetadataSetup.initZk(connection, (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);
+            assertTrue(chrootZkc.getState().isConnected());
+            assertNotEquals(chrootZkc.getState(), States.CONNECTEDREADONLY);
+            chrootZkc.close();
+
+            assertTrue(localZkc.exists("/prefix", false) != null);
+        } finally {
+            if (localZkc != null) {
+                localZkc.close();
+            }
+        }
+    }
+
     @BeforeMethod
     void setup() throws Exception {
         localZkS = new ZookeeperServerTest(LOCAL_ZOOKEEPER_PORT);