You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2015/11/13 08:12:27 UTC
bookkeeper git commit: BOOKKEEPER-438: Move ledger id generation out
of LedgerManager (Sijie via mmerli)
Repository: bookkeeper
Updated Branches:
refs/heads/branch-4.3 c985aa0b9 -> 698283126
BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Sijie via mmerli)
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/69828312
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/69828312
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/69828312
Branch: refs/heads/branch-4.3
Commit: 698283126ca1e9854be28b063e91208fdf4e710b
Parents: c985aa0
Author: Matteo Merli <mm...@apache.org>
Authored: Thu Nov 12 22:53:19 2015 -0800
Committer: Matteo Merli <mm...@apache.org>
Committed: Thu Nov 12 22:53:19 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/bookkeeper/client/BKException.java | 9 ++
.../apache/bookkeeper/client/BookKeeper.java | 9 ++
.../bookkeeper/client/LedgerCreateOp.java | 41 ++++--
.../bookkeeper/conf/AbstractConfiguration.java | 2 +-
.../meta/AbstractZkLedgerManager.java | 28 ++++
.../bookkeeper/meta/CleanupLedgerManager.java | 6 +-
.../bookkeeper/meta/FlatLedgerManager.java | 37 ------
.../meta/FlatLedgerManagerFactory.java | 5 +
.../meta/HierarchicalLedgerManager.java | 95 +-------------
.../meta/HierarchicalLedgerManagerFactory.java | 9 +-
.../apache/bookkeeper/meta/LedgerManager.java | 43 +++---
.../bookkeeper/meta/LedgerManagerFactory.java | 8 ++
.../bookkeeper/meta/MSLedgerManagerFactory.java | 130 +++++++------------
.../bookkeeper/bookie/CompactionTest.java | 2 +-
.../client/TestWatchEnsembleChange.java | 91 +++++++------
.../apache/bookkeeper/meta/GcLedgersTest.java | 40 ++++--
.../bookkeeper/meta/LedgerManagerTestCase.java | 15 ++-
18 files changed, 274 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 378ae55..8a5d2d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -61,6 +61,8 @@ Release 4.3.1 - 2015-05-19
BOOKKEEPER-810: Allow to configure TCP connect timeout (Charles Xie via sijie)
+ BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Tong Yu via sijie)
+
bookkeeper-server:
BOOKKEEPER-833: EntryLogId and EntryLogLimit should not be larger than Integer.MAX_VALUE (sijie)
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index c5be32d..b43506d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -124,6 +124,7 @@ public abstract class BKException extends Exception {
int MetadataVersionException = -17;
int MetaStoreException = -18;
int ClientClosedException = -19;
+ int LedgerExistException = -20;
int IllegalOpException = -100;
int LedgerFencedException = -101;
@@ -170,6 +171,8 @@ public abstract class BKException extends Exception {
return "Error while using ZooKeeper";
case Code.MetaStoreException:
return "Error while using MetaStore";
+ case Code.LedgerExistException:
+ return "Ledger existed";
case Code.LedgerRecoveryException:
return "Error while recovering ledger";
case Code.LedgerClosedException:
@@ -301,6 +304,12 @@ public abstract class BKException extends Exception {
}
}
+ public static class BKLedgerExistException extends BKException {
+ public BKLedgerExistException() {
+ super(Code.LedgerExistException);
+ }
+ }
+
public static class BKLedgerRecoveryException extends BKException {
public BKLedgerRecoveryException() {
super(Code.LedgerRecoveryException);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 49d8e59..9e2cd00 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.meta.CleanupLedgerManager;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.proto.BookieClient;
@@ -98,6 +99,7 @@ public class BookKeeper {
// Ledger manager responsible for how to store ledger meta data
final LedgerManagerFactory ledgerManagerFactory;
final LedgerManager ledgerManager;
+ final LedgerIdGenerator ledgerIdGenerator;
// Ensemble Placement Policy
final EnsemblePlacementPolicy placementPolicy;
@@ -221,6 +223,7 @@ public class BookKeeper {
ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
+ ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
ownChannelFactory = true;
ownZKHandle = true;
@@ -307,6 +310,7 @@ public class BookKeeper {
ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
+ ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
}
private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)
@@ -335,6 +339,10 @@ public class BookKeeper {
return ledgerManager;
}
+ LedgerIdGenerator getLedgerIdGenerator() {
+ return ledgerIdGenerator;
+ }
+
/**
* There are 2 digest types that can be used for verification. The CRC32 is
* cheap to compute but does not protect against byzantine bookies (i.e., a
@@ -811,6 +819,7 @@ public class BookKeeper {
// Close ledger manage so all pending metadata requests would be failed
// which will reject any incoming metadata requests.
ledgerManager.close();
+ ledgerIdGenerator.close();
ledgerManagerFactory.uninitialize();
} catch (IOException ie) {
LOG.error("Failed to close ledger manager : ", ie);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index fe223af..5ed0901 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -38,13 +39,14 @@ import org.slf4j.LoggerFactory;
* Encapsulates asynchronous ledger create operation
*
*/
-class LedgerCreateOp implements GenericCallback<Long> {
+class LedgerCreateOp implements GenericCallback<Void> {
static final Logger LOG = LoggerFactory.getLogger(LedgerCreateOp.class);
CreateCallback cb;
LedgerMetadata metadata;
LedgerHandle lh;
+ Long ledgerId;
Object ctx;
byte[] passwd;
BookKeeper bk;
@@ -59,12 +61,14 @@ class LedgerCreateOp implements GenericCallback<Long> {
* BookKeeper object
* @param ensembleSize
* ensemble size
- * @param quorumSize
- * quorum size
+ * @param writeQuorumSize
+ * write quorum size
+ * @param ackQuorumSize
+ * ack quorum size
* @param digestType
* digest type, either MAC or CRC32
* @param passwd
- * passowrd
+ * password
* @param cb
* callback implementation
* @param ctx
@@ -110,16 +114,37 @@ class LedgerCreateOp implements GenericCallback<Long> {
*/
metadata.addEnsemble(0L, ensemble);
- // create a ledger with metadata
- bk.getLedgerManager().createLedger(metadata, this);
+ createLedger();
+ }
+
+ void createLedger() {
+ // generate a ledger id and then create the ledger with metadata
+ final LedgerIdGenerator ledgerIdGenerator = bk.getLedgerIdGenerator();
+ ledgerIdGenerator.generateLedgerId(new GenericCallback<Long>() {
+ @Override
+ public void operationComplete(int rc, Long ledgerId) {
+ if (BKException.Code.OK != rc) {
+ createComplete(rc, null);
+ return;
+ }
+
+ LedgerCreateOp.this.ledgerId = ledgerId;
+ // create a ledger with metadata
+ bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this);
+ }
+ });
}
/**
* Callback when created ledger.
*/
@Override
- public void operationComplete(int rc, Long ledgerId) {
- if (BKException.Code.OK != rc) {
+ public void operationComplete(int rc, Void result) {
+ if (BKException.Code.LedgerExistException == rc) {
+ // retry to generate a new ledger id
+ createLedger();
+ return;
+ } else if (BKException.Code.OK != rc) {
createComplete(rc, null);
return;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 3ec2b5a..2e66806 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -178,7 +178,7 @@ public abstract class AbstractConfiguration extends CompositeConfiguration {
public String getZkAvailableBookiesPath() {
return getZkLedgersRootPath() + "/" + AVAILABLE_NODE;
}
-
+
/**
* Set the max entries to keep in fragment for re-replication. If fragment
* has more entries than this count, then the original fragment will be
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index f3f680d..6636506 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -42,11 +42,14 @@ import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -213,6 +216,31 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
}
}
+ @Override
+ public void createLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
+ final GenericCallback<Void> ledgerCb) {
+ String ledgerPath = getLedgerPath(ledgerId);
+ StringCallback scb = new StringCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name) {
+ if (rc == Code.OK.intValue()) {
+ // update version
+ metadata.setVersion(new ZkVersion(0));
+ ledgerCb.operationComplete(BKException.Code.OK, null);
+ } else if (rc == Code.NODEEXISTS.intValue()) {
+ LOG.warn("Failed to create ledger metadata for {} which already exist", ledgerId);
+ ledgerCb.operationComplete(BKException.Code.LedgerExistException, null);
+ } else {
+ LOG.error("Could not create node for ledger {}", ledgerId,
+ KeeperException.create(Code.get(rc), path));
+ ledgerCb.operationComplete(BKException.Code.ZKException, null);
+ }
+ }
+ };
+ ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, metadata.serialize(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT, scb, null);
+ }
+
/**
* Removes ledger metadata from ZooKeeper if version matches.
*
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
index a7fbcf5..961e0d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
@@ -98,15 +98,15 @@ public class CleanupLedgerManager implements LedgerManager {
}
@Override
- public void createLedger(LedgerMetadata metadata,
- GenericCallback<Long> cb) {
+ public void createLedgerMetadata(long lid, LedgerMetadata metadata,
+ GenericCallback<Void> cb) {
closeLock.readLock().lock();
try {
if (closed) {
cb.operationComplete(BKException.Code.ClientClosedException, null);
return;
}
- underlying.createLedger(metadata, new CleanupGenericCallback<Long>(cb));
+ underlying.createLedgerMetadata(lid, metadata, new CleanupGenericCallback<Void>(cb));
} finally {
closeLock.readLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 2bc4258..6bd3216 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -22,19 +22,11 @@ import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Set;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,8 +52,6 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
* Configuration object
* @param zk
* ZooKeeper Client Handle
- * @param ledgerRootPath
- * ZooKeeper Path to store ledger metadata
* @throws IOException when version is not compatible
*/
public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
@@ -71,33 +61,6 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
}
@Override
- public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> cb) {
- StringCallback scb = new StringCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx,
- String name) {
- if (Code.OK.intValue() != rc) {
- LOG.error("Could not create node for ledger",
- KeeperException.create(KeeperException.Code.get(rc), path));
- cb.operationComplete(BKException.Code.ZKException, null);
- } else {
- // update znode status
- metadata.setVersion(new ZkVersion(0));
- try {
- long ledgerId = getLedgerId(name);
- cb.operationComplete(BKException.Code.OK, ledgerId);
- } catch (IOException ie) {
- LOG.error("Could not extract ledger-id from path:" + name, ie);
- cb.operationComplete(BKException.Code.ZKException, null);
- }
- }
- }
- };
- ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPrefix, metadata.serialize(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, scb, null);
- }
-
- @Override
public String getLedgerPath(long ledgerId) {
StringBuilder sb = new StringBuilder();
sb.append(ledgerPrefix)
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index db16d26..46f8b9b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -64,6 +64,11 @@ public class FlatLedgerManagerFactory extends LedgerManagerFactory {
}
@Override
+ public LedgerIdGenerator newLedgerIdGenerator() {
+ return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), null);
+ }
+
+ @Override
public LedgerManager newLedgerManager() {
return new FlatLedgerManager(conf, zk);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 7f2df73..bc62af4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -23,23 +23,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,10 +41,7 @@ import org.slf4j.LoggerFactory;
* Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
*
* <p>
- * Hierarchical Ledger Manager first obtain a global unique id from zookeeper using a EPHEMERAL_SEQUENTIAL
- * znode <i>(ledgersRootPath)/ledgers/idgen/ID-</i>.
- * Since zookeeper sequential counter has a format of %10d -- that is 10 digits with 0 (zero) padding, i.e.
- * "<path>0000000001", HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
+ * HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
* <pre><level1 (2 digits)><level2 (4 digits)><level3 (4 digits)></pre>
* These 3 parts are used to form the actual ledger node path used to store ledger metadata:
* <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
@@ -64,13 +54,9 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class);
static final String IDGEN_ZNODE = "idgen";
- static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
private static final String MAX_ID_SUFFIX = "9999";
private static final String MIN_ID_SUFFIX = "0000";
- // Path to generate global id
- private final String idGenPath;
-
/**
* Constructor
*
@@ -81,83 +67,6 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
*/
public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
super(conf, zk);
-
- this.idGenPath = ledgerRootPath + IDGENERATION_PREFIX;
- }
-
- @Override
- public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
- ZkUtils.asyncCreateFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, final String idPathName) {
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG.error("Could not generate new ledger id",
- KeeperException.create(KeeperException.Code.get(rc), path));
- ledgerCb.operationComplete(BKException.Code.ZKException, null);
- return;
- }
- /*
- * Extract ledger id from gen path
- */
- long ledgerId;
- try {
- ledgerId = getLedgerIdFromGenPath(idPathName);
- } catch (IOException e) {
- LOG.error("Could not extract ledger-id from id gen path:" + path, e);
- ledgerCb.operationComplete(BKException.Code.ZKException, null);
- return;
- }
- String ledgerPath = getLedgerPath(ledgerId);
- final long lid = ledgerId;
- StringCallback scb = new StringCallback() {
- @Override
- public void processResult(int rc, String path,
- Object ctx, String name) {
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG.error("Could not create node for ledger",
- KeeperException.create(KeeperException.Code.get(rc), path));
- ledgerCb.operationComplete(BKException.Code.ZKException, null);
- } else {
- // update version
- metadata.setVersion(new ZkVersion(0));
- ledgerCb.operationComplete(BKException.Code.OK, lid);
- }
- }
- };
- ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, metadata.serialize(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, scb, null);
- // delete the znode for id generation
- scheduler.submit(new Runnable() {
- @Override
- public void run() {
- zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG.warn("Exception during deleting znode for id generation : ",
- KeeperException.create(KeeperException.Code.get(rc), path));
- } else {
- LOG.debug("Deleting znode for id generation : {}", idPathName);
- }
- }
- }, null);
- }
- });
- }
- }, null);
- }
-
- // get ledger id from generation path
- private long getLedgerIdFromGenPath(String nodeName) throws IOException {
- long ledgerId;
- try {
- String parts[] = nodeName.split(IDGENERATION_PREFIX);
- ledgerId = Long.parseLong(parts[parts.length - 1]);
- } catch (NumberFormatException e) {
- throw new IOException(e);
- }
- return ledgerId;
}
@Override
@@ -304,7 +213,7 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
* Callback to process element of list when success
* @param finalCb
* Final callback to be called after all elements in the list are processed
- * @param contxt
+ * @param context
* Context of final callback
* @param successRc
* RC passed to final callback on success
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index b843e99..a165b0d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -64,6 +64,11 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
}
@Override
+ public LedgerIdGenerator newLedgerIdGenerator() {
+ return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), HierarchicalLedgerManager.IDGEN_ZNODE);
+ }
+
+ @Override
public LedgerManager newLedgerManager() {
return new HierarchicalLedgerManager(conf, zk);
}
@@ -81,8 +86,7 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
String ledgersRootPath = conf.getZkLedgersRootPath();
List<String> children = zk.getChildren(ledgersRootPath, false);
for (String child : children) {
- if (!HierarchicalLedgerManager.IDGEN_ZNODE.equals(child)
- && ledgerManager.isSpecialZnode(child)) {
+ if (ledgerManager.isSpecialZnode(child)) {
continue;
}
ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
@@ -90,4 +94,5 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
// Delete and recreate the LAYOUT information.
super.format(conf, zk);
}
+
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
index 7229028..fe3c2cf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
@@ -42,16 +42,21 @@ import org.apache.bookkeeper.versioning.Version;
public interface LedgerManager extends Closeable {
/**
- * Create a new ledger with provided metadata
+ * Create a new ledger with provided ledger id and metadata
*
+ * @param ledgerId
+ * Ledger id provided to be created
* @param metadata
- * Metadata provided when creating a new ledger
+ * Metadata provided when creating the new ledger
* @param cb
- * Callback when creating a new ledger.
- * {@link BKException.Code.ZKException} return code when can't generate
- * or extract new ledger id
+ * Callback when creating a new ledger. Return code:<ul>
+ * <li>{@link BKException.Code.OK} if success</li>
+ * <li>{@link BKException.Code.LedgerExistException} if given ledger id exist</li>
+ * <li>{@link BKException.Code.ZKException}/{@link BKException.Code.MetaStoreException}
+ * for other issue</li>
+ * </ul>
*/
- public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb);
+ public void createLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
/**
* Remove a specified ledger metadata by ledgerId and version.
@@ -61,10 +66,12 @@ public interface LedgerManager extends Closeable {
* @param version
* Ledger metadata version
* @param cb
- * Callback when removed ledger metadata.
- * {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
- * {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
- * {@link BKException.Code.ZKException} return code when other issues happen.
+ * Callback when remove ledger metadata. Return code:<ul>
+ * <li>{@link BKException.Code.OK} if success</li>
+ * <li>{@link BKException.Code.MetadataVersionException} if version doesn't match</li>
+ * <li>{@link BKException.Code.NoSuchLedgerExistsException} if ledger not exist</li>
+ * <li>{@link BKException.Code.ZKException} for other issue</li>
+ * </ul>
*/
public void removeLedgerMetadata(long ledgerId, Version version, GenericCallback<Void> vb);
@@ -74,9 +81,11 @@ public interface LedgerManager extends Closeable {
* @param ledgerId
* Ledger Id
* @param readCb
- * Callback when read ledger metadata.
- * {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
- * {@link BKException.Code.ZKException} return code when other issues happen.
+ * Callback when read ledger metadata. Return code:<ul>
+ * <li>{@link BKException.Code.OK} if success</li>
+ * <li>{@link BKException.Code.NoSuchLedgerExistsException} if ledger not exist</li>
+ * <li>{@link BKException.Code.ZKException} for other issue</li>
+ * </ul>
*/
public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb);
@@ -88,9 +97,11 @@ public interface LedgerManager extends Closeable {
* @param metadata
* Ledger Metadata to write
* @param cb
- * Callback when finished writing ledger metadata.
- * {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
- * {@link BKException.Code.ZKException} return code when other issues happen.
+ * Callback when finished writing ledger metadata. Return code:<ul>
+ * <li>{@link BKException.Code.OK} if success</li>
+ * <li>{@link BKException.Code.MetadataVersionException} if version in metadata doesn't match</li>
+ * <li>{@link BKException.Code.ZKException} for other issue</li>
+ * </ul>
*/
public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 7c3cf5c..3a53623 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -68,6 +68,14 @@ public abstract class LedgerManagerFactory {
public abstract void uninitialize() throws IOException;
/**
+ * Return the ledger id generator, which is used for global unique ledger id
+ * generation.
+ *
+ * @return ledger id generator.
+ */
+ public abstract LedgerIdGenerator newLedgerIdGenerator();
+
+ /**
* return ledger manager for client-side to manage ledger metadata.
*
* @return ledger manager
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 2510b89..9f7ef38 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManager.ReadLedgerMetadataTask;
import org.apache.bookkeeper.metastore.MSException;
import org.apache.bookkeeper.metastore.MSWatchedEvent;
import org.apache.bookkeeper.metastore.MetaStore;
@@ -46,23 +45,20 @@ import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback;
import org.apache.bookkeeper.metastore.MetastoreException;
import org.apache.bookkeeper.metastore.MetastoreFactory;
import org.apache.bookkeeper.metastore.MetastoreScannableTable;
+import org.apache.bookkeeper.metastore.MetastoreTable;
import org.apache.bookkeeper.metastore.MetastoreTableItem;
+import org.apache.bookkeeper.metastore.MetastoreUtils;
import org.apache.bookkeeper.metastore.MetastoreWatcher;
-import org.apache.bookkeeper.metastore.MSWatchedEvent.EventType;
import org.apache.bookkeeper.metastore.Value;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -180,6 +176,11 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
}
}
+ @Override
+ public LedgerIdGenerator newLedgerIdGenerator() {
+ return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), MsLedgerManager.IDGEN_ZNODE);
+ }
+
static class MsLedgerManager implements LedgerManager, MetastoreWatcher {
final ZooKeeper zk;
final AbstractConfiguration conf;
@@ -189,15 +190,11 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
final int maxEntriesPerScan;
static final String IDGEN_ZNODE = "ms-idgen";
- static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
// ledger metadata listeners
protected final ConcurrentMap<Long, Set<LedgerMetadataListener>> listeners =
new ConcurrentHashMap<Long, Set<LedgerMetadataListener>>();
- // Path to generate global id
- private final String idGenPath;
-
// we use this to prevent long stack chains from building up in
// callbacks
ScheduledExecutorService scheduler;
@@ -266,7 +263,6 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
// configuration settings
maxEntriesPerScan = conf.getMetastoreMaxEntriesPerScan();
- this.idGenPath = conf.getZkLedgersRootPath() + IDGENERATION_PREFIX;
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder()
.setNameFormat("MSLedgerManagerScheduler-%d");
this.scheduler = Executors.newSingleThreadScheduledExecutor(tfb
@@ -346,76 +342,28 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
}
@Override
- public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
- ZkUtils.asyncCreateFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, final String idPathName) {
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG.error("Could not generate new ledger id",
- KeeperException.create(KeeperException.Code.get(rc), path));
- ledgerCb.operationComplete(BKException.Code.ZKException, null);
- return;
- }
- /*
- * Extract ledger id from gen path
- */
- long ledgerId;
- try {
- ledgerId = getLedgerIdFromGenPath(idPathName);
- } catch (IOException e) {
- LOG.error("Could not extract ledger-id from id gen path:" + path, e);
- ledgerCb.operationComplete(BKException.Code.ZKException, null);
- return;
- }
-
- final long lid = ledgerId;
- MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
- @Override
- public void complete(int rc, Version version, Object ctx) {
- if (MSException.Code.BadVersion.getCode() == rc) {
- ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null);
- return;
- }
- if (MSException.Code.OK.getCode() != rc) {
- ledgerCb.operationComplete(BKException.Code.MetaStoreException, null);
- return;
- }
- LOG.debug("Create ledger {} with version {} successfuly.", new Object[] { lid,
- version });
- // update version
- metadata.setVersion(version);
- ledgerCb.operationComplete(BKException.Code.OK, lid);
- }
- };
-
- ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()),
- Version.NEW, msCallback, null);
- zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG.warn("Exception during deleting znode for id generation : ",
- KeeperException.create(KeeperException.Code.get(rc), path));
- } else {
- LOG.debug("Deleting znode for id generation : {}", idPathName);
- }
- }
- }, null);
- }
- }, null);
- }
+ public void createLedgerMetadata(final long lid, final LedgerMetadata metadata,
+ final GenericCallback<Void> ledgerCb) {
+ MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
+ @Override
+ public void complete(int rc, Version version, Object ctx) {
+ if (MSException.Code.BadVersion.getCode() == rc) {
+ ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null);
+ return;
+ }
+ if (MSException.Code.OK.getCode() != rc) {
+ ledgerCb.operationComplete(BKException.Code.MetaStoreException, null);
+ return;
+ }
+ LOG.debug("Create ledger {} with version {} successfully.", lid, version);
+ // update version
+ metadata.setVersion(version);
+ ledgerCb.operationComplete(BKException.Code.OK, null);
+ }
+ };
- // get ledger id from generation path
- private long getLedgerIdFromGenPath(String nodeName) throws IOException {
- long ledgerId;
- try {
- String parts[] = nodeName.split(IDGENERATION_PREFIX);
- ledgerId = Long.parseLong(parts[parts.length - 1]);
- } catch (NumberFormatException e) {
- throw new IOException(e);
- }
- return ledgerId;
+ ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()),
+ Version.NEW, msCallback, null);
}
@Override
@@ -688,7 +636,7 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
* @param finalCb
* Final callback to be called after all elements in the list
* are processed
- * @param contxt
+ * @param context
* Context of final callback
* @param successRc
* RC passed to final callback on success
@@ -730,4 +678,24 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
}
}
+ @Override
+ public void format(AbstractConfiguration conf, ZooKeeper zk) throws InterruptedException,
+ KeeperException, IOException {
+ MetastoreTable ledgerTable;
+ try {
+ ledgerTable = metastore.createScannableTable(TABLE_NAME);
+ } catch (MetastoreException mse) {
+ throw new IOException("Failed to instantiate table " + TABLE_NAME + " in metastore "
+ + metastore.getName());
+ }
+ try {
+ MetastoreUtils.cleanTable(ledgerTable, conf.getMetastoreMaxEntriesPerScan());
+ } catch (MSException mse) {
+ throw new IOException("Exception when cleanning up table " + TABLE_NAME, mse);
+ }
+ LOG.info("Finished cleaning up table {}.", TABLE_NAME);
+ // Delete and recreate the LAYOUT information.
+ super.format(conf, zk);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 956595e..c37e084 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -451,7 +451,7 @@ public class CompactionTest extends BookKeeperClusterTestCase {
private LedgerManager getLedgerManager(final Set<Long> ledgers) {
LedgerManager manager = new LedgerManager() {
@Override
- public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb) {
+ public void createLedgerMetadata(long lid, LedgerMetadata metadata, GenericCallback<Void> cb) {
unsupported();
}
@Override
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index eb833a3..df74339 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -23,11 +23,13 @@ package org.apache.bookkeeper.client;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.meta.FlatLedgerManagerFactory;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.ReflectionUtils;
@@ -104,45 +106,54 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
@Test(timeout = 60000)
public void testWatchMetadataRemoval() throws Exception {
- LedgerManagerFactory factory = ReflectionUtils.newInstance(lmFactoryCls);
- factory.initialize(baseConf, super.zkc, factory.getCurrentVersion());
- LedgerManager manager = factory.newLedgerManager();
- final ByteBuffer bbLedgerId = ByteBuffer.allocate(8);
- final CountDownLatch createLatch = new CountDownLatch(1);
- final CountDownLatch removeLatch = new CountDownLatch(1);
-
- manager.createLedger( new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes()),
- new BookkeeperInternalCallbacks.GenericCallback<Long>(){
-
- @Override
- public void operationComplete(int rc, Long result) {
- bbLedgerId.putLong(result);
- bbLedgerId.flip();
- createLatch.countDown();
- }
- });
- assertTrue(createLatch.await(2000, TimeUnit.MILLISECONDS));
- final long createdLid = bbLedgerId.getLong();
-
- manager.registerLedgerMetadataListener( createdLid,
- new LedgerMetadataListener() {
-
- @Override
- public void onChanged( long ledgerId, LedgerMetadata metadata ) {
- assertEquals(ledgerId, createdLid);
- assertEquals(metadata, null);
- removeLatch.countDown();
- }
- });
-
- manager.removeLedgerMetadata( createdLid, Version.ANY,
- new BookkeeperInternalCallbacks.GenericCallback<Void>() {
-
- @Override
- public void operationComplete(int rc, Void result) {
- assertEquals(rc, BKException.Code.OK);
- }
- });
- assertTrue(removeLatch.await(2000, TimeUnit.MILLISECONDS));
+ LedgerManagerFactory factory = ReflectionUtils.newInstance(lmFactoryCls);
+ factory.initialize(baseConf, super.zkc, factory.getCurrentVersion());
+ final LedgerManager manager = factory.newLedgerManager();
+ LedgerIdGenerator idGenerator = factory.newLedgerIdGenerator();
+
+ final ByteBuffer bbLedgerId = ByteBuffer.allocate(8);
+ final CountDownLatch createLatch = new CountDownLatch(1);
+ final CountDownLatch removeLatch = new CountDownLatch(1);
+
+ idGenerator.generateLedgerId(new GenericCallback<Long>() {
+ @Override
+ public void operationComplete(int rc, final Long lid) {
+ manager.createLedgerMetadata(lid, new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes()),
+ new BookkeeperInternalCallbacks.GenericCallback<Void>(){
+
+ @Override
+ public void operationComplete(int rc, Void result) {
+ bbLedgerId.putLong(lid);
+ bbLedgerId.flip();
+ createLatch.countDown();
+ }
+ });
+
+ }
+ });
+
+ assertTrue(createLatch.await(2000, TimeUnit.MILLISECONDS));
+ final long createdLid = bbLedgerId.getLong();
+
+ manager.registerLedgerMetadataListener( createdLid,
+ new LedgerMetadataListener() {
+
+ @Override
+ public void onChanged( long ledgerId, LedgerMetadata metadata ) {
+ assertEquals(ledgerId, createdLid);
+ assertEquals(metadata, null);
+ removeLatch.countDown();
+ }
+ });
+
+ manager.removeLedgerMetadata( createdLid, Version.ANY,
+ new BookkeeperInternalCallbacks.GenericCallback<Void>() {
+
+ @Override
+ public void operationComplete(int rc, Void result) {
+ assertEquals(rc, BKException.Code.OK);
+ }
+ });
+ assertTrue(removeLatch.await(2000, TimeUnit.MILLISECONDS));
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 19aab44..de352b5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,6 +21,7 @@
package org.apache.bookkeeper.meta;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -63,23 +64,38 @@ public class GcLedgersTest extends LedgerManagerTestCase {
/**
* Create ledgers
*/
- private void createLedgers(int numLedgers, final Set<Long> createdLedgers) {
+ private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
final AtomicInteger expected = new AtomicInteger(numLedgers);
for (int i=0; i<numLedgers; i++) {
- getLedgerManager().createLedger(new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()),
- new GenericCallback<Long>() {
+ getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() {
@Override
- public void operationComplete(int rc, Long ledgerId) {
- if (rc == BKException.Code.OK) {
- activeLedgers.put(ledgerId, true);
- createdLedgers.add(ledgerId);
- }
- synchronized (expected) {
- int num = expected.decrementAndGet();
- if (num == 0) {
- expected.notify();
+ public void operationComplete(int rc, final Long ledgerId) {
+ if (BKException.Code.OK != rc) {
+ synchronized (expected) {
+ int num = expected.decrementAndGet();
+ if (num == 0) {
+ expected.notify();
+ }
}
+ return;
}
+
+ getLedgerManager().createLedgerMetadata(ledgerId,
+ new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()), new GenericCallback<Void>() {
+ @Override
+ public void operationComplete(int rc, Void result) {
+ if (rc == BKException.Code.OK) {
+ activeLedgers.put(ledgerId, true);
+ createdLedgers.add(ledgerId);
+ }
+ synchronized (expected) {
+ int num = expected.decrementAndGet();
+ if (num == 0) {
+ expected.notify();
+ }
+ }
+ }
+ });
}
});
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index b95d2db..dae61bc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -21,20 +21,19 @@
package org.apache.bookkeeper.meta;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.SnapshotMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test case to run over serveral ledger managers
@@ -45,6 +44,7 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
LedgerManagerFactory ledgerManagerFactory;
LedgerManager ledgerManager = null;
+ LedgerIdGenerator ledgerIdGenerator = null;
SnapshotMap<Long, Boolean> activeLedgers = null;
public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls) {
@@ -60,6 +60,13 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
return ledgerManager;
}
+ public LedgerIdGenerator getLedgerIdGenerator() throws IOException {
+ if (null == ledgerIdGenerator) {
+ ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
+ }
+ return ledgerIdGenerator;
+ }
+
@Parameters
public static Collection<Object[]> configs() {
return Arrays.asList(new Object[][] {