You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/01/31 01:42:02 UTC
bookkeeper git commit: BOOKKEEPER-873 and BOOKKEEPER-553
Repository: bookkeeper
Updated Branches:
refs/heads/master 13d668f22 -> 42e8f1294
BOOKKEEPER-873 and BOOKKEEPER-553
BOOKKEEPER-873: CreateLedgerAPI to accept ledgerId
Add ledgerCreateAdv with ledgerId interface to Bookkeeper
and corresponding Junit tests.
BOOKKEEPER-553: LongHierarchicalLedgerManager
- LongHierarchicalLedgerManager to support 63 bits ledgerid (positive long)
- LongHierarchicalLedgerManager splits the generated id into 5 parts (3-4-4-4-4)
Author: Venkateswara <vj...@salesforce.com>
Author: Charan Reddy Guttapalem <cg...@salesforce.com>
Reviewers: Sijie Guo <si...@apache.org>
Closes #88 from reddycharan/ledgerhandleadvwithledgerid
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/42e8f129
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/42e8f129
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/42e8f129
Branch: refs/heads/master
Commit: 42e8f1294f206cbe51a6af669cf605833b78bf42
Parents: 13d668f
Author: Venkateswara <vj...@salesforce.com>
Authored: Mon Jan 30 17:41:54 2017 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Jan 30 17:41:54 2017 -0800
----------------------------------------------------------------------
.../apache/bookkeeper/client/BookKeeper.java | 109 +++++-
.../bookkeeper/client/LedgerCreateOp.java | 26 +-
.../meta/HierarchicalLedgerManager.java | 4 +-
.../meta/LongHierarchicalLedgerManager.java | 334 +++++++++++++++++++
.../LongHierarchicalLedgerManagerFactory.java | 12 +
.../org/apache/bookkeeper/util/StringUtils.java | 49 +++
.../client/BookieWriteLedgerTest.java | 103 +++++-
.../client/TestWatchEnsembleChange.java | 4 +-
.../apache/bookkeeper/meta/GcLedgersTest.java | 2 +-
.../bookkeeper/meta/LedgerManagerTestCase.java | 3 +-
.../MultiLedgerManagerMultiDigestTestCase.java | 1 +
.../test/MultiLedgerManagerTestCase.java | 3 +-
12 files changed, 629 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 08c24b0..2f8a0b8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -738,7 +738,114 @@ public class BookKeeper implements AutoCloseable {
return;
}
new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
- ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv();
+ ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv((long)(-1));
+ } finally {
+ closeLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Synchronously creates a new ledger using the interface which accepts a ledgerId as input.
+ * This method returns {@link LedgerHandleAdv} which can accept entryId.
+ * Parameters must match those of
+ * {@link #asyncCreateLedgerAdv(long, int, int, int, DigestType, byte[],
+ * AsyncCallback.CreateCallback, Object, Map)}
+ * @param ledgerId
+ * @param ensSize
+ * @param writeQuorumSize
+ * @param ackQuorumSize
+ * @param digestType
+ * @param passwd
+ * @param customMetadata
+ * @return a handle to the newly created ledger
+ * @throws InterruptedException
+ * @throws BKException
+ */
+ public LedgerHandle createLedgerAdv(final long ledgerId,
+ int ensSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ DigestType digestType,
+ byte passwd[],
+ final Map<String, byte[]> customMetadata) throws InterruptedException, BKException{
+ CompletableFuture<LedgerHandle> counter = new CompletableFuture<>();
+
+ /*
+ * Calls asynchronous version
+ */
+ asyncCreateLedgerAdv(ledgerId, ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
+ new SyncCreateCallback(), counter, customMetadata);
+
+ LedgerHandle lh = SynchCallbackUtils.waitForResult(counter);
+ if (lh == null) {
+ LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
+ throw BKException.create(BKException.Code.UnexpectedConditionException);
+ } else if (ledgerId != lh.getId()) {
+ LOG.error("Unexpected condition : Expected ledgerId: {} but got: {}", ledgerId, lh.getId());
+ throw BKException.create(BKException.Code.UnexpectedConditionException);
+ }
+
+ LOG.info("Ensemble: {} for ledger: {}", lh.getLedgerMetadata().getEnsemble(0L),
+ lh.getId());
+
+ return lh;
+ }
+
+ /**
+ * Asynchronously creates a new ledger using the interface which accepts a ledgerId as input.
+ * This method returns {@link LedgerHandleAdv} which can accept entryId.
+ * Ledgers created with this call have the ability to accept
+ * a separate write quorum and ack quorum size. The write quorum must not be smaller than
+ * the ack quorum.
+ *
+ * Separating the write and the ack quorum allows the BookKeeper client to continue
+ * writing when a bookie has failed but the failure has not yet been detected. Detecting
+ * a bookie has failed can take a number of seconds, as configured by the read timeout
+ * {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected,
+ * that bookie will be removed from the ensemble.
+ *
+ * The other parameters match those of {@link #asyncCreateLedger(long, int, int, DigestType, byte[],
+ * AsyncCallback.CreateCallback, Object)}
+ *
+ * @param ledgerId
+ * ledger Id to use for the newly created ledger
+ * @param ensSize
+ * number of bookies over which to stripe entries
+ * @param writeQuorumSize
+ * number of bookies each entry will be written to
+ * @param ackQuorumSize
+ * number of bookies which must acknowledge an entry before the call is completed
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param cb
+ * createCallback implementation
+ * @param ctx
+ * optional control object
+ * @param customMetadata
+ * optional customMetadata that holds user specified metadata
+ */
+ public void asyncCreateLedgerAdv(final long ledgerId,
+ final int ensSize,
+ final int writeQuorumSize,
+ final int ackQuorumSize,
+ final DigestType digestType,
+ final byte[] passwd,
+ final CreateCallback cb,
+ final Object ctx,
+ final Map<String, byte[]> customMetadata) {
+ if (writeQuorumSize < ackQuorumSize) {
+ throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
+ }
+ closeLock.readLock().lock();
+ try {
+ if (closed) {
+ cb.createComplete(BKException.Code.ClientClosedException, null, ctx);
+ return;
+ }
+ new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
+ ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv(ledgerId);
} finally {
closeLock.readLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 52a5cb6..376d716 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -48,7 +48,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
CreateCallback cb;
LedgerMetadata metadata;
LedgerHandle lh;
- Long ledgerId;
+ Long ledgerId = -1L;
Object ctx;
byte[] passwd;
BookKeeper bk;
@@ -56,6 +56,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
long startTime;
OpStatsLogger createOpLogger;
boolean adv = false;
+ boolean generateLedgerId = true;
/**
* Constructor
@@ -119,12 +120,16 @@ class LedgerCreateOp implements GenericCallback<Void> {
* Add ensemble to the configuration
*/
metadata.addEnsemble(0L, ensemble);
-
- createLedger();
+ if (this.generateLedgerId) {
+ generateLedgerIdAndCreateLedger();
+ } else {
+ // Create ledger with supplied ledgerId
+ bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this);
+ }
}
- void createLedger() {
- // generate a ledger id and then create the ledger with metadata
+ void generateLedgerIdAndCreateLedger() {
+ // generate a ledgerId
final LedgerIdGenerator ledgerIdGenerator = bk.getLedgerIdGenerator();
ledgerIdGenerator.generateLedgerId(new GenericCallback<Long>() {
@Override
@@ -133,7 +138,6 @@ class LedgerCreateOp implements GenericCallback<Void> {
createComplete(rc, null);
return;
}
-
LedgerCreateOp.this.ledgerId = ledgerId;
// create a ledger with metadata
bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this);
@@ -144,8 +148,12 @@ class LedgerCreateOp implements GenericCallback<Void> {
/**
* Initiates the operation to return LedgerHandleAdv.
*/
- public void initiateAdv() {
+ public void initiateAdv(final long ledgerId) {
this.adv = true;
+ this.ledgerId = ledgerId;
+ if (this.ledgerId != -1L) {
+ this.generateLedgerId = false;
+ }
initiate();
}
@@ -154,9 +162,9 @@ class LedgerCreateOp implements GenericCallback<Void> {
*/
@Override
public void operationComplete(int rc, Void result) {
- if (BKException.Code.LedgerExistException == rc) {
+ if (this.generateLedgerId && (BKException.Code.LedgerExistException == rc)) {
// retry to generate a new ledger id
- createLedger();
+ generateLedgerIdAndCreateLedger();
return;
} else if (BKException.Code.OK != rc) {
createComplete(rc, null);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 2804af1..bed1627 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -84,7 +84,7 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
}
// get ledger from all level nodes
- private long getLedgerId(String...levelNodes) throws IOException {
+ long getLedgerId(String...levelNodes) throws IOException {
return StringUtils.stringToHierarchicalLedgerId(levelNodes);
}
@@ -151,7 +151,7 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
/**
* Process hash nodes in a given path
*/
- private void asyncProcessLevelNodes(
+ void asyncProcessLevelNodes(
final String path, final Processor<String> processor,
final AsyncCallback.VoidCallback finalCb, final Object context,
final int successRc, final int failureRc) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
new file mode 100644
index 0000000..990297f
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 4-level hierarchical znodes.
+ *
+ * <p>
+ * LongHierarchicalLedgerManager splits the generated id into 5 parts (3-4-4-4-4):
+ *
+ * <pre>
+ * <level1 (3 digits)><level2 (4 digits)><level3 (4 digits)><level4 (4 digits)>
+ * <level5 (4 digits)>
+ * </pre>
+ *
+ * These 5 parts are used to form the actual ledger node path used to store ledger metadata:
+ *
+ * <pre>
+ * (ledgersRootPath) / level1 / level2 / level3 / level4 / L(level5)
+ * </pre>
+ *
+ * E.g. Ledger 0000000000000000001 is split into 5 parts <i>000</i>, <i>0000</i>, <i>0000</i>, <i>0000</i>, <i>0001</i>,
+ * which is stored in <i>(ledgersRootPath)/000/0000/0000/0000/L0001</i>. So each znode could have at most 10000 ledgers,
+ * which avoids errors during garbage collection due to lists of children that are too long.
+ */
+class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
+
+ static final Logger LOG = LoggerFactory.getLogger(LongHierarchicalLedgerManager.class);
+
+ private static final String MAX_ID_SUFFIX = "9999";
+ private static final String MIN_ID_SUFFIX = "0000";
+
+ /**
+ * Constructor
+ *
+ * @param conf
+ * Configuration object
+ * @param zk
+ * ZooKeeper Client Handle
+ */
+ public LongHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+ super(conf, zk);
+ }
+
+ @Override
+ public String getLedgerPath(long ledgerId) {
+ return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId);
+ }
+
+ @Override
+ public long getLedgerId(String pathName) throws IOException {
+ if (!pathName.startsWith(ledgerRootPath)) {
+ throw new IOException("it is not a valid hashed path name : " + pathName);
+ }
+ String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
+ return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
+ }
+
+ //
+ // Active Ledger Manager
+ //
+
+ /**
+ * Get the smallest ledger id in a specified node /level1/level2/level3/level4
+ *
+ * @param level1
+ * 1st level node name
+ * @param level2
+ * 2nd level node name
+ * @param level3
+ * 3rd level node name
+ * @param level4
+ * 4th level node name
+ * @return the smallest ledger id
+ */
+ private long getStartLedgerIdByLevel(String level1, String level2, String level3, String level4)
+ throws IOException {
+ return getLedgerId(level1, level2, level3, level4, MIN_ID_SUFFIX);
+ }
+
+ /**
+ * Get the largest ledger id in a specified node /level1/level2/level3/level4
+ *
+ * @param level1
+ * 1st level node name
+ * @param level2
+ * 2nd level node name
+ * @param level3
+ * 3rd level node name
+ * @param level4
+ * 4th level node name
+ * @return the largest ledger id
+ */
+ private long getEndLedgerIdByLevel(String level1, String level2, String level3, String level4) throws IOException {
+ return getLedgerId(level1, level2, level3, level4, MAX_ID_SUFFIX);
+ }
+
+ @Override
+ public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCallback.VoidCallback finalCb,
+ final Object context, final int successRc, final int failureRc) {
+ asyncProcessLevelNodes(ledgerRootPath,
+ new RecursiveProcessor(0, ledgerRootPath, processor, context, successRc, failureRc), finalCb, context,
+ successRc, failureRc);
+ }
+
+ private class RecursiveProcessor implements Processor<String> {
+ private final int level;
+ private final String path;
+ private final Processor<Long> processor;
+ private final Object context;
+ private final int successRc;
+ private final int failureRc;
+
+ private RecursiveProcessor(int level, String path, Processor<Long> processor, Object context, int successRc,
+ int failureRc) {
+ this.level = level;
+ this.path = path;
+ this.processor = processor;
+ this.context = context;
+ this.successRc = successRc;
+ this.failureRc = failureRc;
+ }
+
+ @Override
+ public void process(String lNode, VoidCallback cb) {
+ String nodePath = path + "/" + lNode;
+ if ((level == 0) && isSpecialZnode(lNode)) {
+ cb.processResult(successRc, null, context);
+ return;
+ } else if (level < 3) {
+ asyncProcessLevelNodes(nodePath,
+ new RecursiveProcessor(level + 1, nodePath, processor, context, successRc, failureRc), cb,
+ context, successRc, failureRc);
+ } else {
+ // process each ledger; after all ledgers are processed, cb will be called to continue processing
+ // the next level4 node
+ asyncProcessLedgersInSingleNode(nodePath, processor, cb, context, successRc, failureRc);
+ }
+ }
+ }
+
+ @Override
+ public LedgerRangeIterator getLedgerRanges() {
+ return new LongHierarchicalLedgerRangeIterator();
+ }
+
+ /**
+ * Iterator over each metadata bucket in long hierarchical mode
+ */
+ private class LongHierarchicalLedgerRangeIterator implements LedgerRangeIterator {
+ private List<Iterator<String>> levelNodesIter;
+ private List<String> curLevelNodes;
+
+ private boolean initialized = false;
+ private boolean iteratorDone = false;
+ private LedgerRange nextRange = null;
+
+ private LongHierarchicalLedgerRangeIterator() {
+ levelNodesIter = new ArrayList<Iterator<String>>(Collections.nCopies(4, (Iterator<String>) null));
+ curLevelNodes = new ArrayList<String>(Collections.nCopies(4, (String) null));
+ }
+
+ private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException {
+ List<String> levelNodes = zk.getChildren(path, null);
+ Collections.sort(levelNodes);
+ if (level == 0) {
+ Iterator<String> l0NodesIter = levelNodes.iterator();
+ levelNodesIter.set(0, l0NodesIter);
+ while (l0NodesIter.hasNext()) {
+ String curL0Node = l0NodesIter.next();
+ if (!isSpecialZnode(curL0Node)) {
+ curLevelNodes.set(0, curL0Node);
+ break;
+ }
+ }
+ } else {
+ Iterator<String> lNodesIter = levelNodes.iterator();
+ levelNodesIter.set(level, lNodesIter);
+ if (lNodesIter.hasNext()) {
+ String curLNode = lNodesIter.next();
+ curLevelNodes.set(level, curLNode);
+ }
+ }
+ String curLNode = curLevelNodes.get(level);
+ if (curLNode != null) {
+ if (level != 3) {
+ String nextLevelPath = path + "/" + curLNode;
+ initialize(nextLevelPath, level + 1);
+ } else {
+ nextRange = getLedgerRangeByLevel(curLevelNodes);
+ initialized = true;
+ }
+ } else {
+ iteratorDone = true;
+ }
+ }
+
+ private boolean moveToNext(int level) throws KeeperException, InterruptedException {
+ Iterator<String> curLevelNodesIter = levelNodesIter.get(level);
+ boolean movedToNextNode = false;
+ if (level == 0) {
+ while (curLevelNodesIter.hasNext()) {
+ String nextNode = curLevelNodesIter.next();
+ if (isSpecialZnode(nextNode)) {
+ continue;
+ } else {
+ curLevelNodes.set(level, nextNode);
+ movedToNextNode = true;
+ break;
+ }
+ }
+ } else {
+ if (curLevelNodesIter.hasNext()) {
+ String nextNode = curLevelNodesIter.next();
+ curLevelNodes.set(level, nextNode);
+ movedToNextNode = true;
+ } else {
+ movedToNextNode = moveToNext(level - 1);
+ if (movedToNextNode) {
+ StringBuilder path = new StringBuilder(ledgerRootPath);
+ for (int i = 0; i < level; i++) {
+ path = path.append("/").append(curLevelNodes.get(i));
+ }
+ List<String> newCurLevelNodesList = zk.getChildren(path.toString(), null);
+ Collections.sort(newCurLevelNodesList);
+ Iterator<String> newCurLevelNodesIter = newCurLevelNodesList.iterator();
+ levelNodesIter.set(level, newCurLevelNodesIter);
+ if (newCurLevelNodesIter.hasNext()) {
+ curLevelNodes.set(level, newCurLevelNodesIter.next());
+ movedToNextNode = true;
+ }
+ }
+ }
+ }
+ return movedToNextNode;
+ }
+
+ synchronized private void preload() throws IOException, KeeperException, InterruptedException {
+ if (!iteratorDone && !initialized) {
+ initialize(ledgerRootPath, 0);
+ }
+ while (((nextRange == null) || (nextRange.size() == 0)) && !iteratorDone) {
+ boolean movedToNextNode = moveToNext(3);
+ if (movedToNextNode) {
+ nextRange = getLedgerRangeByLevel(curLevelNodes);
+ } else {
+ iteratorDone = true;
+ }
+ }
+ }
+
+ @Override
+ synchronized public boolean hasNext() throws IOException {
+ try {
+ preload();
+ } catch (KeeperException ke) {
+ throw new IOException("Error preloading next range", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while preloading", ie);
+ }
+ return nextRange != null && !iteratorDone;
+ }
+
+ @Override
+ synchronized public LedgerRange next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ LedgerRange r = nextRange;
+ nextRange = null;
+ return r;
+ }
+
+ LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException {
+ String level1 = curLevelNodes.get(0);
+ String level2 = curLevelNodes.get(1);
+ String level3 = curLevelNodes.get(2);
+ String level4 = curLevelNodes.get(3);
+
+ StringBuilder nodeBuilder = new StringBuilder();
+ nodeBuilder.append(ledgerRootPath).append("/").append(level1).append("/").append(level2).append("/")
+ .append(level3).append("/").append(level4);
+ String nodePath = nodeBuilder.toString();
+ List<String> ledgerNodes = null;
+ try {
+ ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+ } catch (InterruptedException e) {
+ throw new IOException("Error when get child nodes from zk", e);
+ }
+ NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("All active ledgers from ZK for hash node " + level1 + "/" + level2 + "/" + level3 + "/"
+ + level4 + " : " + zkActiveLedgers);
+ }
+ return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2, level3, level4), true,
+ getEndLedgerIdByLevel(level1, level2, level3, level4), true));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
new file mode 100644
index 0000000..020bde8
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManagerFactory.java
@@ -0,0 +1,12 @@
+package org.apache.bookkeeper.meta;
+
+public class LongHierarchicalLedgerManagerFactory extends HierarchicalLedgerManagerFactory {
+
+ public static final String NAME = "longhierarchical";
+
+ @Override
+ public LedgerManager newLedgerManager() {
+ return new LongHierarchicalLedgerManager(conf, zk);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index bea0372..c2f658b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -41,6 +41,16 @@ public class StringUtils {
}
/**
+ * Formats ledger ID according to ZooKeeper rules
+ *
+ * @param id
+ * znode id
+ */
+ public static String getZKStringIdForLongHierarchical(long id) {
+ return String.format("%019d", id);
+ }
+
+ /**
* Get the hierarchical ledger path according to the ledger id
*
* @param ledgerId
@@ -60,6 +70,27 @@ public class StringUtils {
}
/**
+ * Get the long hierarchical ledger path according to the ledger id
+ *
+ * @param ledgerId
+ * ledger id
+ * @return the long hierarchical path
+ */
+ public static String getLongHierarchicalLedgerPath(long ledgerId) {
+ String ledgerIdStr = getZKStringIdForLongHierarchical(ledgerId);
+ // do 3-4-4-4-4 split
+ StringBuilder sb = new StringBuilder();
+ sb.append("/")
+ .append(ledgerIdStr.substring(0, 3)).append("/")
+ .append(ledgerIdStr.substring(3, 7)).append("/")
+ .append(ledgerIdStr.substring(7, 11)).append("/")
+ .append(ledgerIdStr.substring(11, 15)).append("/")
+ .append(LEDGER_NODE_PREFIX)
+ .append(ledgerIdStr.substring(15, 19));
+ return sb.toString();
+ }
+
+ /**
* Parse the hierarchical ledger path to its ledger id
*
* @param hierarchicalLedgerPath
@@ -78,6 +109,24 @@ public class StringUtils {
}
/**
+ * Parse the long hierarchical ledger path to its ledger id
+ *
+ * @param longHierarchicalLedgerPath the long hierarchical ledger path to parse
+ * @return the ledger id
+ * @throws IOException
+ */
+ public static long stringToLongHierarchicalLedgerId(String longHierarchicalLedgerPath)
+ throws IOException {
+ String[] longHierarchicalParts = longHierarchicalLedgerPath.split("/");
+ if (longHierarchicalParts.length != 5) {
+ throw new IOException("it is not a valid hierarchical path name : " + longHierarchicalLedgerPath);
+ }
+ longHierarchicalParts[4] =
+ longHierarchicalParts[4].substring(LEDGER_NODE_PREFIX.length());
+ return stringToHierarchicalLedgerId(longHierarchicalParts);
+ }
+
+ /**
* Get ledger id
*
* @param levelNodes
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index a5fbe24..69ac921 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -20,9 +20,13 @@
*/
package org.apache.bookkeeper.client;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.List;
import java.util.Random;
import java.util.Map;
import java.util.UUID;
@@ -30,6 +34,7 @@ import java.util.HashMap;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
import org.junit.Before;
@@ -37,9 +42,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-
-
/**
* Testing ledger write entry cases
*/
@@ -177,6 +179,50 @@ public class BookieWriteLedgerTest extends
}
/**
+ * Verify the functionality of Advanced Ledger which accepts ledgerId as input and returns
+ * LedgerHandleAdv. LedgerHandleAdv takes entryId for addEntry, and let
+ * user manage entryId allocation.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 60000)
+ public void testLedgerCreateAdvWithLedgerId() throws Exception {
+ // Create a ledger
+ long ledgerId = 0xABCDEF;
+ lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries1.add(entry.array());
+ lh.addEntry(i, entry.array());
+ }
+ // Start one more bookie
+ startNewBookie();
+
+ // Shutdown one bookie in the last ensemble and continue writing
+ ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+ .getValue();
+ killBookie(ensemble.get(0));
+
+ int i = numEntriesToWrite;
+ numEntriesToWrite = numEntriesToWrite + 50;
+ for (; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries1.add(entry.array());
+ lh.addEntry(i, entry.array());
+ }
+
+ readEntries(lh, entries1);
+ lh.close();
+ bkc.deleteLedger(ledgerId);
+ }
+
+ /**
* Verify the functionality of Ledger create which accepts customMetadata as input.
* Also verifies that the data written is read back properly.
*
@@ -222,6 +268,55 @@ public class BookieWriteLedgerTest extends
}
}
+ /*
+ * In a loop, create and write ledgers with randomly generated ledgerIds through the functionality of
+ * Advanced Ledger which accepts ledgerId as input; then read back, verify and delete each ledger.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 180000)
+ public void testLedgerCreateAdvWithLedgerIdInLoop() throws Exception {
+ long ledgerId;
+ int ledgerCount = 40;
+
+ List<List<byte[]>> entryList = new ArrayList<List<byte[]>>();
+ LedgerHandle[] lhArray = new LedgerHandle[ledgerCount];
+
+ List<byte[]> tmpEntry;
+ for (int lc = 0; lc < ledgerCount; lc++) {
+ tmpEntry = new ArrayList<byte[]>();
+
+ ledgerId = rng.nextLong();
+ ledgerId &= Long.MAX_VALUE;
+ if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+ // since LongHierarchicalLedgerManager supports ledgerIds of decimal length up to 19 digits but other
+ // LedgerManagers only up to 10 digits
+ ledgerId %= 9999999999L;
+ }
+
+ LOG.info("Iteration: {} LedgerId: {}", lc, ledgerId);
+ lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
+ lhArray[lc] = lh;
+
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+ tmpEntry.add(entry.array());
+ lh.addEntry(i, entry.array());
+ }
+ entryList.add(tmpEntry);
+ }
+ for (int lc = 0; lc < ledgerCount; lc++) {
+ // Read and verify
+ long lid = lhArray[lc].getId();
+ LOG.info("readEntries for lc: {} ledgerId: {} ", lc, lhArray[lc].getId());
+ readEntries(lhArray[lc], entryList.get(lc));
+ lhArray[lc].close();
+ bkc.deleteLedger(lid);
+ }
+ }
+
/**
* Verify asynchronous writing when few bookie failures in last ensemble.
*/
@@ -615,7 +710,7 @@ public class BookieWriteLedgerTest extends
lh.close();
}
- private void readEntries(LedgerHandle lh, ArrayList<byte[]> entries) throws InterruptedException, BKException {
+ private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
ls = lh.readEntries(0, numEntriesToWrite - 1);
int index = 0;
while (ls.hasMoreElements()) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index df74339..97e3c9f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
@@ -72,7 +73,8 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
return Arrays.asList(new Object[][] {
{ FlatLedgerManagerFactory.class },
{ HierarchicalLedgerManagerFactory.class },
- { MSLedgerManagerFactory.class }
+ { LongHierarchicalLedgerManagerFactory.class },
+ { MSLedgerManagerFactory.class },
});
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index d5866a5..c1e8bde 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -263,7 +263,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
assertEquals("Should have cleaned first ledger" + first, (long)first, (long)cleaned.poll());
}
- @Test(timeout=60000)
+ @Test(timeout=120000)
public void testGcLedgersNotLast() throws Exception {
final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
final List<Long> cleaned = new ArrayList<Long>();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index cf7fdcc..5387424 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -89,7 +89,8 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
return Arrays.asList(new Object[][] {
{ FlatLedgerManagerFactory.class },
{ HierarchicalLedgerManagerFactory.class },
- { MSLedgerManagerFactory.class }
+ { LongHierarchicalLedgerManagerFactory.class },
+ { MSLedgerManagerFactory.class },
});
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
index b4026b4..2c9a1f4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
@@ -49,6 +49,7 @@ public abstract class MultiLedgerManagerMultiDigestTestCase extends BookKeeperCl
String[] ledgerManagers = {
"org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
+ "org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory",
"org.apache.bookkeeper.meta.MSLedgerManagerFactory",
};
ArrayList<Object[]> cfgs = new ArrayList<Object[]>(ledgerManagers.length);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/42e8f129/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
index cba8be4..34a22af 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
@@ -28,8 +28,6 @@ import org.junit.runners.Parameterized.Parameters;
*
*/
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
-
/**
* Test Case run over different ledger manager.
*/
@@ -45,6 +43,7 @@ public abstract class MultiLedgerManagerTestCase extends BookKeeperClusterTestCa
String[] ledgerManagers = new String[] {
"org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
+ "org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory",
"org.apache.bookkeeper.meta.MSLedgerManagerFactory",
};
ArrayList<Object[]> cfgs = new ArrayList<Object[]>(ledgerManagers.length);