You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/04 20:18:49 UTC
[incubator-pulsar] branch master updated: Don't initialize
/managed-ledgers on client creation (#2379) (#2509)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8763c74 Don't initialize /managed-ledgers on client creation (#2379) (#2509)
8763c74 is described below
commit 8763c746b5b38901d7fbb77e5766f55c563ba25e
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Sep 4 22:18:46 2018 +0200
Don't initialize /managed-ledgers on client creation (#2379) (#2509)
Normally the /managed-ledgers znode is created by the
initialize-cluster-metadata command when a cluster is being turned
up.
However, the ManagedLedger client also creates it on boot. This has
caused issues in the past, where if a broker is started before
initialize-cluster-metadata is run, then initialize-cluster-metadata
fails because it sees that the /managed-ledgers znode already exists.
This patch removes the automatic creation of this znode from the
client boot process.
---
.../mledger/impl/MetaStoreImplZookeeper.java | 64 +++++++++++++++++++---
.../mledger/impl/MetaStoreImplZookeeperTest.java | 52 ++++++++++++++++++
.../bookkeeper/test/BookKeeperClusterTestCase.java | 4 ++
.../bookkeeper/test/MockedBookKeeperTestCase.java | 4 ++
4 files changed, 117 insertions(+), 7 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
index 354c04f..4d90b91 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java
@@ -25,9 +25,11 @@ import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
+import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Consumer;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -35,7 +37,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -95,10 +96,6 @@ public class MetaStoreImplZookeeper implements MetaStore {
throws Exception {
this.zk = zk;
this.executor = executor;
-
- if (zk.exists(prefixName, false) == null) {
- zk.create(prefixName, new byte[0], Acl, CreateMode.PERSISTENT);
- }
}
//
@@ -157,8 +154,8 @@ public class MetaStoreImplZookeeper implements MetaStore {
}
};
- ZkUtils.asyncCreateFullPathOptimistic(zk, prefix + ledgerName, new byte[0], Acl,
- CreateMode.PERSISTENT, createcb, null);
+ asyncCreateFullPathOptimistic(zk, prefixName, ledgerName, new byte[0], Acl,
+ CreateMode.PERSISTENT, createcb);
} else {
// Tried to open a managed ledger but it doesn't exist and we shouldn't creating it at this
// point
@@ -365,5 +362,58 @@ public class MetaStoreImplZookeeper implements MetaStore {
}
+ public static void asyncCreateFullPathOptimistic(
+ final ZooKeeper zk, final String basePath, final String nodePath, final byte[] data,
+ final List<ACL> acl, final CreateMode createMode, final StringCallback callback) {
+ String fullPath = basePath + "/" + nodePath;
+
+ zk.create(fullPath, data, acl, createMode,
+ (rc, path, ignoreCtx1, name) -> {
+ Runnable retry = () -> {
+ asyncCreateFullPathOptimistic(zk, basePath, nodePath, data,
+ acl, createMode, callback);
+ };
+
+ Consumer<Integer> complete = (finalrc) -> {
+ callback.processResult(finalrc, path, null, name);
+ };
+
+ if (rc != Code.NONODE.intValue()) {
+ complete.accept(rc);
+ return;
+ }
+
+ // Since I got a nonode, it means that my parents don't exist
+ // create mode is persistent since ephemeral nodes can't be
+ // parents
+ String nodeParent = new File(nodePath).getParent();
+ if (nodeParent == null) {
+ zk.exists(basePath, false,
+ (existsRc, existsPath, ignoreCtx2, stat) -> {
+ if (existsRc == Code.OK.intValue()) {
+ if (stat != null) {
+ retry.run();
+ } else {
+ complete.accept(Code.NONODE.intValue());
+ }
+ } else {
+ complete.accept(existsRc);
+ }
+ }, null);
+ } else {
+ nodeParent = nodeParent.replace("\\", "/");
+ asyncCreateFullPathOptimistic(
+ zk, basePath, nodeParent, new byte[0], acl, CreateMode.PERSISTENT,
+ (parentRc, parentPath, ignoreCtx3, parentName) -> {
+ if (parentRc == Code.OK.intValue() || parentRc == Code.NODEEXISTS.intValue()) {
+ retry.run();
+ } else {
+ complete.accept(parentRc);
+ }
+ });
+ }
+ }, null);
+ }
+
private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
index 17b9962..5f2b89a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java
@@ -18,10 +18,13 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
@@ -31,6 +34,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs;
import org.testng.annotations.Test;
@@ -214,4 +218,52 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
latch.await();
}
+
+ @Test(timeOut = 20000)
+ public void createOptimisticBaseNotExist() throws Exception {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
+ zkc, "/foo", "bar/zar/gar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+ (rc, path, ctx, name) -> {
+ if (rc != KeeperException.Code.OK.intValue()) {
+ promise.completeExceptionally(KeeperException.create(rc));
+ } else {
+ promise.complete(null);
+ }
+ });
+ try {
+ promise.get();
+ fail("should have failed");
+ } catch (ExecutionException ee) {
+ assertEquals(ee.getCause().getClass(), KeeperException.NoNodeException.class);
+ }
+ }
+
+ @Test(timeOut = 20000)
+ public void createOptimisticBaseExists() throws Exception {
+ zkc.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
+ zkc, "/foo", "bar/zar/gar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+ (rc, path, ctx, name) -> {
+ if (rc != KeeperException.Code.OK.intValue()) {
+ promise.completeExceptionally(KeeperException.create(rc));
+ } else {
+ promise.complete(null);
+ }
+ });
+ promise.get();
+
+ CompletableFuture<Void> promise2 = new CompletableFuture<>();
+ MetaStoreImplZookeeper.asyncCreateFullPathOptimistic(
+ zkc, "/foo", "blah", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
+ (rc, path, ctx, name) -> {
+ if (rc != KeeperException.Code.OK.intValue()) {
+ promise2.completeExceptionally(KeeperException.create(rc));
+ } else {
+ promise2.complete(null);
+ }
+ });
+ promise2.get();
+ }
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 1680f71..7f90042 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -47,7 +47,9 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +99,8 @@ public abstract class BookKeeperClusterTestCase {
startZKCluster();
// start bookkeeper service
startBKCluster();
+
+ zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (Exception e) {
LOG.error("Error setting up", e);
throw e;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index e3c18a0..251a4c8 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -28,7 +28,9 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
@@ -79,6 +81,8 @@ public abstract class MockedBookKeeperTestCase {
ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
factory = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
+
+ zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@AfterMethod