You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/06/19 12:39:38 UTC
svn commit: r1351646 [1/3] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ book...
Author: ivank
Date: Tue Jun 19 10:39:37 2012
New Revision: 1351646
URL: http://svn.apache.org/viewvc?rev=1351646&view=rev
Log:
BOOKKEEPER-203: improve ledger manager interface to remove zookeeper dependency on metadata operations. (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java
- copied, changed from r1351374, zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkVersion.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ReflectionUtils.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Version.java
- copied, changed from r1351374, zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalLedgerDeleteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Versioned.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkVersion.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
- copied, changed from r1351374, zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
Removed:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalAsyncLedgerOpsTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieFailureTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieReadWriteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalLedgerDeleteTest.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Jun 19 10:39:37 2012
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
IMPROVEMENTS:
+ BOOKKEEPER-203: improve ledger manager interface to remove zookeeper dependency on metadata operations. (sijie via ivank)
+
Backward compatible changes:
BUGFIXES:
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Jun 19 10:39:37 2012
@@ -36,7 +36,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.ActiveLedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
@@ -66,9 +66,11 @@ public class Bookie extends Thread {
final ServerConfiguration conf;
final SyncThread syncThread;
- final LedgerManager ledgerManager;
+ final LedgerManagerFactory activeLedgerManagerFactory;
+ final ActiveLedgerManager activeLedgerManager;
final LedgerStorage ledgerStorage;
final Journal journal;
+
final HandleFactory handles;
static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
@@ -351,10 +353,11 @@ public class Bookie extends Thread {
this.zk = instantiateZookeeperClient(conf);
checkEnvironment(this.zk);
- ledgerManager = LedgerManagerFactory.newLedgerManager(conf, this.zk);
+ activeLedgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
+ activeLedgerManager = activeLedgerManagerFactory.newActiveLedgerManager();
syncThread = new SyncThread(conf);
- ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager);
+ ledgerStorage = new InterleavedLedgerStorage(conf, activeLedgerManager);
handles = new HandleFactoryImpl(ledgerStorage);
// instantiate the journal
journal = new Journal(conf);
@@ -603,7 +606,12 @@ public class Bookie extends Thread {
syncThread.shutdown();
// close Ledger Manager
- ledgerManager.close();
+ try {
+ activeLedgerManager.close();
+ activeLedgerManagerFactory.uninitialize();
+ } catch (IOException ie) {
+ LOG.error("Failed to close active ledger manager : ", ie);
+ }
// setting running to false here, so watch thread in bookie server know it only after bookie shut down
running = false;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Tue Jun 19 10:39:37 2012
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.ActiveLedgerManager;
import org.apache.zookeeper.ZooKeeper;
/**
@@ -73,7 +73,7 @@ public class GarbageCollectorThread exte
// Ledger Cache Handle
final LedgerCache ledgerCache;
- final LedgerManager ledgerManager;
+ final ActiveLedgerManager activeLedgerManager;
// flag to ensure gc thread will not be interrupted during compaction
// to reduce the risk getting entry log corrupted
@@ -117,14 +117,14 @@ public class GarbageCollectorThread exte
public GarbageCollectorThread(ServerConfiguration conf,
LedgerCache ledgerCache,
EntryLogger entryLogger,
- LedgerManager ledgerManager,
+ ActiveLedgerManager activeLedgerManager,
EntryLogScanner scanner)
throws IOException {
super("GarbageCollectorThread");
this.ledgerCache = ledgerCache;
this.entryLogger = entryLogger;
- this.ledgerManager = ledgerManager;
+ this.activeLedgerManager = activeLedgerManager;
this.scanner = scanner;
this.gcWaitTime = conf.getGcWaitTime();
@@ -224,8 +224,8 @@ public class GarbageCollectorThread exte
* Do garbage collection ledger index files
*/
private void doGcLedgers() {
- ledgerManager.garbageCollectLedgers(
- new LedgerManager.GarbageCollector() {
+ activeLedgerManager.garbageCollectLedgers(
+ new ActiveLedgerManager.GarbageCollector() {
@Override
public void gc(long ledgerId) {
try {
@@ -246,7 +246,7 @@ public class GarbageCollectorThread exte
EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
for (Long entryLogLedger : meta.ledgersMap.keySet()) {
// Remove the entry log ledger from the set if it isn't active.
- if (!ledgerManager.containsActiveLedger(entryLogLedger)) {
+ if (!activeLedgerManager.containsActiveLedger(entryLogLedger)) {
meta.removeLedger(entryLogLedger);
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Tue Jun 19 10:39:37 2012
@@ -26,7 +26,7 @@ import java.io.IOException;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.ActiveLedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.slf4j.Logger;
@@ -50,12 +50,12 @@ class InterleavedLedgerStorage implement
// this indicates that a write has happened since the last flush
private volatile boolean somethingWritten = false;
- InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager)
+ InterleavedLedgerStorage(ServerConfiguration conf, ActiveLedgerManager activeLedgerManager)
throws IOException {
entryLogger = new EntryLogger(conf);
- ledgerCache = new LedgerCacheImpl(conf, ledgerManager);
+ ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager);
gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
- ledgerManager, new EntryLogCompactionScanner());
+ activeLedgerManager, new EntryLogCompactionScanner());
}
@Override
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Tue Jun 19 10:39:37 2012
@@ -35,7 +35,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Map.Entry;
-import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.ActiveLedgerManager;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +49,7 @@ public class LedgerCacheImpl implements
final File ledgerDirectories[];
- public LedgerCacheImpl(ServerConfiguration conf, LedgerManager alm) {
+ public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm) {
this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
this.openFileLimit = conf.getOpenFileLimit();
this.pageSize = conf.getPageSize();
@@ -83,7 +83,7 @@ public class LedgerCacheImpl implements
// Manage all active ledgers in LedgerManager
// so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
- final LedgerManager activeLedgerManager;
+ final ActiveLedgerManager activeLedgerManager;
final int openFileLimit;
final int pageSize;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java Tue Jun 19 10:39:37 2012
@@ -76,6 +76,8 @@ public abstract class BKException extend
return new BKInterruptedException();
case Code.ProtocolVersionException:
return new BKProtocolVersionException();
+ case Code.MetadataVersionException:
+ return new BKMetadataVersionException();
case Code.LedgerFencedException:
return new BKLedgerFencedException();
case Code.UnauthorizedAccessException:
@@ -107,6 +109,7 @@ public abstract class BKException extend
int IncorrectParameterException = -14;
int InterruptedException = -15;
int ProtocolVersionException = -16;
+ int MetadataVersionException = -17;
int IllegalOpException = -100;
int LedgerFencedException = -101;
@@ -157,6 +160,8 @@ public abstract class BKException extend
return "Interrupted while waiting for permit";
case Code.ProtocolVersionException:
return "Bookie protocol version on server is incompatible with client";
+ case Code.MetadataVersionException:
+ return "Bad ledger metadata version";
case Code.LedgerFencedException:
return "Ledger has been fenced off. Some other client must have opened it to read";
case Code.UnauthorizedAccessException:
@@ -226,6 +231,12 @@ public abstract class BKException extend
}
}
+ public static class BKMetadataVersionException extends BKException {
+ public BKMetadataVersionException() {
+ super(Code.MetadataVersionException);
+ }
+ }
+
public static class BKNoSuchLedgerExistsException extends BKException {
public BKNoSuchLedgerExistsException() {
super(Code.NoSuchLedgerExistsException);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Tue Jun 19 10:39:37 2012
@@ -81,6 +81,7 @@ public class BookKeeper {
final OrderedSafeExecutor mainWorkerPool;
// Ledger manager responsible for how to store ledger meta data
+ final LedgerManagerFactory ledgerManagerFactory;
final LedgerManager ledgerManager;
final ClientConfiguration conf;
@@ -148,7 +149,8 @@ public class BookKeeper {
bookieWatcher = new BookieWatcher(conf, this);
bookieWatcher.readBookiesBlocking();
- ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
+ ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
+ ledgerManager = ledgerManagerFactory.newLedgerManager();
ownChannelFactory = true;
ownZKHandle = true;
@@ -210,7 +212,8 @@ public class BookKeeper {
bookieWatcher = new BookieWatcher(conf, this);
bookieWatcher.readBookiesBlocking();
- ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
+ ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
+ ledgerManager = ledgerManagerFactory.newLedgerManager();
}
LedgerManager getLedgerManager() {
@@ -494,11 +497,8 @@ public class BookKeeper {
asyncDeleteLedger(lId, new SyncDeleteCallback(), counter);
// Wait
counter.block(0);
- if (counter.getrc() == KeeperException.Code.NONODE.intValue()) {
- LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}", lId);
- throw BKException.create(Code.NoSuchLedgerExistsException);
- } else if (counter.getrc() != KeeperException.Code.OK.intValue()) {
- LOG.error("ZooKeeper error deleting ledger node: " + counter.getrc());
+ if (counter.getrc() != BKException.Code.OK) {
+ LOG.error("Error deleting ledger " + lId + " : " + counter.getrc());
throw BKException.create(Code.ZKException);
}
}
@@ -509,7 +509,12 @@ public class BookKeeper {
*/
public void close() throws InterruptedException, BKException {
bookieClient.close();
- ledgerManager.close();
+ try {
+ ledgerManager.close();
+ ledgerManagerFactory.uninitialize();
+ } catch (IOException ie) {
+ LOG.error("Failed to close ledger manager : ", ie);
+ }
bookieWatcher.halt();
if (ownChannelFactory) {
channelFactory.releaseExternalResources();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Tue Jun 19 10:39:37 2012
@@ -752,14 +752,13 @@ public class BookKeeperAdmin {
int deadBookieIndex = ensemble.indexOf(oldBookie);
ensemble.remove(deadBookieIndex);
ensemble.add(deadBookieIndex, newBookie);
-
- lh.writeLedgerConfig(new WriteCb(), null);
+ lh.writeLedgerConfig(new WriteCb());
}
- private class WriteCb implements AsyncCallback.StatCallback {
+ private class WriteCb implements GenericCallback<Void> {
@Override
- public void processResult(int rc, final String path, Object ctx, Stat stat) {
- if (rc == Code.BADVERSION.intValue()) {
+ public void operationComplete(int rc, Void result) {
+ if (rc == BKException.Code.MetadataVersionException) {
LOG.warn("Two fragments attempted update at once; ledger id: " + lh.getId()
+ " startid: " + fragmentStartId);
// try again, the previous success (with which this has conflicted)
@@ -769,8 +768,7 @@ public class BookKeeperAdmin {
@Override
public void operationComplete(int rc, LedgerMetadata newMeta) {
if (rc != BKException.Code.OK) {
- LOG.error("Error reading updated ledger metadata for ledger " + lh.getId(),
- KeeperException.create(KeeperException.Code.get(rc), path));
+ LOG.error("Error reading updated ledger metadata for ledger " + lh.getId());
ledgerFragmentsMcb.processResult(rc, null, null);
} else {
lh.metadata = newMeta;
@@ -779,11 +777,10 @@ public class BookKeeperAdmin {
}
});
return;
- } else if (rc != Code.OK.intValue()) {
- LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
- KeeperException.create(KeeperException.Code.get(rc), path));
+ } else if (rc != BKException.Code.OK) {
+ LOG.error("Error updating ledger config metadata for ledgerId " + lh.getId() + " : "
+ + BKException.getMessage(rc));
} else {
- lh.getLedgerMetadata().updateZnodeStatus(stat);
LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : " + fragmentStartId
+ ") to point ledger fragments from old dead bookie: (" + oldBookie
+ ") to new bookie: (" + newBookie + ")");
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Tue Jun 19 10:39:37 2012
@@ -25,20 +25,20 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
+
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
/**
* Encapsulates asynchronous ledger create operation
*
*/
-class LedgerCreateOp implements GenericCallback<String> {
+class LedgerCreateOp implements GenericCallback<Long> {
static final Logger LOG = LoggerFactory.getLogger(LedgerCreateOp.class);
@@ -102,32 +102,17 @@ class LedgerCreateOp implements GenericC
*/
metadata.addEnsemble(0L, ensemble);
- // create a ledger path with metadata
- bk.getLedgerManager().newLedgerPath(this, metadata);
+ // create a ledger with metadata
+ bk.getLedgerManager().createLedger(metadata, this);
}
/**
- * Callback when created ledger path.
+ * Callback when created ledger.
*/
@Override
- public void operationComplete(int rc, String ledgerPath) {
-
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG.error("Could not create node for ledger",
- KeeperException.create(KeeperException.Code.get(rc), ledgerPath));
- cb.createComplete(BKException.Code.ZKException, null, this.ctx);
- return;
- }
-
- /*
- * Extract ledger id.
- */
- long ledgerId;
- try {
- ledgerId = bk.getLedgerManager().getLedgerId(ledgerPath);
- } catch (IOException e) {
- LOG.error("Could not extract ledger-id from path:" + ledgerPath, e);
- cb.createComplete(BKException.Code.ZKException, null, this.ctx);
+ public void operationComplete(int rc, Long ledgerId) {
+ if (BKException.Code.OK != rc) {
+ cb.createComplete(rc, null, this.ctx);
return;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Tue Jun 19 10:39:37 2012
@@ -22,15 +22,15 @@
package org.apache.bookkeeper.client;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
/**
* Encapsulates asynchronous ledger delete operation
*
*/
-class LedgerDeleteOp implements VoidCallback {
+class LedgerDeleteOp implements GenericCallback<Void> {
static final Logger LOG = LoggerFactory.getLogger(LedgerDeleteOp.class);
@@ -62,19 +62,15 @@ class LedgerDeleteOp implements VoidCall
* Initiates the operation
*/
public void initiate() {
- // Asynchronously delete the ledger node in ZK.
+ // Asynchronously delete the ledger from meta manager
// When this completes, it will invoke the callback method below.
-
- bk.getZkHandle().delete(bk.getLedgerManager().getLedgerPath(ledgerId), -1, this, null);
+ bk.getLedgerManager().deleteLedger(ledgerId, this);
}
/**
- * Implements ZooKeeper Void Callback.
- *
- * @see org.apache.zookeeper.AsyncCallback.VoidCallback#processResult(int,
- * java.lang.String, java.lang.Object)
+ * Implements Delete Callback.
*/
- public void processResult(int rc, String path, Object ctx) {
+ public void operationComplete(int rc, Void result) {
cb.deleteComplete(rc, this.ctx);
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Tue Jun 19 10:39:37 2012
@@ -46,10 +46,6 @@ import org.apache.bookkeeper.util.SafeRu
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.data.Stat;
import org.jboss.netty.buffer.ChannelBuffer;
/**
@@ -205,14 +201,12 @@ public class LedgerHandle {
return distributionSchedule;
}
- void writeLedgerConfig(StatCallback callback, Object ctx) {
+ void writeLedgerConfig(GenericCallback<Void> writeCb) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Writing metadata to ZooKeeper: " + this.ledgerId + ", " + metadata.getZnodeVersion());
+ LOG.debug("Writing metadata to ledger manager: " + this.ledgerId + ", " + metadata.getVersion());
}
- bk.getZkHandle().setData(bk.getLedgerManager().getLedgerPath(ledgerId),
- metadata.serialize(), metadata.getZnodeVersion(),
- callback, ctx);
+ bk.getLedgerManager().writeLedgerMetadata(ledgerId, metadata, writeCb);
}
/**
@@ -286,48 +280,44 @@ public class LedgerHandle {
+ metadata.close + " with this many bytes: " + metadata.length);
}
- final class CloseCb implements StatCallback {
+ final class CloseCb implements GenericCallback<Void> {
@Override
- public void processResult(final int rc, String path, Object subctx,
- final Stat stat) {
- if (rc == KeeperException.Code.BadVersion) {
+ public void operationComplete(final int rc, Void result) {
+ if (rc == BKException.Code.MetadataVersionException) {
rereadMetadata(new GenericCallback<LedgerMetadata>() {
@Override
public void operationComplete(int newrc, LedgerMetadata newMeta) {
if (newrc != BKException.Code.OK) {
LOG.error("Error reading new metadata from ledger " + ledgerId
+ " when closing, code=" + newrc);
- cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this, ctx);
+ cb.closeComplete(rc, LedgerHandle.this, ctx);
} else {
metadata.close(prevClose);
metadata.length = prevLength;
if (metadata.resolveConflict(newMeta)) {
metadata.length = length;
metadata.close(lastAddConfirmed);
- writeLedgerConfig(new CloseCb(), null);
+ writeLedgerConfig(new CloseCb());
return;
} else {
metadata.length = length;
metadata.close(lastAddConfirmed);
- LOG.warn("Conditional write failed: "
- + KeeperException.Code.get(KeeperException.Code.BadVersion));
- cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this, ctx);
+ LOG.warn("Conditional update ledger metadata for ledger " + ledgerId + " failed.");
+ cb.closeComplete(rc, LedgerHandle.this, ctx);
}
}
}
});
- } else if (rc != KeeperException.Code.OK.intValue()) {
- LOG.warn("Conditional write failed: " + KeeperException.Code.get(rc));
- cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this,
- ctx);
+ } else if (rc != BKException.Code.OK) {
+ LOG.error("Error update ledger metadata for ledger " + ledgerId + " : " + rc);
+ cb.closeComplete(rc, LedgerHandle.this, ctx);
} else {
- metadata.updateZnodeStatus(stat);
cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
}
}
};
- writeLedgerConfig(new CloseCb(), null);
+ writeLedgerConfig(new CloseCb());
}
});
@@ -687,20 +677,20 @@ public class LedgerHandle {
final long newEnsembleStartEntry = lastAddConfirmed + 1;
metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
- final class ChangeEnsembleCb implements StatCallback {
+ final class ChangeEnsembleCb implements GenericCallback<Void> {
@Override
- public void processResult(final int rc, String path, Object ctx, final Stat stat) {
+ public void operationComplete(final int rc, Void result) {
bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
- if (rc == KeeperException.Code.BadVersion) {
+ if (rc == BKException.Code.MetadataVersionException) {
rereadMetadata(new GenericCallback<LedgerMetadata>() {
@Override
public void operationComplete(int newrc, LedgerMetadata newMeta) {
if (newrc != BKException.Code.OK) {
LOG.error("Error reading new metadata from ledger after changing ensemble, code=" + newrc);
- handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+ handleUnrecoverableErrorDuringAdd(rc);
} else {
// a new ensemble is added only when the start entry is larger than zero
if (newEnsembleStartEntry > 0) {
@@ -708,27 +698,25 @@ public class LedgerHandle {
}
if (metadata.resolveConflict(newMeta)) {
metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
- writeLedgerConfig(new ChangeEnsembleCb(), null);
+ writeLedgerConfig(new ChangeEnsembleCb());
return;
} else {
LOG.error("Could not resolve ledger metadata conflict while changing ensemble to: "
+ newEnsemble + ", old meta data is \n" + new String(metadata.serialize())
+ "\n, new meta data is \n" + new String(newMeta.serialize()) + "\n ,closing ledger");
- handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+ handleUnrecoverableErrorDuringAdd(rc);
}
}
}
});
return;
- } else if (rc != KeeperException.Code.OK.intValue()) {
- LOG
- .error("Could not persist ledger metadata while changing ensemble to: "
- + newEnsemble + " , closing ledger");
- handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+ } else if (rc != BKException.Code.OK) {
+ LOG.error("Could not persist ledger metadata while changing ensemble to: "
+ + newEnsemble + " , closing ledger");
+ handleUnrecoverableErrorDuringAdd(rc);
return;
}
- metadata.updateZnodeStatus(stat);
for (PendingAddOp pendingAddOp : pendingAddOps) {
pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
}
@@ -738,31 +726,11 @@ public class LedgerHandle {
}
};
- writeLedgerConfig(new ChangeEnsembleCb(), null);
-
+ writeLedgerConfig(new ChangeEnsembleCb());
}
void rereadMetadata(final GenericCallback<LedgerMetadata> cb) {
- bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false,
- new DataCallback() {
- public void processResult(int rc, String path,
- Object ctx, byte[] data, Stat stat) {
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG.error("Error reading metadata from ledger, code =" + rc);
- cb.operationComplete(BKException.Code.ZKException, null);
- return;
- }
-
- try {
- LedgerMetadata newMeta = LedgerMetadata.parseConfig(data, stat.getVersion());
- cb.operationComplete(BKException.Code.OK, newMeta);
- } catch (IOException e) {
- LOG.error("Error parsing ledger metadata for ledger", e);
- cb.operationComplete(BKException.Code.ZKException, null);
- return;
- }
- }
- }, null);
+ bk.getLedgerManager().readLedgerMetadata(ledgerId, cb);
}
synchronized void recover(final GenericCallback<Void> cb) {
@@ -784,31 +752,29 @@ public class LedgerHandle {
metadata.markLedgerInRecovery();
- writeLedgerConfig(new StatCallback() {
+ writeLedgerConfig(new GenericCallback<Void>() {
@Override
- public void processResult(final int rc, String path, Object ctx, Stat stat) {
- if (rc == KeeperException.Code.BadVersion) {
+ public void operationComplete(final int rc, Void result) {
+ if (rc == BKException.Code.MetadataVersionException) {
rereadMetadata(new GenericCallback<LedgerMetadata>() {
- @Override
- public void operationComplete(int rc, LedgerMetadata newMeta) {
- if (rc != BKException.Code.OK) {
- cb.operationComplete(rc, null);
- } else {
- metadata = newMeta;
- recover(cb);
- }
+ @Override
+ public void operationComplete(int rc, LedgerMetadata newMeta) {
+ if (rc != BKException.Code.OK) {
+ cb.operationComplete(rc, null);
+ } else {
+ metadata = newMeta;
+ recover(cb);
}
- });
- } else if (rc == KeeperException.Code.OK.intValue()) {
- metadata.znodeVersion = stat.getVersion();
+ }
+ });
+ } else if (rc == BKException.Code.OK) {
new LedgerRecoveryOp(LedgerHandle.this, cb).initiate();
} else {
- LOG.error("Error writing ledger config " + rc
- + " path = " + path);
- cb.operationComplete(BKException.Code.ZKException, null);
+ LOG.error("Error writing ledger config " + rc + " of ledger " + ledgerId);
+ cb.operationComplete(rc, null);
}
}
- }, null);
+ });
}
static class NoopCloseCallback implements CloseCallback {
@@ -816,7 +782,7 @@ public class LedgerHandle {
@Override
public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
- if (rc != KeeperException.Code.OK.intValue()) {
+ if (rc != BKException.Code.OK) {
LOG.warn("Close failed: " + BKException.getMessage(rc));
}
// noop
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Tue Jun 19 10:39:37 2012
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -60,8 +61,8 @@ public class LedgerMetadata {
long close;
private SortedMap<Long, ArrayList<InetSocketAddress>> ensembles = new TreeMap<Long, ArrayList<InetSocketAddress>>();
ArrayList<InetSocketAddress> currentEnsemble;
- volatile int znodeVersion = -1;
-
+ volatile Version version = null;
+
public LedgerMetadata(int ensembleSize, int quorumSize) {
this.ensembleSize = ensembleSize;
this.quorumSize = quorumSize;
@@ -171,12 +172,13 @@ public class LedgerMetadata {
*
* @param array
* byte array to parse
+ * @param version
+ * version of the ledger metadata
* @return LedgerConfig
* @throws IOException
* if the given byte[] cannot be parsed
*/
-
- static LedgerMetadata parseConfig(byte[] bytes, int version) throws IOException {
+ public static LedgerMetadata parseConfig(byte[] bytes, Version version) throws IOException {
LedgerMetadata lc = new LedgerMetadata();
String config = new String(bytes);
@@ -207,7 +209,7 @@ public class LedgerMetadata {
throw new IOException("Quorum size or ensemble size absent from config: " + config);
}
- lc.znodeVersion = version;
+ lc.version = version;
lc.quorumSize = new Integer(lines[i++]);
lc.ensembleSize = new Integer(lines[i++]);
lc.length = new Long(lines[i++]);
@@ -234,31 +236,21 @@ public class LedgerMetadata {
/**
- * Updates the status of this metadata in ZooKeeper.
+ * Updates the version of this metadata.
*
- * @param stat
- */
- public void updateZnodeStatus(Stat stat) {
- this.znodeVersion = stat.getVersion();
- }
-
- /**
- * Update the znode version of this metadata
- *
- * @param znodeVersion
- * Znode version of this metadata
+ * @param v Version
*/
- public void updateZnodeStatus(int znodeVersion) {
- this.znodeVersion = znodeVersion;
+ public void setVersion(Version v) {
+ this.version = v;
}
/**
- * Returns the last znode version.
+ * Returns the last version.
*
- * @return int znode version
+ * @return version
*/
- public int getZnodeVersion() {
- return this.znodeVersion;
+ public Version getVersion() {
+ return this.version;
}
/**
@@ -282,7 +274,8 @@ public class LedgerMetadata {
return false;
}
// new meta znode version should be larger than old one
- if (znodeVersion > newMeta.znodeVersion) {
+ if (null != version &&
+ Version.Occurred.AFTER == version.compare(newMeta.version)) {
return false;
}
// ensemble size should be same
@@ -306,7 +299,7 @@ public class LedgerMetadata {
* ensemble and znode version
*/
ensembles = newMeta.ensembles;
- znodeVersion = newMeta.znodeVersion;
+ version = newMeta.version;
return true;
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java Tue Jun 19 10:39:37 2012
@@ -23,21 +23,20 @@ package org.apache.bookkeeper.client;
import java.io.IOException;
import java.security.GeneralSecurityException;
+
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.zookeeper.data.Stat;
/**
* Encapsulates the ledger open operation
*
*/
-class LedgerOpenOp implements DataCallback {
+class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
static final Logger LOG = LoggerFactory.getLogger(LedgerOpenOp.class);
final BookKeeper bk;
@@ -78,8 +77,7 @@ class LedgerOpenOp implements DataCallba
/**
* Asynchronously read the ledger metadata node.
*/
-
- bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false, this, ctx);
+ bk.getLedgerManager().readLedgerMetadata(ledgerId, this);
}
/**
@@ -91,34 +89,15 @@ class LedgerOpenOp implements DataCallba
}
/**
- * Implements ZooKeeper data callback.
- * @see org.apache.zookeeper.AsyncCallback.DataCallback#processResult(int, String, Object, byte[], Stat)
+ * Implements Open Ledger Callback.
*/
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-
- if (rc == KeeperException.Code.NONODE.intValue()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No such ledger: " + ledgerId, KeeperException.create(KeeperException.Code.get(rc), path));
- }
- cb.openComplete(BKException.Code.NoSuchLedgerExistsException, null, this.ctx);
+ public void operationComplete(int rc, LedgerMetadata metadata) {
+ if (BKException.Code.OK != rc) {
+ // open ledger failed.
+ cb.openComplete(rc, null, this.ctx);
return;
}
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG.error("Could not read metadata for ledger: " + ledgerId, KeeperException.create(KeeperException.Code
- .get(rc), path));
- cb.openComplete(BKException.Code.ZKException, null, this.ctx);
- return;
- }
-
- LedgerMetadata metadata;
- try {
- metadata = LedgerMetadata.parseConfig(data, stat.getVersion());
- } catch (IOException e) {
- LOG.error("Could not parse ledger metadata for ledger: " + ledgerId, e);
- cb.openComplete(BKException.Code.ZKException, null, this.ctx);
- return;
- }
-
+ // get the ledger metadata back
try {
lh = new ReadOnlyLedgerHandle(bk, ledgerId, metadata, digestType, passwd);
} catch (GeneralSecurityException e) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Tue Jun 19 10:39:37 2012
@@ -26,14 +26,29 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.util.ReflectionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Abstract configuration
*/
public abstract class AbstractConfiguration extends CompositeConfiguration {
+ static final Logger LOG = LoggerFactory.getLogger(AbstractConfiguration.class);
+
+ private static ClassLoader defaultLoader;
+ static {
+ defaultLoader = Thread.currentThread().getContextClassLoader();
+ if (null == defaultLoader) {
+ defaultLoader = AbstractConfiguration.class.getClassLoader();
+ }
+ }
+
// Ledger Manager
protected final static String LEDGER_MANAGER_TYPE = "ledgerManagerType";
+ protected final static String LEDGER_MANAGER_FACTORY_CLASS = "ledgerManagerFactoryClass";
protected final static String ZK_LEDGERS_ROOT_PATH = "zkLedgersRootPath";
protected final static String AVAILABLE_NODE = "available";
protected AbstractConfiguration() {
@@ -79,7 +94,10 @@ public abstract class AbstractConfigurat
*
* @param lmType
* Ledger Manager Type
+ * @return void
+ * @deprecated replaced by {@link #setLedgerManagerFactoryClass(Class)}
*/
+ @Deprecated
public void setLedgerManagerType(String lmType) {
setProperty(LEDGER_MANAGER_TYPE, lmType);
}
@@ -89,12 +107,48 @@ public abstract class AbstractConfigurat
*
* @return ledger manager type
* @throws ConfigurationException
+ * @deprecated replaced by {@link #getLedgerManagerFactoryClass()}
*/
+ @Deprecated
public String getLedgerManagerType() {
return getString(LEDGER_MANAGER_TYPE);
}
/**
+ * Set Ledger Manager Factory Class Name.
+ *
+ * @param factoryClassName
+ * Ledger Manager Factory Class Name
+ * @return void
+ */
+ public void setLedgerManagerFactoryClassName(String factoryClassName) {
+ setProperty(LEDGER_MANAGER_FACTORY_CLASS, factoryClassName);
+ }
+
+ /**
+ * Set Ledger Manager Factory Class.
+ *
+ * @param factoryClass
+ * Ledger Manager Factory Class
+ * @return void
+ */
+ public void setLedgerManagerFactoryClass(Class<? extends LedgerManagerFactory> factoryClass) {
+ setProperty(LEDGER_MANAGER_FACTORY_CLASS, factoryClass.getName());
+ }
+
+ /**
+ * Get ledger manager factory class.
+ *
+ * @return ledger manager factory class
+ */
+ public Class<? extends LedgerManagerFactory> getLedgerManagerFactoryClass()
+ throws ConfigurationException {
+ return ReflectionUtils.getClass(this, LEDGER_MANAGER_FACTORY_CLASS,
+ null, LedgerManagerFactory.class,
+ defaultLoader);
+ }
+
+ /**
* Set Zk Ledgers Root Path.
*
* @param zkLedgersPath zk ledgers root path
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Tue Jun 19 10:39:37 2012
@@ -25,22 +25,29 @@ import java.util.Set;
import java.util.Map;
import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.versioning.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
/**
* Abstract ledger manager based on zookeeper, which provides common methods such as query zk nodes.
*/
-abstract class AbstractZkLedgerManager implements LedgerManager {
+abstract class AbstractZkLedgerManager implements LedgerManager, ActiveLedgerManager {
static Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
@@ -53,6 +60,9 @@ abstract class AbstractZkLedgerManager i
protected final ZooKeeper zk;
protected final String ledgerRootPath;
+ // A sorted map to store all active ledger ids
+ protected final SnapshotMap<Long, Boolean> activeLedgers;
+
/**
* ZooKeeper-based Ledger Manager Constructor
*
@@ -60,14 +70,112 @@ abstract class AbstractZkLedgerManager i
* Configuration object
* @param zk
* ZooKeeper Client Handle
- * @param ledgerRootPath
- * ZooKeeper Path to store ledger metadata
*/
- protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
- String ledgerRootPath) {
+ protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
this.conf = conf;
this.zk = zk;
- this.ledgerRootPath = ledgerRootPath;
+ this.ledgerRootPath = conf.getZkLedgersRootPath();
+
+ activeLedgers = new SnapshotMap<Long, Boolean>();
+ }
+
+ /**
+ * Get the znode path that is used to store ledger metadata
+ *
+ * @param ledgerId
+ * Ledger ID
+ * @return ledger node path
+ */
+ protected abstract String getLedgerPath(long ledgerId);
+
+ /**
+ * Get ledger id from its znode ledger path
+ *
+ * @param ledgerPath
+ * Ledger path to store metadata
+ * @return ledger id
+ * @throws IOException when the ledger path is invalid
+ */
+ protected abstract long getLedgerId(String ledgerPath) throws IOException;
+
+ @Override
+ public void deleteLedger(final long ledgerId, final GenericCallback<Void> cb) {
+ zk.delete(getLedgerPath(ledgerId), -1, new VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ int bkRc;
+ if (rc == KeeperException.Code.NONODE.intValue()) {
+ LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}", ledgerId);
+ bkRc = BKException.Code.NoSuchLedgerExistsException;
+ } else if (rc == KeeperException.Code.OK.intValue()) {
+ bkRc = BKException.Code.OK;
+ } else {
+ bkRc = BKException.Code.ZKException;
+ }
+ cb.operationComplete(bkRc, (Void)null);
+ }
+ }, null);
+ }
+
+ @Override
+ public void readLedgerMetadata(final long ledgerId, final GenericCallback<LedgerMetadata> readCb) {
+ zk.getData(getLedgerPath(ledgerId), false, new DataCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ if (rc == KeeperException.Code.NONODE.intValue()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No such ledger: " + ledgerId,
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ }
+ readCb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null);
+ return;
+ }
+ if (rc != KeeperException.Code.OK.intValue()) {
+ LOG.error("Could not read metadata for ledger: " + ledgerId,
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ readCb.operationComplete(BKException.Code.ZKException, null);
+ return;
+ }
+
+ LedgerMetadata metadata;
+ try {
+ metadata = LedgerMetadata.parseConfig(data, new ZkVersion(stat.getVersion()));
+ } catch (IOException e) {
+ LOG.error("Could not parse ledger metadata for ledger: " + ledgerId, e);
+ readCb.operationComplete(BKException.Code.ZKException, null);
+ return;
+ }
+ readCb.operationComplete(BKException.Code.OK, metadata);
+ }
+ }, null);
+ }
+
+ @Override
+ public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
+ final GenericCallback<Void> cb) {
+ Version v = metadata.getVersion();
+ if (null == v || !(v instanceof ZkVersion)) {
+ cb.operationComplete(BKException.Code.MetadataVersionException, null);
+ return;
+ }
+ final ZkVersion zv = (ZkVersion) v;
+ zk.setData(getLedgerPath(ledgerId),
+ metadata.serialize(), zv.getZnodeVersion(),
+ new StatCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ if (KeeperException.Code.BadVersion == rc) {
+ cb.operationComplete(BKException.Code.MetadataVersionException, null);
+ } else if (KeeperException.Code.OK.intValue() == rc) {
+ // update metadata version
+ metadata.setVersion(zv.setZnodeVersion(stat.getVersion()));
+ cb.operationComplete(BKException.Code.OK, null);
+ } else {
+ LOG.warn("Conditional update ledger metadata failed: ", KeeperException.Code.get(rc));
+ cb.operationComplete(BKException.Code.ZKException, null);
+ }
+ }
+ }, null);
}
/**
@@ -260,6 +368,21 @@ abstract class AbstractZkLedgerManager i
public void close() {
}
+ @Override
+ public void addActiveLedger(long ledgerId, boolean active) {
+ activeLedgers.put(ledgerId, active);
+ }
+
+ @Override
+ public void removeActiveLedger(long ledgerId) {
+ activeLedgers.remove(ledgerId);
+ }
+
+ @Override
+ public boolean containsActiveLedger(long ledgerId) {
+ return activeLedgers.containsKey(ledgerId);
+ }
+
/**
* Do garbage collecting comparing hosted ledgers and zk ledgers
*
Copied: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java (from r1351374, zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java?p2=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java&p1=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java&r1=1351374&r2=1351646&rev=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java Tue Jun 19 10:39:37 2012
@@ -18,76 +18,17 @@ package org.apache.bookkeeper.meta;
* limitations under the License.
*/
-import java.io.IOException;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import java.io.Closeable;
/**
- * LedgerManager takes responsibility of ledger management
+ * ActiveLedgerManager is responsible for active ledger management on the bookie side.
*
* <ul>
- * <li>How to store ledger meta (e.g. in ZooKeeper or other key/value store)
* <li>How to manager active ledgers (so know how to do garbage collection)
* <li>How to garbage collect inactive/deleted ledgers
* </ul>
*/
-public interface LedgerManager {
-
- /**
- * Get the path that is used to store ledger metadata
- *
- * @param ledgerId
- * Ledger ID
- * @return ledger node path
- */
- public String getLedgerPath(long ledgerId);
-
- /**
- * Get ledger id from its ledger path
- *
- * @param ledgerPath
- * Ledger path to store metadata
- * @return ledger id
- * @throws IOException when the ledger path is invalid
- */
- public long getLedgerId(String ledgerPath) throws IOException;
-
- /**
- * Create a new zk ledger path with provided metadata
- *
- * @param cb
- * Callback when getting new zk ledger path to create.
- * @param metadata
- * Metadata provided when creating a new ledger
- */
- public abstract void newLedgerPath(GenericCallback<String> cb, LedgerMetadata metadata);
-
- /**
- * Loop to process all ledgers.
- * <p>
- * <ul>
- * After all ledgers were processed, finalCb will be triggerred:
- * <li> if all ledgers are processed done with OK, success rc will be passed to finalCb.
- * <li> if some ledgers are prcoessed failed, failure rc will be passed to finalCb.
- * </ul>
- * </p>
- *
- * @param processor
- * Ledger Processor to process a specific ledger
- * @param finalCb
- * Callback triggered after all ledgers are processed
- * @param context
- * Context of final callback
- * @param successRc
- * Success RC code passed to finalCb when callback
- * @param failureRc
- * Failure RC code passed to finalCb when exceptions occured.
- */
- public void asyncProcessLedgers(Processor<Long> processor, AsyncCallback.VoidCallback finalCb,
- Object context, int successRc, int failureRc);
+public interface ActiveLedgerManager extends Closeable {
/**
* Add active ledger
@@ -140,8 +81,4 @@ public interface LedgerManager {
*/
public void garbageCollectLedgers(GarbageCollector gc);
- /**
- * Close ledger manager
- */
- public void close();
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java Tue Jun 19 10:39:37 2012
@@ -19,10 +19,10 @@ package org.apache.bookkeeper.meta;
*/
import java.io.IOException;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.util.ZkUtil
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
@@ -61,13 +62,8 @@ import org.slf4j.LoggerFactory;
class FlatLedgerManager extends AbstractZkLedgerManager {
static final Logger LOG = LoggerFactory.getLogger(FlatLedgerManager.class);
- public static final String NAME = "flat";
- public static final int CUR_VERSION = 1;
-
// path prefix to store ledger znodes
private final String ledgerPrefix;
- // hash map to store all active ledger ids
- private SnapshotMap<Long, Boolean> activeLedgers;
/**
* Constructor
@@ -80,32 +76,32 @@ class FlatLedgerManager extends Abstract
* ZooKeeper Path to store ledger metadata
* @throws IOException when version is not compatible
*/
- public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
- String ledgerRootPath, int layoutVersion)
- throws IOException {
- super(conf, zk, ledgerRootPath);
-
- if (layoutVersion != CUR_VERSION) {
- throw new IOException("Incompatible layout version found : "
- + layoutVersion);
- }
+ public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+ super(conf, zk);
ledgerPrefix = ledgerRootPath + "/" + LEDGER_NODE_PREFIX;
- activeLedgers = new SnapshotMap<Long, Boolean>();
}
@Override
- public void newLedgerPath(final GenericCallback<String> cb, final LedgerMetadata metadata) {
+ public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> cb) {
StringCallback scb = new StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx,
String name) {
if (Code.OK.intValue() != rc) {
+ LOG.error("Could not create node for ledger",
+ KeeperException.create(KeeperException.Code.get(rc), path));
cb.operationComplete(rc, null);
} else {
// update znode status
- metadata.updateZnodeStatus(0);
- cb.operationComplete(rc, name);
+ metadata.setVersion(new ZkVersion(0));
+ try {
+ long ledgerId = getLedgerId(name);
+ cb.operationComplete(rc, ledgerId);
+ } catch (IOException ie) {
+ LOG.error("Could not extract ledger-id from path:" + name, ie);
+ cb.operationComplete(BKException.Code.ZKException, null);
+ }
}
}
};
@@ -141,21 +137,6 @@ class FlatLedgerManager extends Abstract
}
@Override
- public void addActiveLedger(long ledgerId, boolean active) {
- activeLedgers.put(ledgerId, active);
- }
-
- @Override
- public void removeActiveLedger(long ledgerId) {
- activeLedgers.remove(ledgerId);
- }
-
- @Override
- public boolean containsActiveLedger(long ledgerId) {
- return activeLedgers.containsKey(ledgerId);
- }
-
- @Override
public void garbageCollectLedgers(GarbageCollector gc) {
try {
// create a snapshot first
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java?rev=1351646&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java Tue Jun 19 10:39:37 2012
@@ -0,0 +1,72 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Flat Ledger Manager Factory
+ */
+class FlatLedgerManagerFactory extends LedgerManagerFactory {
+
+ public static final String NAME = "flat";
+ public static final int CUR_VERSION = 1;
+
+ AbstractConfiguration conf;
+ ZooKeeper zk;
+
+ @Override
+ public int getCurrentVersion() {
+ return CUR_VERSION;
+ }
+
+ @Override
+ public LedgerManagerFactory initialize(final AbstractConfiguration conf,
+ final ZooKeeper zk,
+ final int factoryVersion)
+ throws IOException {
+ if (CUR_VERSION != factoryVersion) {
+ throw new IOException("Incompatible layout version found : "
+ + factoryVersion);
+ }
+ this.conf = conf;
+ this.zk = zk;
+ return this;
+ }
+
+ @Override
+ public void uninitialize() throws IOException {
+ // since zookeeper instance is passed from outside
+ // we don't need to close it here
+ }
+
+ @Override
+ public LedgerManager newLedgerManager() {
+ return new FlatLedgerManager(conf, zk);
+ }
+
+ @Override
+ public ActiveLedgerManager newActiveLedgerManager() {
+ return new FlatLedgerManager(conf, zk);
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java Tue Jun 19 10:39:37 2012
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.Set;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.HashSet;
import java.util.List;
import org.apache.bookkeeper.client.LedgerMetadata;
@@ -81,9 +80,6 @@ import org.slf4j.LoggerFactory;
class HierarchicalLedgerManager extends AbstractZkLedgerManager {
static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class);
- public static final String NAME = "hierarchical";
-
- public static final int CUR_VERSION = 1;
static final String IDGEN_ZNODE = "idgen";
static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
@@ -92,8 +88,6 @@ class HierarchicalLedgerManager extends
// Path to generate global id
private final String idGenPath;
- // A sorted map to stored all active ledger ids
- private SnapshotMap<Long, Boolean> activeLedgers;
// we use this to prevent long stack chains from building up in callbacks
ScheduledExecutorService scheduler;
@@ -105,23 +99,11 @@ class HierarchicalLedgerManager extends
* Configuration object
* @param zk
* ZooKeeper Client Handle
- * @param ledgerRootPath
- * ZooKeeper Path to store ledger metadata
- * @throws IOException when version is not compatible
*/
- public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
- String ledgerRootPath, int layoutVersion)
- throws IOException {
- super(conf, zk, ledgerRootPath);
-
- if (layoutVersion != CUR_VERSION) {
- throw new IOException("Incompatible layout version found : "
- + layoutVersion);
- }
+ public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+ super(conf, zk);
this.idGenPath = ledgerRootPath + IDGENERATION_PREFIX;
- activeLedgers = new SnapshotMap<Long, Boolean>();
-
this.scheduler = Executors.newSingleThreadScheduledExecutor();
if (LOG.isDebugEnabled()) {
LOG.debug("Using HierarchicalLedgerManager with root path : " + ledgerRootPath);
@@ -139,7 +121,7 @@ class HierarchicalLedgerManager extends
}
@Override
- public void newLedgerPath(final GenericCallback<String> ledgerCb, final LedgerMetadata metadata) {
+ public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
ZkUtils.createFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
@Override
@@ -161,20 +143,23 @@ class HierarchicalLedgerManager extends
ledgerCb.operationComplete(KeeperException.Code.SYSTEMERROR.intValue(), null);
return;
}
+ String ledgerPath = getLedgerPath(ledgerId);
+ final long lid = ledgerId;
StringCallback scb = new StringCallback() {
@Override
public void processResult(int rc, String path,
Object ctx, String name) {
if (rc != KeeperException.Code.OK.intValue()) {
+ LOG.error("Could not create node for ledger",
+ KeeperException.create(KeeperException.Code.get(rc), path));
ledgerCb.operationComplete(rc, null);
} else {
- // update znode status
- metadata.updateZnodeStatus(0);
- ledgerCb.operationComplete(rc, name);
+ // update version
+ metadata.setVersion(new ZkVersion(0));
+ ledgerCb.operationComplete(rc, lid);
}
}
};
- String ledgerPath = getLedgerPath(ledgerId);
ZkUtils.createFullPathOptimistic(zk, ledgerPath, metadata.serialize(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, scb, null);
// delete the znode for id generation
@@ -351,21 +336,6 @@ class HierarchicalLedgerManager extends
}
@Override
- public void addActiveLedger(long ledgerId, boolean active) {
- activeLedgers.put(ledgerId, active);
- }
-
- @Override
- public void removeActiveLedger(long ledgerId) {
- activeLedgers.remove(ledgerId);
- }
-
- @Override
- public boolean containsActiveLedger(long ledgerId) {
- return activeLedgers.containsKey(ledgerId);
- }
-
- @Override
public void garbageCollectLedgers(GarbageCollector gc) {
// create a snapshot before garbage collection
NavigableMap<Long, Boolean> snapshot = activeLedgers.snapshot();
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java?rev=1351646&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java Tue Jun 19 10:39:37 2012
@@ -0,0 +1,72 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Hierarchical Ledger Manager Factory
+ */
+class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+
+    public static final String NAME = "hierarchical";
+    public static final int CUR_VERSION = 1;
+
+    // Populated by initialize() and passed to each manager this factory constructs.
+    AbstractConfiguration conf;
+    ZooKeeper zk;
+
+    /** Returns CUR_VERSION, the single layout version that initialize() will accept. */
+    @Override
+    public int getCurrentVersion() {
+        return CUR_VERSION;
+    }
+
+    /** Keeps conf and zk and returns this; an IOException is thrown if factoryVersion differs from CUR_VERSION. */
+    @Override
+    public LedgerManagerFactory initialize(final AbstractConfiguration conf, final ZooKeeper zk,
+            final int factoryVersion) throws IOException {
+        if (factoryVersion != CUR_VERSION) {
+            throw new IOException("Incompatible layout version found : " + factoryVersion);
+        }
+        this.zk = zk;
+        this.conf = conf;
+        return this;
+    }
+
+    @Override
+    public void uninitialize() throws IOException {
+        // Intentionally empty: the ZooKeeper instance was passed in from outside, so it is not closed here.
+    }
+
+    /** Constructs a HierarchicalLedgerManager from the stored conf and zk. */
+    @Override
+    public LedgerManager newLedgerManager() {
+        return new HierarchicalLedgerManager(conf, zk);
+    }
+
+    /** Constructs another HierarchicalLedgerManager, returned under the ActiveLedgerManager type. */
+    @Override
+    public ActiveLedgerManager newActiveLedgerManager() {
+        return new HierarchicalLedgerManager(conf, zk);
+    }
+}