You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2017/05/02 21:05:07 UTC
[1/2] bookkeeper git commit: BOOKKEEPER-552: 64 Bits Ledger ID
Generation
Repository: bookkeeper
Updated Branches:
refs/heads/master 9c79e078b -> 057af8dbc
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongZkLedgerIdGenerator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongZkLedgerIdGenerator.java
new file mode 100644
index 0000000..393f9b1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongZkLedgerIdGenerator.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Formatter;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper based ledger id generator class, which using EPHEMERAL_SEQUENTIAL
+ * with <i>(ledgerIdGenPath)/HOB-[high-32-bits]/ID-</i> prefix to generate ledger id. Note
+ * zookeeper sequential counter has a format of %10d -- that is 10 digits with 0
+ * (zero) padding, i.e. "<path>0000000001", so ledger id space would be
+ * fundamentally limited to 9 billion. In practice, the id generated by zookeeper
+ * is only 31 bits (signed 32-bit integer), so the limit is much lower than 9 billion.
+ *
+ * In order to support the full range of the long ledgerId, once ledgerIds reach Integer.MAX_INT,
+ * a new system is employed. The 32 most significant bits of the ledger ID are taken and turned into
+ * a directory prefixed with <i>HOB-</i> under <i>(ledgerIdGenPath)</i>
+ *
+ * Under this <i>HOB-</i> directory, zookeeper is used to continue generating EPHEMERAL_SEQUENTIAL ids
+ * which constitute the lower 32-bits of the ledgerId (sign bit is always 0). Once the <i>HOB-</i> directory runs out of available
+ * ids, the process is repeated. The higher bits are incremented, a new <i>HOB-</i> directory is created, and
+ * zookeeper generates sequential ids underneath it.
+ *
+ * The reason for treating ids which are less than Integer.MAX_INT differently is to maintain backwards
+ * compatibility. This is a drop-in replacement for ZkLedgerIdGenerator.
+ */
+public class LongZkLedgerIdGenerator implements LedgerIdGenerator {
+ private static final Logger LOG = LoggerFactory.getLogger(LongZkLedgerIdGenerator.class);
+ private ZooKeeper zk;
+ private String ledgerIdGenPath;
+ private ZkLedgerIdGenerator shortIdGen;
+ private List<String> highOrderDirectories;
+ private HighOrderLedgerIdGenPathStatus ledgerIdGenPathStatus;
+ private final List<ACL> zkAcls;
+
+ private enum HighOrderLedgerIdGenPathStatus {
+ UNKNOWN,
+ PRESENT,
+ NOT_PRESENT
+ };
+
+ public LongZkLedgerIdGenerator(ZooKeeper zk, String ledgersPath, String idGenZnodeName, ZkLedgerIdGenerator shortIdGen, List<ACL> zkAcls) {
+ this.zk = zk;
+ if (StringUtils.isBlank(idGenZnodeName)) {
+ this.ledgerIdGenPath = ledgersPath;
+ } else {
+ this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName;
+ }
+ this.shortIdGen = shortIdGen;
+ highOrderDirectories = new ArrayList<String>();
+ ledgerIdGenPathStatus = HighOrderLedgerIdGenPathStatus.UNKNOWN;
+ this.zkAcls = zkAcls;
+ }
+
+ private void generateLongLedgerIdLowBits(final String ledgerPrefix, long highBits, final GenericCallback<Long> cb) throws KeeperException, InterruptedException, IOException {
+ String highPath = ledgerPrefix + formatHalfId((int)highBits);
+ ZkLedgerIdGenerator.generateLedgerIdImpl(new GenericCallback<Long>(){
+ @Override
+ public void operationComplete(int rc, Long result) {
+ if(rc == BKException.Code.OK) {
+ assert((highBits & 0xFFFFFFFF00000000l) == 0);
+ assert((result & 0xFFFFFFFF00000000l) == 0);
+ cb.operationComplete(rc, (highBits << 32) | result);
+ }
+ else if(rc == BKException.Code.LedgerIdOverflowException) {
+ // Lower bits are full. Need to expand and create another HOB node.
+ try {
+ Long newHighBits = highBits + 1;
+ createHOBPathAndGenerateId(ledgerPrefix, newHighBits.intValue(), cb);
+ }
+ catch (KeeperException e) {
+ LOG.error("Failed to create long ledger ID path", e);
+ cb.operationComplete(BKException.Code.ZKException, null);
+ }
+ catch (InterruptedException e) {
+ LOG.error("Failed to create long ledger ID path", e);
+ cb.operationComplete(BKException.Code.InterruptedException, null);
+ } catch (IOException e) {
+ LOG.error("Failed to create long ledger ID path", e);
+ cb.operationComplete(BKException.Code.IllegalOpException, null);
+ }
+
+ }
+ }
+
+ }, zk, ZkLedgerIdGenerator.createLedgerPrefix(highPath, null), zkAcls);
+ }
+
+ /**
+ * Formats half an ID as 10-character 0-padded string
+ * @param i - 32 bits of the ID to format
+ * @return a 10-character 0-padded string.
+ */
+ private String formatHalfId(int i) {
+ return String.format("%010d", i);
+ }
+
+ private void createHOBPathAndGenerateId(String ledgerPrefix, int hob, final GenericCallback<Long> cb) throws KeeperException, InterruptedException, IOException {
+ try {
+ LOG.debug("Creating HOB path: {}", ledgerPrefix + formatHalfId(hob));
+ zk.create(ledgerPrefix + formatHalfId(hob), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ catch(KeeperException.NodeExistsException e) {
+ // It's fine if we lost a race to create the node (NodeExistsException).
+ // All other exceptions should continue unwinding.
+ LOG.debug("Tried to create High-order-bits node, but it already existed!", e);
+ }
+ // We just created a new HOB directory. Invalidate the directory cache
+ invalidateDirectoryCache();
+ generateLongLedgerId(cb); // Try again.
+ }
+
+ private void invalidateDirectoryCache() {
+ highOrderDirectories = null;
+ }
+
+ private void generateLongLedgerId(final GenericCallback<Long> cb) throws KeeperException, InterruptedException, IOException {
+ final String hobPrefix = "HOB-";
+ final String ledgerPrefix = this.ledgerIdGenPath + "/" + hobPrefix;
+
+ // Only pull the directories from zk if we don't have any cached.
+ boolean refreshedDirectories = false;
+ if(highOrderDirectories == null) {
+ refreshedDirectories = true;
+ highOrderDirectories = zk.getChildren(ledgerIdGenPath, false);
+ }
+
+ Optional<Long> largest = highOrderDirectories.stream()
+ .map((t) -> {
+ try {
+ return Long.parseLong(t.replace(hobPrefix, ""));
+ }
+ catch(NumberFormatException e) {
+ return null;
+ }
+ })
+ .filter((t) -> t != null)
+ .reduce(Math::max);
+
+ // If we didn't get any valid IDs from the directory...
+ if(!largest.isPresent()) {
+ if(!refreshedDirectories) {
+ // Our cache might be bad. Invalidate it and retry.
+ invalidateDirectoryCache();
+ generateLongLedgerId(cb); // Try again
+ }
+ else {
+ // else, Start at HOB-0000000001;
+ createHOBPathAndGenerateId(ledgerPrefix, 1, cb);
+ }
+ return;
+ }
+
+ // Found the largest.
+ // Get the low-order bits.
+ final Long highBits = largest.get();
+ generateLongLedgerIdLowBits(ledgerPrefix, highBits, cb);
+
+ // Perform garbage collection on HOB- directories.
+ // Keeping 3 should be plenty to prevent races
+ if(highOrderDirectories.size() > 3) {
+ Object[] highOrderDirs = highOrderDirectories.stream()
+ .map((t) -> {
+ try {
+ return Long.parseLong(t.replace(hobPrefix, ""));
+ }
+ catch(NumberFormatException e) {
+ return null;
+ }
+ })
+ .filter((t) -> t != null)
+ .sorted()
+ .toArray();
+
+ // Go ahead and invalidate. We want to reload cache even if we fail.
+ invalidateDirectoryCache();
+
+ for(int i = 0; i < highOrderDirs.length - 3; i++) {
+ String path = ledgerPrefix + formatHalfId(((Long)highOrderDirs[i]).intValue());
+ LOG.debug("DELETING HIGH ORDER DIR: {}", path);
+ try {
+ zk.delete(path, 0);
+ }
+ catch (KeeperException e) {
+ // We don't care if we fail. Just warn about it.
+ LOG.debug("Failed to delete {}", path);
+ }
+ }
+ }
+ }
+
+ private void createLongLedgerIdPathAndGenerateLongLedgerId(final GenericCallback<Long> cb, String createPath) {
+ ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerIdGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT, new StringCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name) {
+ try {
+ setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.PRESENT);
+ generateLongLedgerId(cb);
+ } catch (KeeperException e) {
+ LOG.error("Failed to create long ledger ID path", e);
+ setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.UNKNOWN);
+ cb.operationComplete(BKException.Code.ZKException, null);
+ } catch (InterruptedException e) {
+ LOG.error("Failed to create long ledger ID path", e);
+ setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.UNKNOWN);
+ cb.operationComplete(BKException.Code.InterruptedException, null);
+ } catch (IOException e) {
+ LOG.error("Failed to create long ledger ID path", e);
+ setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.UNKNOWN);
+ cb.operationComplete(BKException.Code.IllegalOpException, null);
+ }
+ }
+ }, null);
+ }
+
+ public void invalidateLedgerIdGenPathStatus() {
+ setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.UNKNOWN);
+ }
+
+ synchronized private void setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus status) {
+ ledgerIdGenPathStatus = status;
+ }
+
+ /**
+ * Checks the existence of the long ledger id gen path. Existence indicates we have switched from the legacy
+ * algorithm to the new method of generating 63-bit ids. If the existence is UNKNOWN, it looks in zk to
+ * find out. If it previously checked in zk, it returns that value. This value changes when we run out
+ * of ids < Integer.MAX_VALUE, and try to create the long ledger id gen path.
+ * @see createLongLedgerIdPathAndGenerateLongLedgerId
+ * @param zk
+ * @return Does the long ledger id gen path exist?
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ synchronized public boolean ledgerIdGenPathPresent(ZooKeeper zk) throws KeeperException, InterruptedException {
+ switch(ledgerIdGenPathStatus) {
+ case UNKNOWN:
+ if(zk.exists(ledgerIdGenPath, false) != null) {
+ ledgerIdGenPathStatus = HighOrderLedgerIdGenPathStatus.PRESENT;
+ return true;
+ }
+ else {
+ ledgerIdGenPathStatus = HighOrderLedgerIdGenPathStatus.NOT_PRESENT;
+ return false;
+ }
+ case PRESENT:
+ return true;
+ case NOT_PRESENT:
+ return false;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public void generateLedgerId(final GenericCallback<Long> cb) {
+ try {
+ if(!ledgerIdGenPathPresent(zk)) {
+ // We've not moved onto 63-bit ledgers yet.
+ shortIdGen.generateLedgerId(new GenericCallback<Long>(){
+ @Override
+ public void operationComplete(int rc, Long result) {
+ if(rc == BKException.Code.LedgerIdOverflowException) {
+ // 31-bit IDs overflowed. Start using 63-bit ids.
+ createLongLedgerIdPathAndGenerateLongLedgerId(cb, ledgerIdGenPath);
+ }
+ else {
+ // 31-bit Generation worked OK, or had some other
+ // error that we will pass on.
+ cb.operationComplete(rc, result);
+ }
+ }
+ });
+ }
+ else {
+ // We've already started generating 63-bit ledger IDs.
+ // Keep doing that.
+ generateLongLedgerId(cb);
+ }
+ } catch (KeeperException e) {
+ LOG.error("Failed to create long ledger ID path", e);
+ cb.operationComplete(BKException.Code.ZKException, null);
+ }
+ catch (InterruptedException e) {
+ LOG.error("Failed to create long ledger ID path", e);
+ cb.operationComplete(BKException.Code.InterruptedException, null);
+ }
+ catch (IOException e) {
+ LOG.error("Failed to create long ledger ID path", e);
+ cb.operationComplete(BKException.Code.IllegalOpException, null);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ shortIdGen.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
index a2e79af..1373f34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
@@ -47,7 +47,6 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator {
static final String LEDGER_ID_GEN_PREFIX = "ID-";
final ZooKeeper zk;
- final String ledgerIdGenPath;
final String ledgerPrefix;
final List<ACL> zkAcls;
@@ -56,17 +55,26 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator {
String idGenZnodeName,
List<ACL> zkAcls) {
this.zk = zk;
+ ledgerPrefix = createLedgerPrefix(ledgersPath, idGenZnodeName);
this.zkAcls = zkAcls;
+ }
+
+ public static String createLedgerPrefix(String ledgersPath, String idGenZnodeName) {
+ String ledgerIdGenPath = null;
if (StringUtils.isBlank(idGenZnodeName)) {
- this.ledgerIdGenPath = ledgersPath;
+ ledgerIdGenPath = ledgersPath;
} else {
- this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName;
+ ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName;
}
- this.ledgerPrefix = this.ledgerIdGenPath + "/" + LEDGER_ID_GEN_PREFIX;
+ return ledgerIdGenPath + "/" + LEDGER_ID_GEN_PREFIX;
}
@Override
public void generateLedgerId(final GenericCallback<Long> cb) {
+ generateLedgerIdImpl(cb, zk, ledgerPrefix, zkAcls);
+ }
+
+ public static void generateLedgerIdImpl(final GenericCallback<Long> cb, ZooKeeper zk, String ledgerPrefix, List<ACL> zkAcls) {
ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPrefix, new byte[0], zkAcls,
CreateMode.EPHEMERAL_SEQUENTIAL,
new StringCallback() {
@@ -84,8 +92,13 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator {
*/
long ledgerId;
try {
- ledgerId = getLedgerIdFromGenPath(idPathName);
- cb.operationComplete(BKException.Code.OK, ledgerId);
+ ledgerId = getLedgerIdFromGenPath(idPathName, ledgerPrefix);
+ if(ledgerId < 0 || ledgerId >= Integer.MAX_VALUE) {
+ cb.operationComplete(BKException.Code.LedgerIdOverflowException, null);
+ }
+ else {
+ cb.operationComplete(BKException.Code.OK, ledgerId);
+ }
} catch (IOException e) {
LOG.error("Could not extract ledger-id from id gen path:" + path, e);
cb.operationComplete(BKException.Code.ZKException, null);
@@ -109,7 +122,7 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator {
}
// get ledger id from generation path
- private long getLedgerIdFromGenPath(String nodeName) throws IOException {
+ private static long getLedgerIdFromGenPath(String nodeName, String ledgerPrefix) throws IOException {
long ledgerId;
try {
String parts[] = nodeName.split(ledgerPrefix);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index c2f658b..c0c110d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -57,7 +57,7 @@ public class StringUtils {
* ledger id
* @return the hierarchical path
*/
- public static String getHierarchicalLedgerPath(long ledgerId) {
+ public static String getShortHierarchicalLedgerPath(long ledgerId) {
String ledgerIdStr = getZKStringId(ledgerId);
// do 2-4-4 split
StringBuilder sb = new StringBuilder();
@@ -90,6 +90,13 @@ public class StringUtils {
return sb.toString();
}
+ public static String getHybridHierarchicalLedgerPath(long ledgerId) {
+ if(ledgerId < Integer.MAX_VALUE) {
+ return getShortHierarchicalLedgerPath(ledgerId);
+ }
+ return getLongHierarchicalLedgerPath(ledgerId);
+ }
+
/**
* Parse the hierarchical ledger path to its ledger id
*
@@ -119,7 +126,7 @@ public class StringUtils {
throws IOException {
String[] longHierarchicalParts = longHierarchicalLedgerPath.split("/");
if (longHierarchicalParts.length != 5) {
- throw new IOException("it is not a valid hierarchical path name : " + longHierarchicalLedgerPath);
+ return stringToHierarchicalLedgerId(longHierarchicalLedgerPath);
}
longHierarchicalParts[4] =
longHierarchicalParts[4].substring(LEDGER_NODE_PREFIX.length());
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index 777b619..d25bd70 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -882,10 +883,13 @@ public class BookieRecoveryTest extends MultiLedgerManagerMultiDigestTestCase {
public void ensurePasswordUsedForOldLedgers() throws Exception {
// This test bases on creating old ledgers in version 4.1.0, which only
// supports ZooKeeper based flat and hierarchical LedgerManagerFactory.
- // So we ignore it for MSLedgerManagerFactory.
+ // So we ignore it for MSLedgerManagerFactory and LongHierarchicalLedgerManagerFactory.
if (MSLedgerManagerFactory.class.getName().equals(ledgerManagerFactory)) {
return;
}
+ if (LongHierarchicalLedgerManagerFactory.class.getName().equals(ledgerManagerFactory)) {
+ return;
+ }
// stop all bookies
// and wipe the ledger layout so we can use an old client
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLongZkLedgerIdGenerator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLongZkLedgerIdGenerator.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLongZkLedgerIdGenerator.java
new file mode 100644
index 0000000..75422a4
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLongZkLedgerIdGenerator.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junit.framework.TestCase;
+
+public class TestLongZkLedgerIdGenerator extends TestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(TestZkLedgerIdGenerator.class);
+
+ ZooKeeperUtil zkutil;
+ ZooKeeper zk;
+
+ LongZkLedgerIdGenerator ledgerIdGenerator;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("Setting up test");
+ super.setUp();
+
+ zkutil = new ZooKeeperUtil();
+ zkutil.startServer();
+ zk = zkutil.getZooKeeperClient();
+
+ ZkLedgerIdGenerator shortLedgerIdGenerator = new ZkLedgerIdGenerator(zk,
+ "/test-zk-ledger-id-generator", "idgen", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ ledgerIdGenerator = new LongZkLedgerIdGenerator(zk,
+ "/test-zk-ledger-id-generator", "idgen-long", shortLedgerIdGenerator, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("Tearing down test");
+ ledgerIdGenerator.close();
+ zk.close();
+ zkutil.killServer();
+
+ super.tearDown();
+ }
+
+ @Test(timeout=60000)
+ public void testGenerateLedgerId() throws Exception {
+ // Create *nThread* threads each generate *nLedgers* ledger id,
+ // and then check there is no identical ledger id.
+ final int nThread = 2;
+ final int nLedgers = 2000;
+ // Multiply by two. We're going to do half in the old legacy space and half in the new.
+ final CountDownLatch countDownLatch = new CountDownLatch(nThread*nLedgers*2);
+
+ final AtomicInteger errCount = new AtomicInteger(0);
+ final ConcurrentLinkedQueue<Long> ledgerIds = new ConcurrentLinkedQueue<Long>();
+ final GenericCallback<Long> cb = new GenericCallback<Long>() {
+ @Override
+ public void operationComplete(int rc, Long result) {
+ if (Code.OK.intValue() == rc) {
+ ledgerIds.add(result);
+ } else {
+ errCount.incrementAndGet();
+ }
+ countDownLatch.countDown();
+ }
+ };
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < nThread; i++) {
+ new Thread() {
+ @Override
+ public void run() {
+ for (int j = 0; j < nLedgers; j++) {
+ ledgerIdGenerator.generateLedgerId(cb);
+ }
+ }
+ }.start();
+ }
+
+ // Go and create the long-id directory in zookeeper. This should cause the id generator to generate ids with the
+ // new algo once we clear it's stored status.
+ ZkUtils.createFullPathOptimistic(zk, "/test-zk-ledger-id-generator/idgen-long", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ ledgerIdGenerator.invalidateLedgerIdGenPathStatus();
+
+ for (int i = 0; i < nThread; i++) {
+ new Thread() {
+ @Override
+ public void run() {
+ for (int j = 0; j < nLedgers; j++) {
+ ledgerIdGenerator.generateLedgerId(cb);
+ }
+ }
+ }.start();
+ }
+
+ assertTrue("Wait ledger id generation threads to stop timeout : ",
+ countDownLatch.await(30, TimeUnit.SECONDS));
+ LOG.info("Number of generated ledger id: {}, time used: {}", ledgerIds.size(),
+ System.currentTimeMillis() - start);
+ assertEquals("Error occur during ledger id generation : ", 0, errCount.get());
+
+ Set<Long> ledgers = new HashSet<Long>();
+ while (!ledgerIds.isEmpty()) {
+ Long ledger = ledgerIds.poll();
+ assertNotNull("Generated ledger id is null : ", ledger);
+ assertFalse("Ledger id [" + ledger + "] conflict : ", ledgers.contains(ledger));
+ ledgers.add(ledger);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
index 2c9a1f4..357bd34 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
@@ -48,6 +48,7 @@ public abstract class MultiLedgerManagerMultiDigestTestCase extends BookKeeperCl
public static Collection<Object[]> configs() {
String[] ledgerManagers = {
"org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
+ "org.apache.bookkeeper.meta.LegacyHierarchicalLedgerManagerFactory",
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
"org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory",
"org.apache.bookkeeper.meta.MSLedgerManagerFactory",
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
index 34a22af..cb640b1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
@@ -42,6 +42,7 @@ public abstract class MultiLedgerManagerTestCase extends BookKeeperClusterTestCa
public static Collection<Object[]> configs() {
String[] ledgerManagers = new String[] {
"org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
+ "org.apache.bookkeeper.meta.LegacyHierarchicalLedgerManagerFactory",
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
"org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory",
"org.apache.bookkeeper.meta.MSLedgerManagerFactory",
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
index 3249194..18504e3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.FileSystemUpgrade;
import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.util.IOUtils;
@@ -437,6 +438,17 @@ public class TestBackwardCompat {
return new LedgerCurrent(newbk, newlh);
}
+ static LedgerCurrent openLedger(long id, ClientConfiguration conf) throws Exception {
+ conf.setZkServers(zkUtil.getZooKeeperConnectString());
+ org.apache.bookkeeper.client.BookKeeper newbk
+ = new org.apache.bookkeeper.client.BookKeeper(conf);
+ org.apache.bookkeeper.client.LedgerHandle newlh
+ = newbk.openLedger(id,
+ org.apache.bookkeeper.client.BookKeeper.DigestType.CRC32,
+ "foobar".getBytes());
+ return new LedgerCurrent(newbk, newlh);
+ }
+
long getId() {
return lh.getId();
}
@@ -827,4 +839,45 @@ public class TestBackwardCompat {
oldledger.close();
scur.stop();
}
+
+ /**
+ * Test compatability between version old version and the current version
+ * with respect to the HierarchicalLedgerManagers.
+ * - 4.2.0 server starts with HierarchicalLedgerManager.
+ * - Write ledgers with old and new clients
+ * - Read ledgers written by old clients.
+ */
+ @Test(timeout = 60000)
+ public void testCompatHierarchicalLedgerManager() throws Exception {
+ File journalDir = createTempDir("bookie", "journal");
+ File ledgerDir = createTempDir("bookie", "ledger");
+
+ int port = PortManager.nextFreePort();
+ // start server, upgrade
+ Server420 s420 = new Server420(journalDir, ledgerDir, port);
+ s420.getConf().setLedgerManagerFactoryClassName("org.apache.bk_v4_2_0.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+ s420.start();
+
+ Ledger420 l420 = Ledger420.newLedger();
+ l420.write100();
+ long oldLedgerId = l420.getId();
+ l420.close();
+ s420.stop();
+
+ // Start the current server
+ ServerCurrent scur = new ServerCurrent(journalDir, ledgerDir, port, true);
+ scur.getConf().setLedgerManagerFactoryClassName("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+ scur.getConf().setProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK, true);
+ scur.start();
+
+ // Munge the conf so we can test.
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setLedgerManagerFactoryClassName("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+ conf.setProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK, true);
+
+ // check that new client can read old ledgers on new server
+ LedgerCurrent oldledger = LedgerCurrent.openLedger(oldLedgerId, conf);
+ assertEquals("Failed to read entries!", 100, oldledger.readAll());
+ oldledger.close();
+ }
}
[2/2] bookkeeper git commit: BOOKKEEPER-552: 64 Bits Ledger ID
Generation
Posted by mm...@apache.org.
BOOKKEEPER-552: 64 Bits Ledger ID Generation
This PR supersedes #112
Instead of moving LongHierarchicalLedgerManager to HierarchicalLedgerManager, LongHierarchicalLedgerManager is still a stand-alone manager. HierarchicalLedgerManager has been moved to LegacyHierarchicalLedgerManager, and a new HierarchicalLedgerManager class has been put in its place, which is backwards-compatible with the original (now legacy) HierarchicalLedgerManager.
This new HierarchicalLedgerManager leverages the new LongZkLedgerIdGenerator to generate Ids, and uses the LongHierarchicalLedgerManager to manage metadata for ledger IDs > 31 bits long. For shorter ledger ids, the LegacyHierarchicalLedgerManager is used, to keep backwards-compatibility.
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Matteo Merli <mm...@apache.org>, Venkateswararao Jujjuri (JV) <None>
Closes #114 from knusbaum/BOOKKEEPER-552
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/057af8db
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/057af8db
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/057af8db
Branch: refs/heads/master
Commit: 057af8dbce6c08794eb8b46ca52ca13f222d9bbb
Parents: 9c79e07
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Tue May 2 14:04:57 2017 -0700
Committer: Matteo Merli <mm...@apache.org>
Committed: Tue May 2 14:04:57 2017 -0700
----------------------------------------------------------------------
.../apache/bookkeeper/client/BKException.java | 12 +
.../bookkeeper/conf/AbstractConfiguration.java | 3 +
.../meta/AbstractHierarchicalLedgerManager.java | 213 +++++++++++
.../meta/AbstractZkLedgerManager.java | 3 +-
.../bookkeeper/meta/FlatLedgerManager.java | 6 +-
.../meta/HierarchicalLedgerManager.java | 375 +++----------------
.../meta/HierarchicalLedgerManagerFactory.java | 80 +---
.../bookkeeper/meta/LedgerManagerFactory.java | 5 +-
.../meta/LegacyHierarchicalLedgerManager.java | 281 ++++++++++++++
.../LegacyHierarchicalLedgerManagerFactory.java | 100 +++++
.../meta/LongHierarchicalLedgerManager.java | 101 +++--
.../meta/LongZkLedgerIdGenerator.java | 333 ++++++++++++++++
.../bookkeeper/meta/ZkLedgerIdGenerator.java | 27 +-
.../org/apache/bookkeeper/util/StringUtils.java | 11 +-
.../bookkeeper/client/BookieRecoveryTest.java | 6 +-
.../meta/TestLongZkLedgerIdGenerator.java | 145 +++++++
.../MultiLedgerManagerMultiDigestTestCase.java | 1 +
.../test/MultiLedgerManagerTestCase.java | 1 +
.../bookkeeper/test/TestBackwardCompat.java | 53 +++
19 files changed, 1310 insertions(+), 446 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 2377c1c..aa3ec08 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -102,6 +102,8 @@ public abstract class BKException extends Exception {
return new BKDuplicateEntryIdException();
case Code.TimeoutException:
return new BKTimeoutException();
+ case Code.LedgerIdOverflowException:
+ return new BKLedgerIdOverflowException();
default:
return new BKUnexpectedConditionException();
}
@@ -142,6 +144,8 @@ public abstract class BKException extends Exception {
int UnauthorizedAccessException = -102;
int UnclosedFragmentException = -103;
int WriteOnReadOnlyBookieException = -104;
+ //-105 reserved for TooManyRequestsException
+ int LedgerIdOverflowException = -106;
// generic exception code used to propagate in replication pipeline
int ReplicationException = -200;
@@ -210,6 +214,8 @@ public abstract class BKException extends Exception {
return "Attempting to use an unclosed fragment; This is not safe";
case Code.WriteOnReadOnlyBookieException:
return "Attempting to write on ReadOnly bookie";
+ case Code.LedgerIdOverflowException:
+ return "Next ledgerID is too large.";
case Code.ReplicationException:
return "Errors in replication pipeline";
case Code.ClientClosedException:
@@ -404,4 +410,10 @@ public abstract class BKException extends Exception {
super(Code.TimeoutException);
}
}
+
+ public static class BKLedgerIdOverflowException extends BKException {
+ public BKLedgerIdOverflowException() {
+ super(Code.LedgerIdOverflowException);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 07e5d08..c7c50cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -76,6 +76,9 @@ public abstract class AbstractConfiguration extends CompositeConfiguration {
// Zookeeper ACL settings
protected final static String ZK_ENABLE_SECURITY = "zkEnableSecurity";
+ // Kluge for compatibility testing. Never set this outside tests.
+ public final static String LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK = "ledgerManagerFactoryDisableClassCheck";
+
protected AbstractConfiguration() {
super();
if (READ_SYSTEM_PROPERTIES) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
new file mode 100644
index 0000000..02359e0
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractHierarchicalLedgerManager extends AbstractZkLedgerManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractHierarchicalLedgerManager.class);
+
+ /**
+ * Constructor
+ *
+ * @param conf
+ * Configuration object
+ * @param zk
+ * ZooKeeper Client Handle
+ */
+ public AbstractHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+ super(conf, zk);
+ }
+
+ /**
+ * Process hash nodes in a given path
+ */
+ void asyncProcessLevelNodes(
+ final String path, final Processor<String> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object context,
+ final int successRc, final int failureRc) {
+ zk.sync(path, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (rc != Code.OK.intValue()) {
+ LOG.error("Error syncing path " + path + " when getting its chidren: ",
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+
+ zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx,
+ List<String> levelNodes) {
+ if (rc != Code.OK.intValue()) {
+ LOG.error("Error polling hash nodes of " + path,
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+ AsyncListProcessor<String> listProcessor =
+ new AsyncListProcessor<String>(scheduler);
+ // process its children
+ listProcessor.process(levelNodes, processor, finalCb,
+ context, successRc, failureRc);
+ }
+ }, null);
+ }
+ }, null);
+ }
+
+ /**
+ * Process list one by one in asynchronize way. Process will be stopped immediately
+ * when error occurred.
+ */
+ private static class AsyncListProcessor<T> {
+ // use this to prevent long stack chains from building up in callbacks
+ ScheduledExecutorService scheduler;
+
+ /**
+ * Constructor
+ *
+ * @param scheduler
+ * Executor used to prevent long stack chains
+ */
+ public AsyncListProcessor(ScheduledExecutorService scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ /**
+ * Process list of items
+ *
+ * @param data
+ * List of data to process
+ * @param processor
+ * Callback to process element of list when success
+ * @param finalCb
+ * Final callback to be called after all elements in the list are processed
+ * @param context
+ * Context of final callback
+ * @param successRc
+ * RC passed to final callback on success
+ * @param failureRc
+ * RC passed to final callback on failure
+ */
+ public void process(final List<T> data, final Processor<T> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object context,
+ final int successRc, final int failureRc) {
+ if (data == null || data.size() == 0) {
+ finalCb.processResult(successRc, null, context);
+ return;
+ }
+ final int size = data.size();
+ final AtomicInteger current = new AtomicInteger(0);
+ AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (rc != successRc) {
+ // terminal immediately
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+ // process next element
+ int next = current.incrementAndGet();
+ if (next >= size) { // reach the end of list
+ finalCb.processResult(successRc, null, context);
+ return;
+ }
+ final T dataToProcess = data.get(next);
+ final AsyncCallback.VoidCallback stub = this;
+ scheduler.submit(new Runnable() {
+ @Override
+ public final void run() {
+ processor.process(dataToProcess, stub);
+ }
+ });
+ }
+ };
+ T firstElement = data.get(0);
+ processor.process(firstElement, stubCallback);
+ }
+ }
+
+ // get ledger from all level nodes
+ long getLedgerId(String...levelNodes) throws IOException {
+ return StringUtils.stringToHierarchicalLedgerId(levelNodes);
+ }
+
+ /**
+ * Get all ledger ids in the given zk path.
+ *
+ * @param ledgerNodes
+ * List of ledgers in the given path
+ * example:- {L1652, L1653, L1650}
+ * @param path
+ * The zookeeper path of the ledger ids. The path should start with {@ledgerRootPath}
+ * example (with ledgerRootPath = /ledgers):- /ledgers/00/0053
+ */
+ @Override
+ protected NavigableSet<Long> ledgerListToSet(List<String> ledgerNodes, String path) {
+ NavigableSet<Long> zkActiveLedgers = new TreeSet<Long>();
+
+ if (!path.startsWith(ledgerRootPath)) {
+ LOG.warn("Ledger path [{}] is not a valid path name, it should start wth {}", path, ledgerRootPath);
+ return zkActiveLedgers;
+ }
+
+ long ledgerIdPrefix = 0;
+ char ch;
+ for (int i = ledgerRootPath.length() + 1; i < path.length(); i++) {
+ ch = path.charAt(i);
+ if (ch < '0' || ch > '9')
+ continue;
+ ledgerIdPrefix = ledgerIdPrefix * 10 + (ch - '0');
+ }
+
+ for (String ledgerNode : ledgerNodes) {
+ if (isSpecialZnode(ledgerNode)) {
+ continue;
+ }
+ long ledgerId = ledgerIdPrefix;
+ for (int i = 0; i < ledgerNode.length(); i++) {
+ ch = ledgerNode.charAt(i);
+ if (ch < '0' || ch > '9')
+ continue;
+ ledgerId = ledgerId * 10 + (ch - '0');
+ }
+ zkActiveLedgers.add(ledgerId);
+ }
+ return zkActiveLedgers;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 6db3375..f5a60f6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataLis
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.AsyncCallback;
@@ -468,7 +469,7 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
* Znode Name
* @return true if the znode is a special znode otherwise false
*/
- protected boolean isSpecialZnode(String znode) {
+ protected static boolean isSpecialZnode(String znode) {
if (BookKeeperConstants.AVAILABLE_NODE.equals(znode)
|| BookKeeperConstants.COOKIE_NODE.equals(znode)
|| BookKeeperConstants.LAYOUT_ZNODE.equals(znode)
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 3172247..36db62a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -87,9 +87,9 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, ctx, successRc, failureRc);
}
- @Override
- protected boolean isSpecialZnode(String znode) {
- return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || super.isSpecialZnode(znode);
+
+ protected static boolean isSpecialZnode(String znode) {
+ return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || AbstractZkLedgerManager.isSpecialZnode(znode);
}
@Override
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index bed1627..309762b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -18,370 +18,105 @@
package org.apache.bookkeeper.meta;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.ZooKeeper;
/**
- * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
+ * HierarchicalLedgerManager makes use of both LongHierarchicalLedgerManager and LegacyHierarchicalLedgerManager
+ * to extend the 31-bit ledger id range of the LegacyHierarchicalLedgerManager to that of the LongHierarchicalLedgerManager
+ * while remaining backwards-compatible with the legacy manager.
+ *
+ * In order to achieve backwards-compatibility, the HierarchicalLedgerManager forwards requests relating to ledger IDs which
+ * are < Integer.MAX_INT to the LegacyHierarchicalLedgerManager. The new 5-part directory structure will not appear until a
+ * ledger with an ID >= Integer.MAX_INT is created.
*
- * <p>
- * HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
- * <pre><level1 (2 digits)><level2 (4 digits)><level3 (4 digits)></pre>
- * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
- * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
- * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in
- * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
- * errors during garbage collection due to lists of children that are too long.
+ * @see LongHierarchicalLedgerManager
+ * @see LegacyHierarchicalLedgerManager
*/
-class HierarchicalLedgerManager extends AbstractZkLedgerManager {
-
+class HierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class);
- static final String IDGEN_ZNODE = "idgen";
- private static final String MAX_ID_SUFFIX = "9999";
- private static final String MIN_ID_SUFFIX = "0000";
+ LegacyHierarchicalLedgerManager legacyLM;
+ LongHierarchicalLedgerManager longLM;
- /**
- * Constructor
- *
- * @param conf
- * Configuration object
- * @param zk
- * ZooKeeper Client Handle
- */
public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
super(conf, zk);
+ legacyLM = new LegacyHierarchicalLedgerManager(conf, zk);
+ longLM = new LongHierarchicalLedgerManager (conf, zk);
}
@Override
- public String getLedgerPath(long ledgerId) {
- return ledgerRootPath + StringUtils.getHierarchicalLedgerPath(ledgerId);
- }
-
- @Override
- public long getLedgerId(String pathName) throws IOException {
- if (!pathName.startsWith(ledgerRootPath)) {
- throw new IOException("it is not a valid hashed path name : " + pathName);
- }
- String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
- return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
- }
-
- // get ledger from all level nodes
- long getLedgerId(String...levelNodes) throws IOException {
- return StringUtils.stringToHierarchicalLedgerId(levelNodes);
- }
-
- //
- // Active Ledger Manager
- //
-
- /**
- * Get the smallest cache id in a specified node /level1/level2
- *
- * @param level1
- * 1st level node name
- * @param level2
- * 2nd level node name
- * @return the smallest ledger id
- */
- private long getStartLedgerIdByLevel(String level1, String level2) throws IOException {
- return getLedgerId(level1, level2, MIN_ID_SUFFIX);
- }
-
- /**
- * Get the largest cache id in a specified node /level1/level2
- *
- * @param level1
- * 1st level node name
- * @param level2
- * 2nd level node name
- * @return the largest ledger id
- */
- private long getEndLedgerIdByLevel(String level1, String level2) throws IOException {
- return getLedgerId(level1, level2, MAX_ID_SUFFIX);
- }
-
- @Override
- public void asyncProcessLedgers(final Processor<Long> processor,
- final AsyncCallback.VoidCallback finalCb, final Object context,
- final int successRc, final int failureRc) {
- // process 1st level nodes
- asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
- @Override
- public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) {
- if (isSpecialZnode(l1Node)) {
- cb1.processResult(successRc, null, context);
- return;
- }
- final String l1NodePath = ledgerRootPath + "/" + l1Node;
- // process level1 path, after all children of level1 process
- // it callback to continue processing next level1 node
- asyncProcessLevelNodes(l1NodePath, new Processor<String>() {
- @Override
- public void process(String l2Node, AsyncCallback.VoidCallback cb2) {
- // process level1/level2 path
- String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node;
- // process each ledger
- // after all ledger are processed, cb2 will be call to continue processing next level2 node
- asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2,
- context, successRc, failureRc);
- }
- }, cb1, context, successRc, failureRc);
- }
- }, finalCb, context, successRc, failureRc);
- }
+ public void asyncProcessLedgers(Processor<Long> processor, VoidCallback finalCb, Object context, int successRc,
+ int failureRc) {
+ // Process the old 31-bit id ledgers first.
+ legacyLM.asyncProcessLedgers(processor, new VoidCallback(){
- /**
- * Process hash nodes in a given path
- */
- void asyncProcessLevelNodes(
- final String path, final Processor<String> processor,
- final AsyncCallback.VoidCallback finalCb, final Object context,
- final int successRc, final int failureRc) {
- zk.sync(path, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
- if (rc != Code.OK.intValue()) {
- LOG.error("Error syncing path " + path + " when getting its chidren: ",
- KeeperException.create(KeeperException.Code.get(rc), path));
- finalCb.processResult(failureRc, null, context);
- return;
+ if(rc == failureRc) {
+ // If it fails, return the failure code to the callback
+ finalCb.processResult(rc, path, ctx);
+ }
+ else {
+ // If it succeeds, proceed with our own recursive ledger processing for the 63-bit id ledgers
+ longLM.asyncProcessLedgers(processor, finalCb, context, successRc, failureRc);
}
-
- zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx,
- List<String> levelNodes) {
- if (rc != Code.OK.intValue()) {
- LOG.error("Error polling hash nodes of " + path,
- KeeperException.create(KeeperException.Code.get(rc), path));
- finalCb.processResult(failureRc, null, context);
- return;
- }
- AsyncListProcessor<String> listProcessor =
- new AsyncListProcessor<String>(scheduler);
- // process its children
- listProcessor.process(levelNodes, processor, finalCb,
- context, successRc, failureRc);
- }
- }, null);
}
- }, null);
- }
-
- /**
- * Process list one by one in asynchronize way. Process will be stopped immediately
- * when error occurred.
- */
- private static class AsyncListProcessor<T> {
- // use this to prevent long stack chains from building up in callbacks
- ScheduledExecutorService scheduler;
- /**
- * Constructor
- *
- * @param scheduler
- * Executor used to prevent long stack chains
- */
- public AsyncListProcessor(ScheduledExecutorService scheduler) {
- this.scheduler = scheduler;
- }
+ }, context, successRc, failureRc);
+ }
- /**
- * Process list of items
- *
- * @param data
- * List of data to process
- * @param processor
- * Callback to process element of list when success
- * @param finalCb
- * Final callback to be called after all elements in the list are processed
- * @param context
- * Context of final callback
- * @param successRc
- * RC passed to final callback on success
- * @param failureRc
- * RC passed to final callback on failure
- */
- public void process(final List<T> data, final Processor<T> processor,
- final AsyncCallback.VoidCallback finalCb, final Object context,
- final int successRc, final int failureRc) {
- if (data == null || data.size() == 0) {
- finalCb.processResult(successRc, null, context);
- return;
- }
- final int size = data.size();
- final AtomicInteger current = new AtomicInteger(0);
- AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (rc != successRc) {
- // terminal immediately
- finalCb.processResult(failureRc, null, context);
- return;
- }
- // process next element
- int next = current.incrementAndGet();
- if (next >= size) { // reach the end of list
- finalCb.processResult(successRc, null, context);
- return;
- }
- final T dataToProcess = data.get(next);
- final AsyncCallback.VoidCallback stub = this;
- scheduler.submit(new Runnable() {
- @Override
- public final void run() {
- processor.process(dataToProcess, stub);
- }
- });
- }
- };
- T firstElement = data.get(0);
- processor.process(firstElement, stubCallback);
- }
+ @Override
+ protected String getLedgerPath(long ledgerId) {
+ return ledgerRootPath + StringUtils.getHybridHierarchicalLedgerPath(ledgerId);
}
@Override
- protected boolean isSpecialZnode(String znode) {
- return IDGEN_ZNODE.equals(znode) || super.isSpecialZnode(znode);
+ protected long getLedgerId(String ledgerPath) throws IOException {
+ // TODO Auto-generated method stub
+ if (!ledgerPath.startsWith(ledgerRootPath)) {
+ throw new IOException("it is not a valid hashed path name : " + ledgerPath);
+ }
+ String hierarchicalPath = ledgerPath.substring(ledgerRootPath.length() + 1);
+ return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
}
@Override
public LedgerRangeIterator getLedgerRanges() {
- return new HierarchicalLedgerRangeIterator();
+ LedgerRangeIterator legacyLedgerRangeIterator = legacyLM.getLedgerRanges();
+ LedgerRangeIterator longLedgerRangeIterator = longLM.getLedgerRanges();
+ return new HierarchicalLedgerRangeIterator(legacyLedgerRangeIterator, longLedgerRangeIterator);
}
- /**
- * Iterator through each metadata bucket with hierarchical mode
- */
- private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
- private Iterator<String> l1NodesIter = null;
- private Iterator<String> l2NodesIter = null;
- private String curL1Nodes = "";
- private boolean iteratorDone = false;
- private LedgerRange nextRange = null;
+ private static class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
- /**
- * iterate next level1 znode
- *
- * @return false if have visited all level1 nodes
- * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
- */
- private boolean nextL1Node() throws KeeperException, InterruptedException {
- l2NodesIter = null;
- while (l2NodesIter == null) {
- if (l1NodesIter.hasNext()) {
- curL1Nodes = l1NodesIter.next();
- } else {
- return false;
- }
- if (isSpecialZnode(curL1Nodes)) {
- continue;
- }
- List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
- Collections.sort(l2Nodes);
- l2NodesIter = l2Nodes.iterator();
- if (!l2NodesIter.hasNext()) {
- l2NodesIter = null;
- continue;
- }
- }
- return true;
- }
+ LedgerRangeIterator legacyLedgerRangeIterator;
+ LedgerRangeIterator longLedgerRangeIterator;
- synchronized private void preload() throws IOException {
- while (nextRange == null && !iteratorDone) {
- boolean hasMoreElements = false;
- try {
- if (l1NodesIter == null) {
- List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
- Collections.sort(l1Nodes);
- l1NodesIter = l1Nodes.iterator();
- hasMoreElements = nextL1Node();
- } else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
- hasMoreElements = nextL1Node();
- } else {
- hasMoreElements = true;
- }
- } catch (KeeperException ke) {
- throw new IOException("Error preloading next range", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while preloading", ie);
- }
- if (hasMoreElements) {
- nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
- if (nextRange.size() == 0) {
- nextRange = null;
- }
- } else {
- iteratorDone = true;
- }
- }
+ HierarchicalLedgerRangeIterator(LedgerRangeIterator legacyLedgerRangeIterator, LedgerRangeIterator longLedgerRangeIterator) {
+ this.legacyLedgerRangeIterator = legacyLedgerRangeIterator;
+ this.longLedgerRangeIterator = longLedgerRangeIterator;
}
@Override
- synchronized public boolean hasNext() throws IOException {
- preload();
- return nextRange != null && !iteratorDone;
+ public boolean hasNext() throws IOException {
+ return legacyLedgerRangeIterator.hasNext() || longLedgerRangeIterator.hasNext();
}
@Override
- synchronized public LedgerRange next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ public LedgerRange next() throws IOException {
+ if(legacyLedgerRangeIterator.hasNext()) {
+ return legacyLedgerRangeIterator.next();
}
- LedgerRange r = nextRange;
- nextRange = null;
- return r;
+ return longLedgerRangeIterator.next();
}
- /**
- * Get a single node level1/level2
- *
- * @param level1
- * 1st level node name
- * @param level2
- * 2nd level node name
- * @throws IOException
- */
- LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
- throws IOException {
- StringBuilder nodeBuilder = new StringBuilder();
- nodeBuilder.append(ledgerRootPath).append("/")
- .append(level1).append("/").append(level2);
- String nodePath = nodeBuilder.toString();
- List<String> ledgerNodes = null;
- try {
- ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
- } catch (InterruptedException e) {
- throw new IOException("Error when get child nodes from zk", e);
- }
- NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
- if (LOG.isDebugEnabled()) {
- LOG.debug("All active ledgers from ZK for hash node "
- + level1 + "/" + level2 + " : " + zkActiveLedgers);
- }
-
- return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true,
- getEndLedgerIdByLevel(level1, level2), true));
- }
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index 084d73d..a74a633 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.meta;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,89 +15,25 @@ package org.apache.bookkeeper.meta;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.bookkeeper.meta;
-import java.io.IOException;
import java.util.List;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.bookkeeper.replication.ReplicationException;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
/**
- * Hierarchical Ledger Manager Factory
+ * Legacy Hierarchical Ledger Manager Factory
*/
-public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+public class HierarchicalLedgerManagerFactory extends LegacyHierarchicalLedgerManagerFactory {
public static final String NAME = "hierarchical";
- public static final int CUR_VERSION = 1;
-
- AbstractConfiguration conf;
- ZooKeeper zk;
-
- @Override
- public int getCurrentVersion() {
- return CUR_VERSION;
- }
-
- @Override
- public LedgerManagerFactory initialize(final AbstractConfiguration conf,
- final ZooKeeper zk,
- final int factoryVersion)
- throws IOException {
- if (CUR_VERSION != factoryVersion) {
- throw new IOException("Incompatible layout version found : "
- + factoryVersion);
- }
- this.conf = conf;
- this.zk = zk;
- return this;
- }
-
- @Override
- public void uninitialize() throws IOException {
- // since zookeeper instance is passed from outside
- // we don't need to close it here
- }
-
+
@Override
public LedgerIdGenerator newLedgerIdGenerator() {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
- return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), HierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
- }
-
- @Override
- public LedgerManager newLedgerManager() {
- return new HierarchicalLedgerManager(conf, zk);
- }
-
- @Override
- public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
- throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
- return new ZkLedgerUnderreplicationManager(conf, zk);
- }
-
- @Override
- public void format(AbstractConfiguration conf, ZooKeeper zk)
- throws InterruptedException, KeeperException, IOException {
- HierarchicalLedgerManager ledgerManager = (HierarchicalLedgerManager) newLedgerManager();
- try {
- String ledgersRootPath = conf.getZkLedgersRootPath();
- List<String> children = zk.getChildren(ledgersRootPath, false);
- for (String child : children) {
- if (ledgerManager.isSpecialZnode(child)) {
- continue;
- }
- ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
- }
- } finally {
- ledgerManager.close();
- }
- // Delete and recreate the LAYOUT information.
- super.format(conf, zk);
+ ZkLedgerIdGenerator subIdGenerator = new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
+ return new LongZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LongHierarchicalLedgerManager.IDGEN_ZNODE, subIdGenerator, zkAcls);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 76d1572..e7cfc4c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -164,7 +164,8 @@ public abstract class LedgerManagerFactory {
// handle V2 layout case
if (factoryClass != null &&
- !layout.getManagerFactoryClass().equals(factoryClass.getName())) {
+ !layout.getManagerFactoryClass().equals(factoryClass.getName()) &&
+ conf.getProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK) == null) { // Disable should ONLY happen during compatibility testing.
throw new IOException("Configured layout " + factoryClass.getName()
+ " does not match existing layout " + layout.getManagerFactoryClass());
@@ -210,6 +211,8 @@ public abstract class LedgerManagerFactory {
factoryClass = FlatLedgerManagerFactory.class;
} else if (HierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
factoryClass = HierarchicalLedgerManagerFactory.class;
+ } else if (LongHierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
+ factoryClass = LongHierarchicalLedgerManagerFactory.class;
} else {
throw new IOException("Unknown ledger manager type: "
+ lmType);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
new file mode 100644
index 0000000..8be23b2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
+ *
+ * <p>
+ * LegacyHierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
+ * <pre><level1 (2 digits)><level2 (4 digits)><level3 (4 digits)></pre>
+ * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
+ * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
+ * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in
+ * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
+ * errors during garbage collection due to lists of children that are too long.
+ */
+class LegacyHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
+
+ static final Logger LOG = LoggerFactory.getLogger(LegacyHierarchicalLedgerManager.class);
+
+ static final String IDGEN_ZNODE = "idgen";
+ private static final String MAX_ID_SUFFIX = "9999";
+ private static final String MIN_ID_SUFFIX = "0000";
+
+ private static final ThreadLocal<StringBuilder> threadLocalNodeBuilder = new ThreadLocal<StringBuilder>() {
+ @Override
+ protected StringBuilder initialValue() {
+ return new StringBuilder();
+ }
+ };
+
+ /**
+ * Constructor
+ *
+ * @param conf
+ * Configuration object
+ * @param zk
+ * ZooKeeper Client Handle
+ */
+ public LegacyHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+ super(conf, zk);
+ }
+
+ @Override
+ public String getLedgerPath(long ledgerId) {
+ return ledgerRootPath + StringUtils.getShortHierarchicalLedgerPath(ledgerId);
+ }
+
+ @Override
+ public long getLedgerId(String pathName) throws IOException {
+ if (!pathName.startsWith(ledgerRootPath)) {
+ throw new IOException("it is not a valid hashed path name : " + pathName);
+ }
+ String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
+ return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
+ }
+
+ //
+ // Active Ledger Manager
+ //
+
+ /**
+ * Get the smallest cache id in a specified node /level1/level2
+ *
+ * @param level1
+ * 1st level node name
+ * @param level2
+ * 2nd level node name
+ * @return the smallest ledger id
+ */
+ private long getStartLedgerIdByLevel(String level1, String level2) throws IOException {
+ return getLedgerId(level1, level2, MIN_ID_SUFFIX);
+ }
+
+ /**
+ * Get the largest cache id in a specified node /level1/level2
+ *
+ * @param level1
+ * 1st level node name
+ * @param level2
+ * 2nd level node name
+ * @return the largest ledger id
+ */
+ private long getEndLedgerIdByLevel(String level1, String level2) throws IOException {
+ return getLedgerId(level1, level2, MAX_ID_SUFFIX);
+ }
+
+ @Override
+ public void asyncProcessLedgers(final Processor<Long> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object context,
+ final int successRc, final int failureRc) {
+ // process 1st level nodes
+ asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
+ @Override
+ public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) {
+ if (isSpecialZnode(l1Node)) {
+ cb1.processResult(successRc, null, context);
+ return;
+ }
+ final String l1NodePath = ledgerRootPath + "/" + l1Node;
+ // process level1 path, after all children of level1 process
+ // it callback to continue processing next level1 node
+ asyncProcessLevelNodes(l1NodePath, new Processor<String>() {
+ @Override
+ public void process(String l2Node, AsyncCallback.VoidCallback cb2) {
+ // process level1/level2 path
+ String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node;
+ // process each ledger
+ // after all ledger are processed, cb2 will be call to continue processing next level2 node
+ asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2,
+ context, successRc, failureRc);
+ }
+ }, cb1, context, successRc, failureRc);
+ }
+ }, finalCb, context, successRc, failureRc);
+ }
+
+ protected static boolean isSpecialZnode(String znode) {
+ return IDGEN_ZNODE.equals(znode) || LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode) || AbstractHierarchicalLedgerManager.isSpecialZnode(znode);
+ }
+
+ @Override
+ public LedgerRangeIterator getLedgerRanges() {
+ return new HierarchicalLedgerRangeIterator();
+ }
+
+ /**
+ * Iterator through each metadata bucket with hierarchical mode
+ */
+ private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
+ private Iterator<String> l1NodesIter = null;
+ private Iterator<String> l2NodesIter = null;
+ private String curL1Nodes = "";
+ private boolean iteratorDone = false;
+ private LedgerRange nextRange = null;
+
+ /**
+ * iterate next level1 znode
+ *
+ * @return false if have visited all level1 nodes
+ * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
+ */
+ private boolean nextL1Node() throws KeeperException, InterruptedException {
+ l2NodesIter = null;
+ while (l2NodesIter == null) {
+ if (l1NodesIter.hasNext()) {
+ curL1Nodes = l1NodesIter.next();
+ } else {
+ return false;
+ }
+ // Top level nodes are always exactly 2 digits long. (Don't pick up long hierarchical top level nodes)
+ if (isSpecialZnode(curL1Nodes) || curL1Nodes.length() > 2) {
+ continue;
+ }
+ List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
+ Collections.sort(l2Nodes);
+ l2NodesIter = l2Nodes.iterator();
+ if (!l2NodesIter.hasNext()) {
+ l2NodesIter = null;
+ continue;
+ }
+ }
+ return true;
+ }
+
+ synchronized private void preload() throws IOException {
+ while (nextRange == null && !iteratorDone) {
+ boolean hasMoreElements = false;
+ try {
+ if (l1NodesIter == null) {
+ List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
+ Collections.sort(l1Nodes);
+ l1NodesIter = l1Nodes.iterator();
+ hasMoreElements = nextL1Node();
+ } else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
+ hasMoreElements = nextL1Node();
+ } else {
+ hasMoreElements = true;
+ }
+ } catch (KeeperException ke) {
+ throw new IOException("Error preloading next range", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while preloading", ie);
+ }
+ if (hasMoreElements) {
+ nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
+ if (nextRange.size() == 0) {
+ nextRange = null;
+ }
+ } else {
+ iteratorDone = true;
+ }
+ }
+ }
+
+ @Override
+ synchronized public boolean hasNext() throws IOException {
+ preload();
+ return nextRange != null && !iteratorDone;
+ }
+
+ @Override
+ synchronized public LedgerRange next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ LedgerRange r = nextRange;
+ nextRange = null;
+ return r;
+ }
+
+ /**
+ * Get a single node level1/level2
+ *
+ * @param level1
+ * 1st level node name
+ * @param level2
+ * 2nd level node name
+ * @throws IOException
+ */
+ LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
+ throws IOException {
+ StringBuilder nodeBuilder = threadLocalNodeBuilder.get();
+ nodeBuilder.setLength(0);
+ nodeBuilder.append(ledgerRootPath).append("/")
+ .append(level1).append("/").append(level2);
+ String nodePath = nodeBuilder.toString();
+ List<String> ledgerNodes = null;
+ try {
+ ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+ } catch (InterruptedException e) {
+ throw new IOException("Error when get child nodes from zk", e);
+ }
+ NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("All active ledgers from ZK for hash node "
+ + level1 + "/" + level2 + " : " + zkActiveLedgers);
+ }
+
+ return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true,
+ getEndLedgerIdByLevel(level1, level2), true));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
new file mode 100644
index 0000000..1ac4038
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -0,0 +1,100 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Hierarchical Ledger Manager Factory
+ */
+public class LegacyHierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+
+ public static final String NAME = "legacyhierarchical";
+ public static final int CUR_VERSION = 1;
+
+ AbstractConfiguration conf;
+ ZooKeeper zk;
+
+ @Override
+ public int getCurrentVersion() {
+ return CUR_VERSION;
+ }
+
+ @Override
+ public LedgerManagerFactory initialize(final AbstractConfiguration conf,
+ final ZooKeeper zk,
+ final int factoryVersion)
+ throws IOException {
+ if (CUR_VERSION != factoryVersion) {
+ throw new IOException("Incompatible layout version found : "
+ + factoryVersion);
+ }
+ this.conf = conf;
+ this.zk = zk;
+ return this;
+ }
+
+ @Override
+ public void uninitialize() throws IOException {
+ // since zookeeper instance is passed from outside
+ // we don't need to close it here
+ }
+
+ @Override
+ public LedgerIdGenerator newLedgerIdGenerator() {
+ List<ACL> zkAcls = ZkUtils.getACLs(conf);
+ return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
+ }
+
+ @Override
+ public LedgerManager newLedgerManager() {
+ return new HierarchicalLedgerManager(conf, zk);
+ }
+
+ @Override
+ public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
+ throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
+ return new ZkLedgerUnderreplicationManager(conf, zk);
+ }
+
+ @Override
+ public void format(AbstractConfiguration conf, ZooKeeper zk)
+ throws InterruptedException, KeeperException, IOException {
+ String ledgersRootPath = conf.getZkLedgersRootPath();
+ List<String> children = zk.getChildren(ledgersRootPath, false);
+ for (String child : children) {
+ if (HierarchicalLedgerManager.isSpecialZnode(child)) {
+ continue;
+ }
+ ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
+ }
+ // Delete and recreate the LAYOUT information.
+ super.format(conf, zk);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index 990297f..f8f7546 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
+import java.util.TreeSet;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -37,33 +38,36 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 4-level hierarchical znodes.
+ * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 5-level hierarchical znodes.
*
* <p>
* LongHierarchicalLedgerManager splits the generated id into 5 parts (3-4-4-4-4):
*
* <pre>
- * <level1 (3 digits)><level2 (4 digits)><level3 (4 digits)><level4 (4 digits)>
- * <level5 (4 digits)>
+ * <level0 (3 digits)><level1 (4 digits)><level2 (4 digits)><level3 (4 digits)>
+ * <level4 (4 digits)>
* </pre>
*
* These 5 parts are used to form the actual ledger node path used to store ledger metadata:
*
* <pre>
- * (ledgersRootPath) / level1 / level2 / level3 / level4 / L(level5)
+ * (ledgersRootPath) / level0 / level1 / level2 / level3 / L(level4)
* </pre>
*
* E.g Ledger 0000000000000000001 is split into 5 parts <i>000</i>, <i>0000</i>, <i>0000</i>, <i>0000</i>, <i>0001</i>,
* which is stored in <i>(ledgersRootPath)/000/0000/0000/0000/L0001</i>. So each znode could have at most 10000 ledgers,
* which avoids errors during garbage collection due to lists of children that are too long.
+ *
*/
-class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
+class LongHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
static final Logger LOG = LoggerFactory.getLogger(LongHierarchicalLedgerManager.class);
+ static final String IDGEN_ZNODE = "idgen-long";
private static final String MAX_ID_SUFFIX = "9999";
private static final String MIN_ID_SUFFIX = "0000";
+
/**
* Constructor
*
@@ -77,11 +81,6 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
}
@Override
- public String getLedgerPath(long ledgerId) {
- return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId);
- }
-
- @Override
public long getLedgerId(String pathName) throws IOException {
if (!pathName.startsWith(ledgerRootPath)) {
throw new IOException("it is not a valid hashed path name : " + pathName);
@@ -89,54 +88,68 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
}
+
+ @Override
+ public String getLedgerPath(long ledgerId) {
+ return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId);
+ }
//
// Active Ledger Manager
//
/**
- * Get the smallest cache id in a specified node /level1/level2/level3/level4
+ * Get the smallest cache id in a specified node /level0/level1/level2/level3
*
- * @param level1
+ * @param level0
* 1st level node name
- * @param level2
+ * @param level1
* 2nd level node name
- * @param level3
+ * @param level2
* 3rd level node name
- * @param level4
+ * @param level3
* 4th level node name
* @return the smallest ledger id
*/
- private long getStartLedgerIdByLevel(String level1, String level2, String level3, String level4)
+ private long getStartLedgerIdByLevel(String level0, String level1, String level2, String level3)
throws IOException {
- return getLedgerId(level1, level2, level3, level4, MIN_ID_SUFFIX);
+ return getLedgerId(level0, level1, level2, level3, MIN_ID_SUFFIX);
}
/**
- * Get the largest cache id in a specified node /level1/level2/level3/level4
+ * Get the largest cache id in a specified node /level0/level1/level2/level3
*
- * @param level1
+ * @param level0
* 1st level node name
- * @param level2
+ * @param level1
* 2nd level node name
- * @param level3
+ * @param level2
* 3rd level node name
- * @param level4
+ * @param level3
* 4th level node name
* @return the largest ledger id
*/
- private long getEndLedgerIdByLevel(String level1, String level2, String level3, String level4) throws IOException {
- return getLedgerId(level1, level2, level3, level4, MAX_ID_SUFFIX);
+ private long getEndLedgerIdByLevel(String level0, String level1, String level2, String level3) throws IOException {
+ return getLedgerId(level0, level1, level2, level3, MAX_ID_SUFFIX);
}
@Override
public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCallback.VoidCallback finalCb,
final Object context, final int successRc, final int failureRc) {
+
+ // If it succeeds, proceed with our own recursive ledger processing for the 63-bit id ledgers
asyncProcessLevelNodes(ledgerRootPath,
new RecursiveProcessor(0, ledgerRootPath, processor, context, successRc, failureRc), finalCb, context,
successRc, failureRc);
}
+ protected static boolean isSpecialZnode(String znode) {
+ // Check nextnode length. All paths in long hierarchical format (3-4-4-4-4)
+ // are at least 3 characters long. This prevents picking up any old-style
+ // hierarchical paths (2-4-4)
+ return LegacyHierarchicalLedgerManager.isSpecialZnode(znode) || znode.length() < 3;
+ }
+
private class RecursiveProcessor implements Processor<String> {
private final int level;
private final String path;
@@ -167,7 +180,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
context, successRc, failureRc);
} else {
// process each ledger after all ledger are processed, cb will be call to continue processing next
- // level5 node
+ // level4 node
asyncProcessLedgersInSingleNode(nodePath, processor, cb, context, successRc, failureRc);
}
}
@@ -194,7 +207,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
curLevelNodes = new ArrayList<String>(Collections.nCopies(4, (String) null));
}
- private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException {
+ synchronized private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException {
List<String> levelNodes = zk.getChildren(path, null);
Collections.sort(levelNodes);
if (level == 0) {
@@ -217,6 +230,9 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
}
String curLNode = curLevelNodes.get(level);
if (curLNode != null) {
+ // Traverse down through levels 0-3
+ // The nextRange becomes a listing of the children
+ // in the level4 directory.
if (level != 3) {
String nextLevelPath = path + "/" + curLNode;
initialize(nextLevelPath, level + 1);
@@ -229,7 +245,13 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
}
}
- private boolean moveToNext(int level) throws KeeperException, InterruptedException {
+ private void clearHigherLevels(int level) {
+ for(int i = level+1; i < 4; i++) {
+ curLevelNodes.set(i, null);
+ }
+ }
+
+ synchronized private boolean moveToNext(int level) throws KeeperException, InterruptedException {
Iterator<String> curLevelNodesIter = levelNodesIter.get(level);
boolean movedToNextNode = false;
if (level == 0) {
@@ -239,6 +261,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
continue;
} else {
curLevelNodes.set(level, nextNode);
+ clearHigherLevels(level);
movedToNextNode = true;
break;
}
@@ -247,6 +270,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
if (curLevelNodesIter.hasNext()) {
String nextNode = curLevelNodesIter.next();
curLevelNodes.set(level, nextNode);
+ clearHigherLevels(level);
movedToNextNode = true;
} else {
movedToNextNode = moveToNext(level - 1);
@@ -261,6 +285,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
levelNodesIter.set(level, newCurLevelNodesIter);
if (newCurLevelNodesIter.hasNext()) {
curLevelNodes.set(level, newCurLevelNodesIter.next());
+ clearHigherLevels(level);
movedToNextNode = true;
}
}
@@ -306,15 +331,15 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
return r;
}
- LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException {
- String level1 = curLevelNodes.get(0);
- String level2 = curLevelNodes.get(1);
- String level3 = curLevelNodes.get(2);
- String level4 = curLevelNodes.get(3);
+ private LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException {
+ String level0 = curLevelNodes.get(0);
+ String level1 = curLevelNodes.get(1);
+ String level2 = curLevelNodes.get(2);
+ String level3 = curLevelNodes.get(3);
StringBuilder nodeBuilder = new StringBuilder();
- nodeBuilder.append(ledgerRootPath).append("/").append(level1).append("/").append(level2).append("/")
- .append(level3).append("/").append(level4);
+ nodeBuilder.append(ledgerRootPath).append("/").append(level0).append("/").append(level1).append("/")
+ .append(level2).append("/").append(level3);
String nodePath = nodeBuilder.toString();
List<String> ledgerNodes = null;
try {
@@ -324,11 +349,11 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
}
NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
if (LOG.isDebugEnabled()) {
- LOG.debug("All active ledgers from ZK for hash node " + level1 + "/" + level2 + "/" + level3 + "/"
- + level4 + " : " + zkActiveLedgers);
+ LOG.debug("All active ledgers from ZK for hash node " + level0 + "/" + level1 + "/" + level2 + "/"
+ + level3 + " : " + zkActiveLedgers);
}
- return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2, level3, level4), true,
- getEndLedgerIdByLevel(level1, level2, level3, level4), true));
+ return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level0, level1, level2, level3), true,
+ getEndLedgerIdByLevel(level0, level1, level2, level3), true));
}
}
}