You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2017/05/02 21:05:08 UTC
[2/2] bookkeeper git commit: BOOKKEEPER-552: 64 Bits Ledger ID
Generation
BOOKKEEPER-552: 64 Bits Ledger ID Generation
This PR supersedes #112
Instead of moving LongHierarchicalLedgerManager to HierarchicalLedgerManager, LongHierarchicalLedgerManager is still a stand-alone manager. HierarchicalLedgerManager has been moved to LegacyHierarchicalLedgerManager, and a new HierarchicalLedgerManager class has been put in its place, which is backwards-compatible with the original (now legacy) HierarchicalLedgerManager.
This new HierarchicalLedgerManager leverages the new LongZkLedgerIdGenerator to generate Ids, and uses the LongHierarchicalLedgerManager to manage metadata for ledger IDs > 31 bits long. For shorter ledger ids, the LegacyHierarchicalLedgerManager is used, to keep backwards-compatibility.
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Matteo Merli <mm...@apache.org>, Venkateswararao Jujjuri (JV) <None>
Closes #114 from knusbaum/BOOKKEEPER-552
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/057af8db
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/057af8db
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/057af8db
Branch: refs/heads/master
Commit: 057af8dbce6c08794eb8b46ca52ca13f222d9bbb
Parents: 9c79e07
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Tue May 2 14:04:57 2017 -0700
Committer: Matteo Merli <mm...@apache.org>
Committed: Tue May 2 14:04:57 2017 -0700
----------------------------------------------------------------------
.../apache/bookkeeper/client/BKException.java | 12 +
.../bookkeeper/conf/AbstractConfiguration.java | 3 +
.../meta/AbstractHierarchicalLedgerManager.java | 213 +++++++++++
.../meta/AbstractZkLedgerManager.java | 3 +-
.../bookkeeper/meta/FlatLedgerManager.java | 6 +-
.../meta/HierarchicalLedgerManager.java | 375 +++----------------
.../meta/HierarchicalLedgerManagerFactory.java | 80 +---
.../bookkeeper/meta/LedgerManagerFactory.java | 5 +-
.../meta/LegacyHierarchicalLedgerManager.java | 281 ++++++++++++++
.../LegacyHierarchicalLedgerManagerFactory.java | 100 +++++
.../meta/LongHierarchicalLedgerManager.java | 101 +++--
.../meta/LongZkLedgerIdGenerator.java | 333 ++++++++++++++++
.../bookkeeper/meta/ZkLedgerIdGenerator.java | 27 +-
.../org/apache/bookkeeper/util/StringUtils.java | 11 +-
.../bookkeeper/client/BookieRecoveryTest.java | 6 +-
.../meta/TestLongZkLedgerIdGenerator.java | 145 +++++++
.../MultiLedgerManagerMultiDigestTestCase.java | 1 +
.../test/MultiLedgerManagerTestCase.java | 1 +
.../bookkeeper/test/TestBackwardCompat.java | 53 +++
19 files changed, 1310 insertions(+), 446 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 2377c1c..aa3ec08 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -102,6 +102,8 @@ public abstract class BKException extends Exception {
return new BKDuplicateEntryIdException();
case Code.TimeoutException:
return new BKTimeoutException();
+ case Code.LedgerIdOverflowException:
+ return new BKLedgerIdOverflowException();
default:
return new BKUnexpectedConditionException();
}
@@ -142,6 +144,8 @@ public abstract class BKException extends Exception {
int UnauthorizedAccessException = -102;
int UnclosedFragmentException = -103;
int WriteOnReadOnlyBookieException = -104;
+ //-105 reserved for TooManyRequestsException
+ int LedgerIdOverflowException = -106;
// generic exception code used to propagate in replication pipeline
int ReplicationException = -200;
@@ -210,6 +214,8 @@ public abstract class BKException extends Exception {
return "Attempting to use an unclosed fragment; This is not safe";
case Code.WriteOnReadOnlyBookieException:
return "Attempting to write on ReadOnly bookie";
+ case Code.LedgerIdOverflowException:
+ return "Next ledgerID is too large.";
case Code.ReplicationException:
return "Errors in replication pipeline";
case Code.ClientClosedException:
@@ -404,4 +410,10 @@ public abstract class BKException extends Exception {
super(Code.TimeoutException);
}
}
+
+ public static class BKLedgerIdOverflowException extends BKException {
+ public BKLedgerIdOverflowException() {
+ super(Code.LedgerIdOverflowException);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 07e5d08..c7c50cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -76,6 +76,9 @@ public abstract class AbstractConfiguration extends CompositeConfiguration {
// Zookeeper ACL settings
protected final static String ZK_ENABLE_SECURITY = "zkEnableSecurity";
+ // Kluge for compatibility testing. Never set this outside tests.
+ public final static String LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK = "ledgerManagerFactoryDisableClassCheck";
+
protected AbstractConfiguration() {
super();
if (READ_SYSTEM_PROPERTIES) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
new file mode 100644
index 0000000..02359e0
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractHierarchicalLedgerManager extends AbstractZkLedgerManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractHierarchicalLedgerManager.class);
+
+ /**
+ * Constructor
+ *
+ * @param conf
+ * Configuration object
+ * @param zk
+ * ZooKeeper Client Handle
+ */
+ public AbstractHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+ super(conf, zk);
+ }
+
+ /**
+ * Process hash nodes in a given path
+ */
+ void asyncProcessLevelNodes(
+ final String path, final Processor<String> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object context,
+ final int successRc, final int failureRc) {
+ zk.sync(path, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (rc != Code.OK.intValue()) {
+ LOG.error("Error syncing path " + path + " when getting its chidren: ",
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+
+ zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx,
+ List<String> levelNodes) {
+ if (rc != Code.OK.intValue()) {
+ LOG.error("Error polling hash nodes of " + path,
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+ AsyncListProcessor<String> listProcessor =
+ new AsyncListProcessor<String>(scheduler);
+ // process its children
+ listProcessor.process(levelNodes, processor, finalCb,
+ context, successRc, failureRc);
+ }
+ }, null);
+ }
+ }, null);
+ }
+
+ /**
+ * Process list one by one in asynchronize way. Process will be stopped immediately
+ * when error occurred.
+ */
+ private static class AsyncListProcessor<T> {
+ // use this to prevent long stack chains from building up in callbacks
+ ScheduledExecutorService scheduler;
+
+ /**
+ * Constructor
+ *
+ * @param scheduler
+ * Executor used to prevent long stack chains
+ */
+ public AsyncListProcessor(ScheduledExecutorService scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ /**
+ * Process list of items
+ *
+ * @param data
+ * List of data to process
+ * @param processor
+ * Callback to process element of list when success
+ * @param finalCb
+ * Final callback to be called after all elements in the list are processed
+ * @param context
+ * Context of final callback
+ * @param successRc
+ * RC passed to final callback on success
+ * @param failureRc
+ * RC passed to final callback on failure
+ */
+ public void process(final List<T> data, final Processor<T> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object context,
+ final int successRc, final int failureRc) {
+ if (data == null || data.size() == 0) {
+ finalCb.processResult(successRc, null, context);
+ return;
+ }
+ final int size = data.size();
+ final AtomicInteger current = new AtomicInteger(0);
+ AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (rc != successRc) {
+ // terminal immediately
+ finalCb.processResult(failureRc, null, context);
+ return;
+ }
+ // process next element
+ int next = current.incrementAndGet();
+ if (next >= size) { // reach the end of list
+ finalCb.processResult(successRc, null, context);
+ return;
+ }
+ final T dataToProcess = data.get(next);
+ final AsyncCallback.VoidCallback stub = this;
+ scheduler.submit(new Runnable() {
+ @Override
+ public final void run() {
+ processor.process(dataToProcess, stub);
+ }
+ });
+ }
+ };
+ T firstElement = data.get(0);
+ processor.process(firstElement, stubCallback);
+ }
+ }
+
+ // get ledger from all level nodes
+ long getLedgerId(String...levelNodes) throws IOException {
+ return StringUtils.stringToHierarchicalLedgerId(levelNodes);
+ }
+
+ /**
+ * Get all ledger ids in the given zk path.
+ *
+ * @param ledgerNodes
+ * List of ledgers in the given path
+ * example:- {L1652, L1653, L1650}
+ * @param path
+ * The zookeeper path of the ledger ids. The path should start with {@ledgerRootPath}
+ * example (with ledgerRootPath = /ledgers):- /ledgers/00/0053
+ */
+ @Override
+ protected NavigableSet<Long> ledgerListToSet(List<String> ledgerNodes, String path) {
+ NavigableSet<Long> zkActiveLedgers = new TreeSet<Long>();
+
+ if (!path.startsWith(ledgerRootPath)) {
+ LOG.warn("Ledger path [{}] is not a valid path name, it should start wth {}", path, ledgerRootPath);
+ return zkActiveLedgers;
+ }
+
+ long ledgerIdPrefix = 0;
+ char ch;
+ for (int i = ledgerRootPath.length() + 1; i < path.length(); i++) {
+ ch = path.charAt(i);
+ if (ch < '0' || ch > '9')
+ continue;
+ ledgerIdPrefix = ledgerIdPrefix * 10 + (ch - '0');
+ }
+
+ for (String ledgerNode : ledgerNodes) {
+ if (isSpecialZnode(ledgerNode)) {
+ continue;
+ }
+ long ledgerId = ledgerIdPrefix;
+ for (int i = 0; i < ledgerNode.length(); i++) {
+ ch = ledgerNode.charAt(i);
+ if (ch < '0' || ch > '9')
+ continue;
+ ledgerId = ledgerId * 10 + (ch - '0');
+ }
+ zkActiveLedgers.add(ledgerId);
+ }
+ return zkActiveLedgers;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 6db3375..f5a60f6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataLis
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.AsyncCallback;
@@ -468,7 +469,7 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
* Znode Name
* @return true if the znode is a special znode otherwise false
*/
- protected boolean isSpecialZnode(String znode) {
+ protected static boolean isSpecialZnode(String znode) {
if (BookKeeperConstants.AVAILABLE_NODE.equals(znode)
|| BookKeeperConstants.COOKIE_NODE.equals(znode)
|| BookKeeperConstants.LAYOUT_ZNODE.equals(znode)
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 3172247..36db62a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -87,9 +87,9 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, ctx, successRc, failureRc);
}
- @Override
- protected boolean isSpecialZnode(String znode) {
- return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || super.isSpecialZnode(znode);
+
+ protected static boolean isSpecialZnode(String znode) {
+ return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || AbstractZkLedgerManager.isSpecialZnode(znode);
}
@Override
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index bed1627..309762b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -18,370 +18,105 @@
package org.apache.bookkeeper.meta;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.ZooKeeper;
/**
- * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
+ * HierarchicalLedgerManager makes use of both LongHierarchicalLedgerManager and LegacyHierarchicalLedgerManager
+ * to extend the 31-bit ledger id range of the LegacyHierarchicalLedgerManager to that of the LongHierarchicalLedgerManager
+ * while remaining backwards-compatible with the legacy manager.
+ *
+ * In order to achieve backwards-compatibility, the HierarchicalLedgerManager forwards requests relating to ledger IDs which
+ * are < Integer.MAX_INT to the LegacyHierarchicalLedgerManager. The new 5-part directory structure will not appear until a
+ * ledger with an ID >= Integer.MAX_INT is created.
*
- * <p>
- * HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
- * <pre><level1 (2 digits)><level2 (4 digits)><level3 (4 digits)></pre>
- * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
- * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
- * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in
- * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
- * errors during garbage collection due to lists of children that are too long.
+ * @see LongHierarchicalLedgerManager
+ * @see LegacyHierarchicalLedgerManager
*/
-class HierarchicalLedgerManager extends AbstractZkLedgerManager {
-
+class HierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class);
- static final String IDGEN_ZNODE = "idgen";
- private static final String MAX_ID_SUFFIX = "9999";
- private static final String MIN_ID_SUFFIX = "0000";
+ LegacyHierarchicalLedgerManager legacyLM;
+ LongHierarchicalLedgerManager longLM;
- /**
- * Constructor
- *
- * @param conf
- * Configuration object
- * @param zk
- * ZooKeeper Client Handle
- */
public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
super(conf, zk);
+ legacyLM = new LegacyHierarchicalLedgerManager(conf, zk);
+ longLM = new LongHierarchicalLedgerManager (conf, zk);
}
@Override
- public String getLedgerPath(long ledgerId) {
- return ledgerRootPath + StringUtils.getHierarchicalLedgerPath(ledgerId);
- }
-
- @Override
- public long getLedgerId(String pathName) throws IOException {
- if (!pathName.startsWith(ledgerRootPath)) {
- throw new IOException("it is not a valid hashed path name : " + pathName);
- }
- String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
- return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
- }
-
- // get ledger from all level nodes
- long getLedgerId(String...levelNodes) throws IOException {
- return StringUtils.stringToHierarchicalLedgerId(levelNodes);
- }
-
- //
- // Active Ledger Manager
- //
-
- /**
- * Get the smallest cache id in a specified node /level1/level2
- *
- * @param level1
- * 1st level node name
- * @param level2
- * 2nd level node name
- * @return the smallest ledger id
- */
- private long getStartLedgerIdByLevel(String level1, String level2) throws IOException {
- return getLedgerId(level1, level2, MIN_ID_SUFFIX);
- }
-
- /**
- * Get the largest cache id in a specified node /level1/level2
- *
- * @param level1
- * 1st level node name
- * @param level2
- * 2nd level node name
- * @return the largest ledger id
- */
- private long getEndLedgerIdByLevel(String level1, String level2) throws IOException {
- return getLedgerId(level1, level2, MAX_ID_SUFFIX);
- }
-
- @Override
- public void asyncProcessLedgers(final Processor<Long> processor,
- final AsyncCallback.VoidCallback finalCb, final Object context,
- final int successRc, final int failureRc) {
- // process 1st level nodes
- asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
- @Override
- public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) {
- if (isSpecialZnode(l1Node)) {
- cb1.processResult(successRc, null, context);
- return;
- }
- final String l1NodePath = ledgerRootPath + "/" + l1Node;
- // process level1 path, after all children of level1 process
- // it callback to continue processing next level1 node
- asyncProcessLevelNodes(l1NodePath, new Processor<String>() {
- @Override
- public void process(String l2Node, AsyncCallback.VoidCallback cb2) {
- // process level1/level2 path
- String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node;
- // process each ledger
- // after all ledger are processed, cb2 will be call to continue processing next level2 node
- asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2,
- context, successRc, failureRc);
- }
- }, cb1, context, successRc, failureRc);
- }
- }, finalCb, context, successRc, failureRc);
- }
+ public void asyncProcessLedgers(Processor<Long> processor, VoidCallback finalCb, Object context, int successRc,
+ int failureRc) {
+ // Process the old 31-bit id ledgers first.
+ legacyLM.asyncProcessLedgers(processor, new VoidCallback(){
- /**
- * Process hash nodes in a given path
- */
- void asyncProcessLevelNodes(
- final String path, final Processor<String> processor,
- final AsyncCallback.VoidCallback finalCb, final Object context,
- final int successRc, final int failureRc) {
- zk.sync(path, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
- if (rc != Code.OK.intValue()) {
- LOG.error("Error syncing path " + path + " when getting its chidren: ",
- KeeperException.create(KeeperException.Code.get(rc), path));
- finalCb.processResult(failureRc, null, context);
- return;
+ if(rc == failureRc) {
+ // If it fails, return the failure code to the callback
+ finalCb.processResult(rc, path, ctx);
+ }
+ else {
+ // If it succeeds, proceed with our own recursive ledger processing for the 63-bit id ledgers
+ longLM.asyncProcessLedgers(processor, finalCb, context, successRc, failureRc);
}
-
- zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx,
- List<String> levelNodes) {
- if (rc != Code.OK.intValue()) {
- LOG.error("Error polling hash nodes of " + path,
- KeeperException.create(KeeperException.Code.get(rc), path));
- finalCb.processResult(failureRc, null, context);
- return;
- }
- AsyncListProcessor<String> listProcessor =
- new AsyncListProcessor<String>(scheduler);
- // process its children
- listProcessor.process(levelNodes, processor, finalCb,
- context, successRc, failureRc);
- }
- }, null);
}
- }, null);
- }
-
- /**
- * Process list one by one in asynchronize way. Process will be stopped immediately
- * when error occurred.
- */
- private static class AsyncListProcessor<T> {
- // use this to prevent long stack chains from building up in callbacks
- ScheduledExecutorService scheduler;
- /**
- * Constructor
- *
- * @param scheduler
- * Executor used to prevent long stack chains
- */
- public AsyncListProcessor(ScheduledExecutorService scheduler) {
- this.scheduler = scheduler;
- }
+ }, context, successRc, failureRc);
+ }
- /**
- * Process list of items
- *
- * @param data
- * List of data to process
- * @param processor
- * Callback to process element of list when success
- * @param finalCb
- * Final callback to be called after all elements in the list are processed
- * @param context
- * Context of final callback
- * @param successRc
- * RC passed to final callback on success
- * @param failureRc
- * RC passed to final callback on failure
- */
- public void process(final List<T> data, final Processor<T> processor,
- final AsyncCallback.VoidCallback finalCb, final Object context,
- final int successRc, final int failureRc) {
- if (data == null || data.size() == 0) {
- finalCb.processResult(successRc, null, context);
- return;
- }
- final int size = data.size();
- final AtomicInteger current = new AtomicInteger(0);
- AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (rc != successRc) {
- // terminal immediately
- finalCb.processResult(failureRc, null, context);
- return;
- }
- // process next element
- int next = current.incrementAndGet();
- if (next >= size) { // reach the end of list
- finalCb.processResult(successRc, null, context);
- return;
- }
- final T dataToProcess = data.get(next);
- final AsyncCallback.VoidCallback stub = this;
- scheduler.submit(new Runnable() {
- @Override
- public final void run() {
- processor.process(dataToProcess, stub);
- }
- });
- }
- };
- T firstElement = data.get(0);
- processor.process(firstElement, stubCallback);
- }
+ @Override
+ protected String getLedgerPath(long ledgerId) {
+ return ledgerRootPath + StringUtils.getHybridHierarchicalLedgerPath(ledgerId);
}
@Override
- protected boolean isSpecialZnode(String znode) {
- return IDGEN_ZNODE.equals(znode) || super.isSpecialZnode(znode);
+ protected long getLedgerId(String ledgerPath) throws IOException {
+ // TODO Auto-generated method stub
+ if (!ledgerPath.startsWith(ledgerRootPath)) {
+ throw new IOException("it is not a valid hashed path name : " + ledgerPath);
+ }
+ String hierarchicalPath = ledgerPath.substring(ledgerRootPath.length() + 1);
+ return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
}
@Override
public LedgerRangeIterator getLedgerRanges() {
- return new HierarchicalLedgerRangeIterator();
+ LedgerRangeIterator legacyLedgerRangeIterator = legacyLM.getLedgerRanges();
+ LedgerRangeIterator longLedgerRangeIterator = longLM.getLedgerRanges();
+ return new HierarchicalLedgerRangeIterator(legacyLedgerRangeIterator, longLedgerRangeIterator);
}
- /**
- * Iterator through each metadata bucket with hierarchical mode
- */
- private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
- private Iterator<String> l1NodesIter = null;
- private Iterator<String> l2NodesIter = null;
- private String curL1Nodes = "";
- private boolean iteratorDone = false;
- private LedgerRange nextRange = null;
+ private static class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
- /**
- * iterate next level1 znode
- *
- * @return false if have visited all level1 nodes
- * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
- */
- private boolean nextL1Node() throws KeeperException, InterruptedException {
- l2NodesIter = null;
- while (l2NodesIter == null) {
- if (l1NodesIter.hasNext()) {
- curL1Nodes = l1NodesIter.next();
- } else {
- return false;
- }
- if (isSpecialZnode(curL1Nodes)) {
- continue;
- }
- List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
- Collections.sort(l2Nodes);
- l2NodesIter = l2Nodes.iterator();
- if (!l2NodesIter.hasNext()) {
- l2NodesIter = null;
- continue;
- }
- }
- return true;
- }
+ LedgerRangeIterator legacyLedgerRangeIterator;
+ LedgerRangeIterator longLedgerRangeIterator;
- synchronized private void preload() throws IOException {
- while (nextRange == null && !iteratorDone) {
- boolean hasMoreElements = false;
- try {
- if (l1NodesIter == null) {
- List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
- Collections.sort(l1Nodes);
- l1NodesIter = l1Nodes.iterator();
- hasMoreElements = nextL1Node();
- } else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
- hasMoreElements = nextL1Node();
- } else {
- hasMoreElements = true;
- }
- } catch (KeeperException ke) {
- throw new IOException("Error preloading next range", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while preloading", ie);
- }
- if (hasMoreElements) {
- nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
- if (nextRange.size() == 0) {
- nextRange = null;
- }
- } else {
- iteratorDone = true;
- }
- }
+ HierarchicalLedgerRangeIterator(LedgerRangeIterator legacyLedgerRangeIterator, LedgerRangeIterator longLedgerRangeIterator) {
+ this.legacyLedgerRangeIterator = legacyLedgerRangeIterator;
+ this.longLedgerRangeIterator = longLedgerRangeIterator;
}
@Override
- synchronized public boolean hasNext() throws IOException {
- preload();
- return nextRange != null && !iteratorDone;
+ public boolean hasNext() throws IOException {
+ return legacyLedgerRangeIterator.hasNext() || longLedgerRangeIterator.hasNext();
}
@Override
- synchronized public LedgerRange next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ public LedgerRange next() throws IOException {
+ if(legacyLedgerRangeIterator.hasNext()) {
+ return legacyLedgerRangeIterator.next();
}
- LedgerRange r = nextRange;
- nextRange = null;
- return r;
+ return longLedgerRangeIterator.next();
}
- /**
- * Get a single node level1/level2
- *
- * @param level1
- * 1st level node name
- * @param level2
- * 2nd level node name
- * @throws IOException
- */
- LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
- throws IOException {
- StringBuilder nodeBuilder = new StringBuilder();
- nodeBuilder.append(ledgerRootPath).append("/")
- .append(level1).append("/").append(level2);
- String nodePath = nodeBuilder.toString();
- List<String> ledgerNodes = null;
- try {
- ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
- } catch (InterruptedException e) {
- throw new IOException("Error when get child nodes from zk", e);
- }
- NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
- if (LOG.isDebugEnabled()) {
- LOG.debug("All active ledgers from ZK for hash node "
- + level1 + "/" + level2 + " : " + zkActiveLedgers);
- }
-
- return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true,
- getEndLedgerIdByLevel(level1, level2), true));
- }
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index 084d73d..a74a633 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.meta;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,89 +15,25 @@ package org.apache.bookkeeper.meta;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.bookkeeper.meta;
-import java.io.IOException;
import java.util.List;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.bookkeeper.replication.ReplicationException;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
/**
- * Hierarchical Ledger Manager Factory
+ * Legacy Hierarchical Ledger Manager Factory
*/
-public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+public class HierarchicalLedgerManagerFactory extends LegacyHierarchicalLedgerManagerFactory {
public static final String NAME = "hierarchical";
- public static final int CUR_VERSION = 1;
-
- AbstractConfiguration conf;
- ZooKeeper zk;
-
- @Override
- public int getCurrentVersion() {
- return CUR_VERSION;
- }
-
- @Override
- public LedgerManagerFactory initialize(final AbstractConfiguration conf,
- final ZooKeeper zk,
- final int factoryVersion)
- throws IOException {
- if (CUR_VERSION != factoryVersion) {
- throw new IOException("Incompatible layout version found : "
- + factoryVersion);
- }
- this.conf = conf;
- this.zk = zk;
- return this;
- }
-
- @Override
- public void uninitialize() throws IOException {
- // since zookeeper instance is passed from outside
- // we don't need to close it here
- }
-
+
@Override
public LedgerIdGenerator newLedgerIdGenerator() {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
- return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), HierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
- }
-
- @Override
- public LedgerManager newLedgerManager() {
- return new HierarchicalLedgerManager(conf, zk);
- }
-
- @Override
- public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
- throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
- return new ZkLedgerUnderreplicationManager(conf, zk);
- }
-
- @Override
- public void format(AbstractConfiguration conf, ZooKeeper zk)
- throws InterruptedException, KeeperException, IOException {
- HierarchicalLedgerManager ledgerManager = (HierarchicalLedgerManager) newLedgerManager();
- try {
- String ledgersRootPath = conf.getZkLedgersRootPath();
- List<String> children = zk.getChildren(ledgersRootPath, false);
- for (String child : children) {
- if (ledgerManager.isSpecialZnode(child)) {
- continue;
- }
- ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
- }
- } finally {
- ledgerManager.close();
- }
- // Delete and recreate the LAYOUT information.
- super.format(conf, zk);
+ ZkLedgerIdGenerator subIdGenerator = new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
+ return new LongZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LongHierarchicalLedgerManager.IDGEN_ZNODE, subIdGenerator, zkAcls);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 76d1572..e7cfc4c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -164,7 +164,8 @@ public abstract class LedgerManagerFactory {
// handle V2 layout case
if (factoryClass != null &&
- !layout.getManagerFactoryClass().equals(factoryClass.getName())) {
+ !layout.getManagerFactoryClass().equals(factoryClass.getName()) &&
+ conf.getProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK) == null) { // Disable should ONLY happen during compatibility testing.
throw new IOException("Configured layout " + factoryClass.getName()
+ " does not match existing layout " + layout.getManagerFactoryClass());
@@ -210,6 +211,8 @@ public abstract class LedgerManagerFactory {
factoryClass = FlatLedgerManagerFactory.class;
} else if (HierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
factoryClass = HierarchicalLedgerManagerFactory.class;
+ } else if (LongHierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
+ factoryClass = LongHierarchicalLedgerManagerFactory.class;
} else {
throw new IOException("Unknown ledger manager type: "
+ lmType);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
new file mode 100644
index 0000000..8be23b2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
+ *
+ * <p>
+ * LegacyHierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
+ * <pre><level1 (2 digits)><level2 (4 digits)><level3 (4 digits)></pre>
+ * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
+ * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
+ * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in
+ * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
+ * errors during garbage collection due to lists of children that are too long.
+ */
+class LegacyHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
+
+ static final Logger LOG = LoggerFactory.getLogger(LegacyHierarchicalLedgerManager.class);
+
+ static final String IDGEN_ZNODE = "idgen";
+ private static final String MAX_ID_SUFFIX = "9999";
+ private static final String MIN_ID_SUFFIX = "0000";
+
+ private static final ThreadLocal<StringBuilder> threadLocalNodeBuilder = new ThreadLocal<StringBuilder>() {
+ @Override
+ protected StringBuilder initialValue() {
+ return new StringBuilder();
+ }
+ };
+
+ /**
+ * Constructor
+ *
+ * @param conf
+ * Configuration object
+ * @param zk
+ * ZooKeeper Client Handle
+ */
+ public LegacyHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+ super(conf, zk);
+ }
+
+ @Override
+ public String getLedgerPath(long ledgerId) {
+ return ledgerRootPath + StringUtils.getShortHierarchicalLedgerPath(ledgerId);
+ }
+
+ @Override
+ public long getLedgerId(String pathName) throws IOException {
+ if (!pathName.startsWith(ledgerRootPath)) {
+ throw new IOException("it is not a valid hashed path name : " + pathName);
+ }
+ String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
+ return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
+ }
+
+ //
+ // Active Ledger Manager
+ //
+
+ /**
+ * Get the smallest cache id in a specified node /level1/level2
+ *
+ * @param level1
+ * 1st level node name
+ * @param level2
+ * 2nd level node name
+ * @return the smallest ledger id
+ */
+ private long getStartLedgerIdByLevel(String level1, String level2) throws IOException {
+ return getLedgerId(level1, level2, MIN_ID_SUFFIX);
+ }
+
+ /**
+ * Get the largest cache id in a specified node /level1/level2
+ *
+ * @param level1
+ * 1st level node name
+ * @param level2
+ * 2nd level node name
+ * @return the largest ledger id
+ */
+ private long getEndLedgerIdByLevel(String level1, String level2) throws IOException {
+ return getLedgerId(level1, level2, MAX_ID_SUFFIX);
+ }
+
+ @Override
+ public void asyncProcessLedgers(final Processor<Long> processor,
+ final AsyncCallback.VoidCallback finalCb, final Object context,
+ final int successRc, final int failureRc) {
+ // process 1st level nodes
+ asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
+ @Override
+ public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) {
+ if (isSpecialZnode(l1Node)) {
+ cb1.processResult(successRc, null, context);
+ return;
+ }
+ final String l1NodePath = ledgerRootPath + "/" + l1Node;
+ // process level1 path, after all children of level1 process
+ // it callback to continue processing next level1 node
+ asyncProcessLevelNodes(l1NodePath, new Processor<String>() {
+ @Override
+ public void process(String l2Node, AsyncCallback.VoidCallback cb2) {
+ // process level1/level2 path
+ String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node;
+ // process each ledger
+ // after all ledger are processed, cb2 will be call to continue processing next level2 node
+ asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2,
+ context, successRc, failureRc);
+ }
+ }, cb1, context, successRc, failureRc);
+ }
+ }, finalCb, context, successRc, failureRc);
+ }
+
+ protected static boolean isSpecialZnode(String znode) {
+ return IDGEN_ZNODE.equals(znode) || LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode) || AbstractHierarchicalLedgerManager.isSpecialZnode(znode);
+ }
+
+ @Override
+ public LedgerRangeIterator getLedgerRanges() {
+ return new HierarchicalLedgerRangeIterator();
+ }
+
+ /**
+ * Iterator through each metadata bucket with hierarchical mode
+ */
+ private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
+ private Iterator<String> l1NodesIter = null;
+ private Iterator<String> l2NodesIter = null;
+ private String curL1Nodes = "";
+ private boolean iteratorDone = false;
+ private LedgerRange nextRange = null;
+
+ /**
+ * iterate next level1 znode
+ *
+ * @return false if have visited all level1 nodes
+ * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
+ */
+ private boolean nextL1Node() throws KeeperException, InterruptedException {
+ l2NodesIter = null;
+ while (l2NodesIter == null) {
+ if (l1NodesIter.hasNext()) {
+ curL1Nodes = l1NodesIter.next();
+ } else {
+ return false;
+ }
+ // Top level nodes are always exactly 2 digits long. (Don't pick up long hierarchical top level nodes)
+ if (isSpecialZnode(curL1Nodes) || curL1Nodes.length() > 2) {
+ continue;
+ }
+ List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
+ Collections.sort(l2Nodes);
+ l2NodesIter = l2Nodes.iterator();
+ if (!l2NodesIter.hasNext()) {
+ l2NodesIter = null;
+ continue;
+ }
+ }
+ return true;
+ }
+
+ synchronized private void preload() throws IOException {
+ while (nextRange == null && !iteratorDone) {
+ boolean hasMoreElements = false;
+ try {
+ if (l1NodesIter == null) {
+ List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
+ Collections.sort(l1Nodes);
+ l1NodesIter = l1Nodes.iterator();
+ hasMoreElements = nextL1Node();
+ } else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
+ hasMoreElements = nextL1Node();
+ } else {
+ hasMoreElements = true;
+ }
+ } catch (KeeperException ke) {
+ throw new IOException("Error preloading next range", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while preloading", ie);
+ }
+ if (hasMoreElements) {
+ nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
+ if (nextRange.size() == 0) {
+ nextRange = null;
+ }
+ } else {
+ iteratorDone = true;
+ }
+ }
+ }
+
+ @Override
+ synchronized public boolean hasNext() throws IOException {
+ preload();
+ return nextRange != null && !iteratorDone;
+ }
+
+ @Override
+ synchronized public LedgerRange next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ LedgerRange r = nextRange;
+ nextRange = null;
+ return r;
+ }
+
+ /**
+ * Get a single node level1/level2
+ *
+ * @param level1
+ * 1st level node name
+ * @param level2
+ * 2nd level node name
+ * @throws IOException
+ */
+ LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
+ throws IOException {
+ StringBuilder nodeBuilder = threadLocalNodeBuilder.get();
+ nodeBuilder.setLength(0);
+ nodeBuilder.append(ledgerRootPath).append("/")
+ .append(level1).append("/").append(level2);
+ String nodePath = nodeBuilder.toString();
+ List<String> ledgerNodes = null;
+ try {
+ ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+ } catch (InterruptedException e) {
+ throw new IOException("Error when get child nodes from zk", e);
+ }
+ NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("All active ledgers from ZK for hash node "
+ + level1 + "/" + level2 + " : " + zkActiveLedgers);
+ }
+
+ return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true,
+ getEndLedgerIdByLevel(level1, level2), true));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
new file mode 100644
index 0000000..1ac4038
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -0,0 +1,100 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Hierarchical Ledger Manager Factory
+ */
+public class LegacyHierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+
+ public static final String NAME = "legacyhierarchical";
+ public static final int CUR_VERSION = 1;
+
+ AbstractConfiguration conf;
+ ZooKeeper zk;
+
+ @Override
+ public int getCurrentVersion() {
+ return CUR_VERSION;
+ }
+
+ @Override
+ public LedgerManagerFactory initialize(final AbstractConfiguration conf,
+ final ZooKeeper zk,
+ final int factoryVersion)
+ throws IOException {
+ if (CUR_VERSION != factoryVersion) {
+ throw new IOException("Incompatible layout version found : "
+ + factoryVersion);
+ }
+ this.conf = conf;
+ this.zk = zk;
+ return this;
+ }
+
+ @Override
+ public void uninitialize() throws IOException {
+ // since zookeeper instance is passed from outside
+ // we don't need to close it here
+ }
+
+ @Override
+ public LedgerIdGenerator newLedgerIdGenerator() {
+ List<ACL> zkAcls = ZkUtils.getACLs(conf);
+ return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
+ }
+
+ @Override
+ public LedgerManager newLedgerManager() {
+ return new HierarchicalLedgerManager(conf, zk);
+ }
+
+ @Override
+ public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
+ throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
+ return new ZkLedgerUnderreplicationManager(conf, zk);
+ }
+
+ @Override
+ public void format(AbstractConfiguration conf, ZooKeeper zk)
+ throws InterruptedException, KeeperException, IOException {
+ String ledgersRootPath = conf.getZkLedgersRootPath();
+ List<String> children = zk.getChildren(ledgersRootPath, false);
+ for (String child : children) {
+ if (HierarchicalLedgerManager.isSpecialZnode(child)) {
+ continue;
+ }
+ ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
+ }
+ // Delete and recreate the LAYOUT information.
+ super.format(conf, zk);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index 990297f..f8f7546 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
+import java.util.TreeSet;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -37,33 +38,36 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 4-level hierarchical znodes.
+ * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 5-level hierarchical znodes.
*
* <p>
* LongHierarchicalLedgerManager splits the generated id into 5 parts (3-4-4-4-4):
*
* <pre>
- * <level1 (3 digits)><level2 (4 digits)><level3 (4 digits)><level4 (4 digits)>
- * <level5 (4 digits)>
+ * <level0 (3 digits)><level1 (4 digits)><level2 (4 digits)><level3 (4 digits)>
+ * <level4 (4 digits)>
* </pre>
*
* These 5 parts are used to form the actual ledger node path used to store ledger metadata:
*
* <pre>
- * (ledgersRootPath) / level1 / level2 / level3 / level4 / L(level5)
+ * (ledgersRootPath) / level0 / level1 / level2 / level3 / L(level4)
* </pre>
*
* E.g Ledger 0000000000000000001 is split into 5 parts <i>000</i>, <i>0000</i>, <i>0000</i>, <i>0000</i>, <i>0001</i>,
* which is stored in <i>(ledgersRootPath)/000/0000/0000/0000/L0001</i>. So each znode could have at most 10000 ledgers,
* which avoids errors during garbage collection due to lists of children that are too long.
+ *
*/
-class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
+class LongHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
static final Logger LOG = LoggerFactory.getLogger(LongHierarchicalLedgerManager.class);
+ static final String IDGEN_ZNODE = "idgen-long";
private static final String MAX_ID_SUFFIX = "9999";
private static final String MIN_ID_SUFFIX = "0000";
+
/**
* Constructor
*
@@ -77,11 +81,6 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
}
@Override
- public String getLedgerPath(long ledgerId) {
- return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId);
- }
-
- @Override
public long getLedgerId(String pathName) throws IOException {
if (!pathName.startsWith(ledgerRootPath)) {
throw new IOException("it is not a valid hashed path name : " + pathName);
@@ -89,54 +88,68 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
}
+
+ @Override
+ public String getLedgerPath(long ledgerId) {
+ return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId);
+ }
//
// Active Ledger Manager
//
/**
- * Get the smallest cache id in a specified node /level1/level2/level3/level4
+ * Get the smallest cache id in a specified node /level0/level1/level2/level3
*
- * @param level1
+ * @param level0
* 1st level node name
- * @param level2
+ * @param level1
* 2nd level node name
- * @param level3
+ * @param level2
* 3rd level node name
- * @param level4
+ * @param level3
* 4th level node name
* @return the smallest ledger id
*/
- private long getStartLedgerIdByLevel(String level1, String level2, String level3, String level4)
+ private long getStartLedgerIdByLevel(String level0, String level1, String level2, String level3)
throws IOException {
- return getLedgerId(level1, level2, level3, level4, MIN_ID_SUFFIX);
+ return getLedgerId(level0, level1, level2, level3, MIN_ID_SUFFIX);
}
/**
- * Get the largest cache id in a specified node /level1/level2/level3/level4
+ * Get the largest cache id in a specified node /level0/level1/level2/level3
*
- * @param level1
+ * @param level0
* 1st level node name
- * @param level2
+ * @param level1
* 2nd level node name
- * @param level3
+ * @param level2
* 3rd level node name
- * @param level4
+ * @param level3
* 4th level node name
* @return the largest ledger id
*/
- private long getEndLedgerIdByLevel(String level1, String level2, String level3, String level4) throws IOException {
- return getLedgerId(level1, level2, level3, level4, MAX_ID_SUFFIX);
+ private long getEndLedgerIdByLevel(String level0, String level1, String level2, String level3) throws IOException {
+ return getLedgerId(level0, level1, level2, level3, MAX_ID_SUFFIX);
}
@Override
public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCallback.VoidCallback finalCb,
final Object context, final int successRc, final int failureRc) {
+
+ // If it succeeds, proceed with our own recursive ledger processing for the 63-bit id ledgers
asyncProcessLevelNodes(ledgerRootPath,
new RecursiveProcessor(0, ledgerRootPath, processor, context, successRc, failureRc), finalCb, context,
successRc, failureRc);
}
+ protected static boolean isSpecialZnode(String znode) {
+ // Check nextnode length. All paths in long hierarchical format (3-4-4-4-4)
+ // are at least 3 characters long. This prevents picking up any old-style
+ // hierarchical paths (2-4-4)
+ return LegacyHierarchicalLedgerManager.isSpecialZnode(znode) || znode.length() < 3;
+ }
+
private class RecursiveProcessor implements Processor<String> {
private final int level;
private final String path;
@@ -167,7 +180,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
context, successRc, failureRc);
} else {
// process each ledger after all ledger are processed, cb will be call to continue processing next
- // level5 node
+ // level4 node
asyncProcessLedgersInSingleNode(nodePath, processor, cb, context, successRc, failureRc);
}
}
@@ -194,7 +207,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
curLevelNodes = new ArrayList<String>(Collections.nCopies(4, (String) null));
}
- private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException {
+ synchronized private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException {
List<String> levelNodes = zk.getChildren(path, null);
Collections.sort(levelNodes);
if (level == 0) {
@@ -217,6 +230,9 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
}
String curLNode = curLevelNodes.get(level);
if (curLNode != null) {
+ // Traverse down through levels 0-3
+ // The nextRange becomes a listing of the children
+ // in the level4 directory.
if (level != 3) {
String nextLevelPath = path + "/" + curLNode;
initialize(nextLevelPath, level + 1);
@@ -229,7 +245,13 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
}
}
- private boolean moveToNext(int level) throws KeeperException, InterruptedException {
+ private void clearHigherLevels(int level) {
+ for(int i = level+1; i < 4; i++) {
+ curLevelNodes.set(i, null);
+ }
+ }
+
+ synchronized private boolean moveToNext(int level) throws KeeperException, InterruptedException {
Iterator<String> curLevelNodesIter = levelNodesIter.get(level);
boolean movedToNextNode = false;
if (level == 0) {
@@ -239,6 +261,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
continue;
} else {
curLevelNodes.set(level, nextNode);
+ clearHigherLevels(level);
movedToNextNode = true;
break;
}
@@ -247,6 +270,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
if (curLevelNodesIter.hasNext()) {
String nextNode = curLevelNodesIter.next();
curLevelNodes.set(level, nextNode);
+ clearHigherLevels(level);
movedToNextNode = true;
} else {
movedToNextNode = moveToNext(level - 1);
@@ -261,6 +285,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
levelNodesIter.set(level, newCurLevelNodesIter);
if (newCurLevelNodesIter.hasNext()) {
curLevelNodes.set(level, newCurLevelNodesIter.next());
+ clearHigherLevels(level);
movedToNextNode = true;
}
}
@@ -306,15 +331,15 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
return r;
}
- LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException {
- String level1 = curLevelNodes.get(0);
- String level2 = curLevelNodes.get(1);
- String level3 = curLevelNodes.get(2);
- String level4 = curLevelNodes.get(3);
+ private LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException {
+ String level0 = curLevelNodes.get(0);
+ String level1 = curLevelNodes.get(1);
+ String level2 = curLevelNodes.get(2);
+ String level3 = curLevelNodes.get(3);
StringBuilder nodeBuilder = new StringBuilder();
- nodeBuilder.append(ledgerRootPath).append("/").append(level1).append("/").append(level2).append("/")
- .append(level3).append("/").append(level4);
+ nodeBuilder.append(ledgerRootPath).append("/").append(level0).append("/").append(level1).append("/")
+ .append(level2).append("/").append(level3);
String nodePath = nodeBuilder.toString();
List<String> ledgerNodes = null;
try {
@@ -324,11 +349,11 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
}
NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
if (LOG.isDebugEnabled()) {
- LOG.debug("All active ledgers from ZK for hash node " + level1 + "/" + level2 + "/" + level3 + "/"
- + level4 + " : " + zkActiveLedgers);
+ LOG.debug("All active ledgers from ZK for hash node " + level0 + "/" + level1 + "/" + level2 + "/"
+ + level3 + " : " + zkActiveLedgers);
}
- return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2, level3, level4), true,
- getEndLedgerIdByLevel(level1, level2, level3, level4), true));
+ return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level0, level1, level2, level3), true,
+ getEndLedgerIdByLevel(level0, level1, level2, level3), true));
}
}
}