You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/04 20:18:49 UTC

[incubator-pulsar] branch master updated: Don't initialize /managed-ledgers on client creation (#2379) (#2509)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8763c74  Don't initialize /managed-ledgers on client creation (#2379) (#2509)
8763c74 is described below

commit 8763c746b5b38901d7fbb77e5766f55c563ba25e
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Sep 4 22:18:46 2018 +0200

    Don't initialize /managed-ledgers on client creation (#2379) (#2509)
    
    Normally the /managed-ledgers znode is created by the
    initialize-cluster-metadata command when a cluster is being turned
    up.
    
    However, the ManagedLedger client also creates it on boot. This has
    caused issues in the past, where if a broker is started before
    initialize-cluster-metadata is run, then initialize-cluster-metadata
    fails because it sees the /managed-ledgers znode.
    
    This patch removes the automatic creation of this znode from the
    client boot process.
---
 .../mledger/impl/MetaStoreImplZookeeper.java       | 64 +++++++++++++++++++---
 .../mledger/impl/MetaStoreImplZookeeperTest.java   | 52 ++++++++++++++++++
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  4 ++
 .../bookkeeper/test/MockedBookKeeperTestCase.java  |  4 ++
 4 files changed, 117 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
index 354c04f..4d90b91 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
@@ -25,9 +25,11 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.TextFormat;
 import com.google.protobuf.TextFormat.ParseException;
 
+import java.io.File;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
 
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -35,7 +37,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -95,10 +96,6 @@ public class MetaStoreImplZookeeper implements MetaStore {
             throws Exception {
         this.zk = zk;
         this.executor = executor;
-
-        if (zk.exists(prefixName, false) == null) {
-            zk.create(prefixName, new byte[0], Acl, CreateMode.PERSISTENT);
-        }
     }
 
     //
@@ -157,8 +154,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
                                 }
                             };
 
-                            ZkUtils.asyncCreateFullPathOptimistic(zk, prefix + ledgerName, new byte[0], Acl,
-                                    CreateMode.PERSISTENT, createcb, null);
+                            asyncCreateFullPathOptimistic(zk, prefixName, ledgerName, new byte[0], Acl,
+                                                          CreateMode.PERSISTENT, createcb);
                         } else {
                             // Tried to open a managed ledger but it doesn't exist and we shouldn't creating it at this
                             // point
@@ -365,5 +362,58 @@ public class MetaStoreImplZookeeper implements MetaStore {
 
     }
 
+    public static void asyncCreateFullPathOptimistic(
+            final ZooKeeper zk, final String basePath, final String nodePath, final byte[] data,
+            final List<ACL> acl, final CreateMode createMode, final StringCallback callback) {
+        String fullPath = basePath + "/" + nodePath;
+
+        zk.create(fullPath, data, acl, createMode,
+                  (rc, path, ignoreCtx1, name) -> {
+                      Runnable retry = () -> {
+                          asyncCreateFullPathOptimistic(zk, basePath, nodePath, data,
+                                                        acl, createMode, callback);
+                      };
+
+                      Consumer<Integer> complete = (finalrc) -> {
+                          callback.processResult(finalrc, path, null, name);
+                      };
+
+                      if (rc != Code.NONODE.intValue()) {
+                          complete.accept(rc);
+                          return;
+                      }
+
+                      // Since I got a nonode, it means that my parents don't exist
+                      // create mode is persistent since ephemeral nodes can't be
+                      // parents
+                      String nodeParent = new File(nodePath).getParent();
+                      if (nodeParent == null) {
+                          zk.exists(basePath, false,
+                                    (existsRc, existsPath, ignoreCtx2, stat) -> {
+                                        if (existsRc == Code.OK.intValue()) {
+                                            if (stat != null) {
+                                                retry.run();
+                                            } else {
+                                                complete.accept(Code.NONODE.intValue());
+                                            }
+                                        } else {
+                                            complete.accept(existsRc);
+                                        }
+                                    }, null);
+                      } else {
+                          nodeParent = nodeParent.replace("\\", "/");
+                          asyncCreateFullPathOptimistic(
+                                  zk, basePath, nodeParent, new byte[0], acl, CreateMode.PERSISTENT,
+                                  (parentRc, parentPath, ignoreCtx3, parentName) -> {
+                                      if (parentRc == Code.OK.intValue() || parentRc == Code.NODEEXISTS.intValue()) {
+                                          retry.run();
+                                      } else {
+                                          complete.accept(parentRc);
+                                      }
+                                  });
+                      }
+                  }, null);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class);
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
index 17b9962..5f2b89a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.fail;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
@@ -31,6 +34,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs;
 import org.testng.annotations.Test;
@@ -214,4 +218,52 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
 
         latch.await();
     }
+
+    @Test(timeOut = 20000)
+    public void createOptimisticBaseNotExist() throws Exception {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
+                zkc, "/foo", "bar/zar/gar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+                (rc, path, ctx, name) -> {
+                    if (rc != KeeperException.Code.OK.intValue()) {
+                        promise.completeExceptionally(KeeperException.create(rc));
+                    } else {
+                        promise.complete(null);
+                    }
+                });
+        try {
+            promise.get();
+            fail("should have failed");
+        } catch (ExecutionException ee) {
+            assertEquals(ee.getCause().getClass(), KeeperException.NoNodeException.class);
+        }
+    }
+
+    @Test(timeOut = 20000)
+    public void createOptimisticBaseExists() throws Exception {
+        zkc.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
+                zkc, "/foo", "bar/zar/gar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+                (rc, path, ctx, name) -> {
+                    if (rc != KeeperException.Code.OK.intValue()) {
+                        promise.completeExceptionally(KeeperException.create(rc));
+                    } else {
+                        promise.complete(null);
+                    }
+                });
+        promise.get();
+
+        CompletableFuture<Void> promise2 = new CompletableFuture<>();
+        MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
+                zkc, "/foo", "blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+                (rc, path, ctx, name) -> {
+                    if (rc != KeeperException.Code.OK.intValue()) {
+                        promise2.completeExceptionally(KeeperException.create(rc));
+                    } else {
+                        promise2.complete(null);
+                    }
+                });
+        promise2.get();
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 1680f71..7f90042 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -47,7 +47,9 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,6 +99,8 @@ public abstract class BookKeeperClusterTestCase {
             startZKCluster();
             // start bookkeeper service
             startBKCluster();
+
+            zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         } catch (Exception e) {
             LOG.error("Error setting up", e);
             throw e;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index e3c18a0..251a4c8 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -28,7 +28,9 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -79,6 +81,8 @@ public abstract class MockedBookKeeperTestCase {
 
         ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
         factory = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
+
+        zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
     @AfterMethod