You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/06/19 12:39:38 UTC

svn commit: r1351646 [1/3] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ book...

Author: ivank
Date: Tue Jun 19 10:39:37 2012
New Revision: 1351646

URL: http://svn.apache.org/viewvc?rev=1351646&view=rev
Log:
BOOKKEEPER-203: improve ledger manager interface to remove zookeeper dependency on metadata operations. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java
      - copied, changed from r1351374, zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkVersion.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ReflectionUtils.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Version.java
      - copied, changed from r1351374, zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalLedgerDeleteTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Versioned.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkVersion.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
      - copied, changed from r1351374, zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
Removed:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalAsyncLedgerOpsTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieFailureTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieReadWriteTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieRecoveryTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalLedgerDeleteTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Jun 19 10:39:37 2012
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
 
     IMPROVEMENTS:
 
+      BOOKKEEPER-203: improve ledger manager interface to remove zookeeper dependency on metadata operations. (sijie via ivank)
+
   Backward compatible changes:
 
     BUGFIXES:

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Jun 19 10:39:37 2012
@@ -36,7 +36,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.ActiveLedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
@@ -66,9 +66,11 @@ public class Bookie extends Thread {
     final ServerConfiguration conf;
 
     final SyncThread syncThread;
-    final LedgerManager ledgerManager;
+    final LedgerManagerFactory activeLedgerManagerFactory;
+    final ActiveLedgerManager activeLedgerManager;
     final LedgerStorage ledgerStorage;
     final Journal journal;
+
     final HandleFactory handles;
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
@@ -351,10 +353,11 @@ public class Bookie extends Thread {
         this.zk = instantiateZookeeperClient(conf);
         checkEnvironment(this.zk);
 
-        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, this.zk);
+        activeLedgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
+        activeLedgerManager = activeLedgerManagerFactory.newActiveLedgerManager();
 
         syncThread = new SyncThread(conf);
-        ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager);
+        ledgerStorage = new InterleavedLedgerStorage(conf, activeLedgerManager);
         handles = new HandleFactoryImpl(ledgerStorage);
         // instantiate the journal
         journal = new Journal(conf);
@@ -603,7 +606,12 @@ public class Bookie extends Thread {
                 syncThread.shutdown();
 
                 // close Ledger Manager
-                ledgerManager.close();
+                try {
+                    activeLedgerManager.close();
+                    activeLedgerManagerFactory.uninitialize();
+                } catch (IOException ie) {
+                    LOG.error("Failed to close active ledger manager : ", ie);
+                }
                 // setting running to false here, so watch thread in bookie server know it only after bookie shut down
                 running = false;
             }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Tue Jun 19 10:39:37 2012
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.ActiveLedgerManager;
 import org.apache.zookeeper.ZooKeeper;
 
 /**
@@ -73,7 +73,7 @@ public class GarbageCollectorThread exte
     // Ledger Cache Handle
     final LedgerCache ledgerCache;
 
-    final LedgerManager ledgerManager;
+    final ActiveLedgerManager activeLedgerManager;
 
     // flag to ensure gc thread will not be interrupted during compaction
     // to reduce the risk getting entry log corrupted
@@ -117,14 +117,14 @@ public class GarbageCollectorThread exte
     public GarbageCollectorThread(ServerConfiguration conf,
                                   LedgerCache ledgerCache,
                                   EntryLogger entryLogger,
-                                  LedgerManager ledgerManager,
+                                  ActiveLedgerManager activeLedgerManager,
                                   EntryLogScanner scanner)
         throws IOException {
         super("GarbageCollectorThread");
 
         this.ledgerCache = ledgerCache;
         this.entryLogger = entryLogger;
-        this.ledgerManager = ledgerManager;
+        this.activeLedgerManager = activeLedgerManager;
         this.scanner = scanner;
 
         this.gcWaitTime = conf.getGcWaitTime();
@@ -224,8 +224,8 @@ public class GarbageCollectorThread exte
      * Do garbage collection ledger index files
      */
     private void doGcLedgers() {
-        ledgerManager.garbageCollectLedgers(
-        new LedgerManager.GarbageCollector() {
+        activeLedgerManager.garbageCollectLedgers(
+        new ActiveLedgerManager.GarbageCollector() {
             @Override
             public void gc(long ledgerId) {
                 try {
@@ -246,7 +246,7 @@ public class GarbageCollectorThread exte
             EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
             for (Long entryLogLedger : meta.ledgersMap.keySet()) {
                 // Remove the entry log ledger from the set if it isn't active.
-                if (!ledgerManager.containsActiveLedger(entryLogLedger)) {
+                if (!activeLedgerManager.containsActiveLedger(entryLogLedger)) {
                     meta.removeLedger(entryLogLedger);
                 }
             }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Tue Jun 19 10:39:37 2012
@@ -26,7 +26,7 @@ import java.io.IOException;
 
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.ActiveLedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
 
 import org.slf4j.Logger;
@@ -50,12 +50,12 @@ class InterleavedLedgerStorage implement
     // this indicates that a write has happened since the last flush
     private volatile boolean somethingWritten = false;
 
-    InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager)
+    InterleavedLedgerStorage(ServerConfiguration conf, ActiveLedgerManager activeLedgerManager)
             throws IOException {
         entryLogger = new EntryLogger(conf);
-        ledgerCache = new LedgerCacheImpl(conf, ledgerManager);
+        ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager);
         gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
-                ledgerManager, new EntryLogCompactionScanner());
+                activeLedgerManager, new EntryLogCompactionScanner());
     }
 
     @Override    

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Tue Jun 19 10:39:37 2012
@@ -35,7 +35,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Map.Entry;
 
-import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.ActiveLedgerManager;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +49,7 @@ public class LedgerCacheImpl implements 
 
     final File ledgerDirectories[];
 
-    public LedgerCacheImpl(ServerConfiguration conf, LedgerManager alm) {
+    public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm) {
         this.ledgerDirectories = Bookie.getCurrentDirectories(conf.getLedgerDirs());
         this.openFileLimit = conf.getOpenFileLimit();
         this.pageSize = conf.getPageSize();
@@ -83,7 +83,7 @@ public class LedgerCacheImpl implements 
 
     // Manage all active ledgers in LedgerManager
     // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
-    final LedgerManager activeLedgerManager;
+    final ActiveLedgerManager activeLedgerManager;
 
     final int openFileLimit;
     final int pageSize;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java Tue Jun 19 10:39:37 2012
@@ -76,6 +76,8 @@ public abstract class BKException extend
             return new BKInterruptedException();
         case Code.ProtocolVersionException:
             return new BKProtocolVersionException();
+        case Code.MetadataVersionException:
+            return new BKMetadataVersionException();
         case Code.LedgerFencedException:
             return new BKLedgerFencedException();
         case Code.UnauthorizedAccessException:
@@ -107,6 +109,7 @@ public abstract class BKException extend
         int IncorrectParameterException = -14;
         int InterruptedException = -15;
         int ProtocolVersionException = -16;
+        int MetadataVersionException = -17;
 
         int IllegalOpException = -100;
         int LedgerFencedException = -101;
@@ -157,6 +160,8 @@ public abstract class BKException extend
             return "Interrupted while waiting for permit";
         case Code.ProtocolVersionException:
             return "Bookie protocol version on server is incompatible with client";
+        case Code.MetadataVersionException:
+            return "Bad ledger metadata version";
         case Code.LedgerFencedException:
             return "Ledger has been fenced off. Some other client must have opened it to read";
         case Code.UnauthorizedAccessException:
@@ -226,6 +231,12 @@ public abstract class BKException extend
         }
     }
 
+    public static class BKMetadataVersionException extends BKException {
+        public BKMetadataVersionException() {
+            super(Code.MetadataVersionException);
+        }
+    }
+
     public static class BKNoSuchLedgerExistsException extends BKException {
         public BKNoSuchLedgerExistsException() {
             super(Code.NoSuchLedgerExistsException);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Tue Jun 19 10:39:37 2012
@@ -81,6 +81,7 @@ public class BookKeeper {
     final OrderedSafeExecutor mainWorkerPool;
 
     // Ledger manager responsible for how to store ledger meta data
+    final LedgerManagerFactory ledgerManagerFactory;
     final LedgerManager ledgerManager;
 
     final ClientConfiguration conf;
@@ -148,7 +149,8 @@ public class BookKeeper {
         bookieWatcher = new BookieWatcher(conf, this);
         bookieWatcher.readBookiesBlocking();
 
-        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
+        ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
+        ledgerManager = ledgerManagerFactory.newLedgerManager();
 
         ownChannelFactory = true;
         ownZKHandle = true;
@@ -210,7 +212,8 @@ public class BookKeeper {
         bookieWatcher = new BookieWatcher(conf, this);
         bookieWatcher.readBookiesBlocking();
 
-        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
+        ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
+        ledgerManager = ledgerManagerFactory.newLedgerManager();
     }
 
     LedgerManager getLedgerManager() {
@@ -494,11 +497,8 @@ public class BookKeeper {
         asyncDeleteLedger(lId, new SyncDeleteCallback(), counter);
         // Wait
         counter.block(0);
-        if (counter.getrc() == KeeperException.Code.NONODE.intValue()) {
-            LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}", lId);
-            throw BKException.create(Code.NoSuchLedgerExistsException);
-        } else if (counter.getrc() != KeeperException.Code.OK.intValue()) {
-            LOG.error("ZooKeeper error deleting ledger node: " + counter.getrc());
+        if (counter.getrc() != BKException.Code.OK) {
+            LOG.error("Error deleting ledger " + lId + " : " + counter.getrc());
             throw BKException.create(Code.ZKException);
         }
     }
@@ -509,7 +509,12 @@ public class BookKeeper {
      */
     public void close() throws InterruptedException, BKException {
         bookieClient.close();
-        ledgerManager.close();
+        try {
+            ledgerManager.close();
+            ledgerManagerFactory.uninitialize();
+        } catch (IOException ie) {
+            LOG.error("Failed to close ledger manager : ", ie);
+        }
         bookieWatcher.halt();
         if (ownChannelFactory) {
             channelFactory.releaseExternalResources();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Tue Jun 19 10:39:37 2012
@@ -752,14 +752,13 @@ public class BookKeeperAdmin {
             int deadBookieIndex = ensemble.indexOf(oldBookie);
             ensemble.remove(deadBookieIndex);
             ensemble.add(deadBookieIndex, newBookie);
-
-            lh.writeLedgerConfig(new WriteCb(), null);
+            lh.writeLedgerConfig(new WriteCb());
         }
         
-        private class WriteCb implements AsyncCallback.StatCallback {
+        private class WriteCb implements GenericCallback<Void> {
             @Override
-            public void processResult(int rc, final String path, Object ctx, Stat stat) {
-                if (rc == Code.BADVERSION.intValue()) {
+            public void operationComplete(int rc, Void result) {
+                if (rc == BKException.Code.MetadataVersionException) {
                     LOG.warn("Two fragments attempted update at once; ledger id: " + lh.getId() 
                              + " startid: " + fragmentStartId);
                     // try again, the previous success (with which this has conflicted)
@@ -769,8 +768,7 @@ public class BookKeeperAdmin {
                         @Override
                         public void operationComplete(int rc, LedgerMetadata newMeta) {
                             if (rc != BKException.Code.OK) {
-                                LOG.error("Error reading updated ledger metadata for ledger " + lh.getId(),
-                                          KeeperException.create(KeeperException.Code.get(rc), path));
+                                LOG.error("Error reading updated ledger metadata for ledger " + lh.getId());
                                 ledgerFragmentsMcb.processResult(rc, null, null);
                             } else {
                                 lh.metadata = newMeta;
@@ -779,11 +777,10 @@ public class BookKeeperAdmin {
                         }
                     });
                     return;
-                } else if (rc != Code.OK.intValue()) {
-                    LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
-                              KeeperException.create(KeeperException.Code.get(rc), path));
+                } else if (rc != BKException.Code.OK) {
+                    LOG.error("Error updating ledger config metadata for ledgerId " + lh.getId() + " : "
+                            + BKException.getMessage(rc));
                 } else {
-                    lh.getLedgerMetadata().updateZnodeStatus(stat);
                     LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : " + fragmentStartId 
                              + ") to point ledger fragments from old dead bookie: (" + oldBookie
                              + ") to new bookie: (" + newBookie + ")");

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Tue Jun 19 10:39:37 2012
@@ -25,20 +25,20 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 
 /**
  * Encapsulates asynchronous ledger create operation
  *
  */
-class LedgerCreateOp implements GenericCallback<String> {
+class LedgerCreateOp implements GenericCallback<Long> {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerCreateOp.class);
 
@@ -102,32 +102,17 @@ class LedgerCreateOp implements GenericC
          */
         metadata.addEnsemble(0L, ensemble);
 
-        // create a ledger path with metadata
-        bk.getLedgerManager().newLedgerPath(this, metadata);
+        // create a ledger with metadata
+        bk.getLedgerManager().createLedger(metadata, this);
     }
 
     /**
-     * Callback when created ledger path.
+     * Callback when created ledger.
      */
     @Override
-    public void operationComplete(int rc, String ledgerPath) {
-
-        if (rc != KeeperException.Code.OK.intValue()) {
-            LOG.error("Could not create node for ledger",
-                      KeeperException.create(KeeperException.Code.get(rc), ledgerPath));
-            cb.createComplete(BKException.Code.ZKException, null, this.ctx);
-            return;
-        }
-
-        /*
-         * Extract ledger id.
-         */
-        long ledgerId;
-        try {
-            ledgerId = bk.getLedgerManager().getLedgerId(ledgerPath);
-        } catch (IOException e) {
-            LOG.error("Could not extract ledger-id from path:" + ledgerPath, e);
-            cb.createComplete(BKException.Code.ZKException, null, this.ctx);
+    public void operationComplete(int rc, Long ledgerId) {
+        if (BKException.Code.OK != rc) {
+            cb.createComplete(rc, null, this.ctx);
             return;
         }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Tue Jun 19 10:39:37 2012
@@ -22,15 +22,15 @@
 package org.apache.bookkeeper.client;
 
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
 
 /**
  * Encapsulates asynchronous ledger delete operation
  *
  */
-class LedgerDeleteOp implements VoidCallback {
+class LedgerDeleteOp implements GenericCallback<Void> {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerDeleteOp.class);
 
@@ -62,19 +62,15 @@ class LedgerDeleteOp implements VoidCall
      * Initiates the operation
      */
     public void initiate() {
-        // Asynchronously delete the ledger node in ZK.
+        // Asynchronously delete the ledger from meta manager
         // When this completes, it will invoke the callback method below.
-
-        bk.getZkHandle().delete(bk.getLedgerManager().getLedgerPath(ledgerId), -1, this, null);
+        bk.getLedgerManager().deleteLedger(ledgerId, this);
     }
 
     /**
-     * Implements ZooKeeper Void Callback.
-     *
-     * @see org.apache.zookeeper.AsyncCallback.VoidCallback#processResult(int,
-     *      java.lang.String, java.lang.Object)
+     * Implements Delete Callback.
      */
-    public void processResult(int rc, String path, Object ctx) {
+    public void operationComplete(int rc, Void result) {
         cb.deleteComplete(rc, this.ctx);
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Tue Jun 19 10:39:37 2012
@@ -46,10 +46,6 @@ import org.apache.bookkeeper.util.SafeRu
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.data.Stat;
 import org.jboss.netty.buffer.ChannelBuffer;
 
 /**
@@ -205,14 +201,12 @@ public class LedgerHandle {
         return distributionSchedule;
     }
 
-    void writeLedgerConfig(StatCallback callback, Object ctx) {
+    void writeLedgerConfig(GenericCallback<Void> writeCb) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Writing metadata to ZooKeeper: " + this.ledgerId + ", " + metadata.getZnodeVersion());
+            LOG.debug("Writing metadata to ledger manager: " + this.ledgerId + ", " + metadata.getVersion());
         }
 
-        bk.getZkHandle().setData(bk.getLedgerManager().getLedgerPath(ledgerId),
-                                 metadata.serialize(), metadata.getZnodeVersion(),
-                                 callback, ctx);
+        bk.getLedgerManager().writeLedgerMetadata(ledgerId, metadata, writeCb);
     }
 
     /**
@@ -286,48 +280,44 @@ public class LedgerHandle {
                               + metadata.close + " with this many bytes: " + metadata.length);
                 }
 
-                final class CloseCb implements StatCallback {
+                final class CloseCb implements GenericCallback<Void> {
                     @Override
-                    public void processResult(final int rc, String path, Object subctx,
-                                              final Stat stat) {
-                        if (rc == KeeperException.Code.BadVersion) {
+                    public void operationComplete(final int rc, Void result) {
+                        if (rc == BKException.Code.MetadataVersionException) {
                             rereadMetadata(new GenericCallback<LedgerMetadata>() {
                                 @Override
                                 public void operationComplete(int newrc, LedgerMetadata newMeta) {
                                     if (newrc != BKException.Code.OK) {
                                         LOG.error("Error reading new metadata from ledger " + ledgerId
                                                   + " when closing, code=" + newrc);
-                                        cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this, ctx);
+                                        cb.closeComplete(rc, LedgerHandle.this, ctx);
                                     } else {
                                         metadata.close(prevClose);
                                         metadata.length = prevLength;
                                         if (metadata.resolveConflict(newMeta)) {
                                             metadata.length = length;
                                             metadata.close(lastAddConfirmed);
-                                            writeLedgerConfig(new CloseCb(), null);
+                                            writeLedgerConfig(new CloseCb());
                                             return;
                                         } else {
                                             metadata.length = length;
                                             metadata.close(lastAddConfirmed);
-                                            LOG.warn("Conditional write failed: "
-                                                     + KeeperException.Code.get(KeeperException.Code.BadVersion));
-                                            cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this, ctx);
+                                            LOG.warn("Conditional update ledger metadata for ledger " + ledgerId + " failed.");
+                                            cb.closeComplete(rc, LedgerHandle.this, ctx);
                                         }
                                     }
                                 }
                             });
-                        } else if (rc != KeeperException.Code.OK.intValue()) {
-                            LOG.warn("Conditional write failed: " + KeeperException.Code.get(rc));
-                            cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this,
-                                             ctx);
+                        } else if (rc != BKException.Code.OK) {
+                            LOG.error("Error update ledger metadata for ledger " + ledgerId + " : " + rc);
+                            cb.closeComplete(rc, LedgerHandle.this, ctx);
                         } else {
-                            metadata.updateZnodeStatus(stat);
                             cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
                         }
                     }
                 };
 
-                writeLedgerConfig(new CloseCb(), null);
+                writeLedgerConfig(new CloseCb());
 
             }
         });
@@ -687,20 +677,20 @@ public class LedgerHandle {
         final long newEnsembleStartEntry = lastAddConfirmed + 1;
         metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
 
-        final class ChangeEnsembleCb implements StatCallback {
+        final class ChangeEnsembleCb implements GenericCallback<Void> {
             @Override
-            public void processResult(final int rc, String path, Object ctx, final Stat stat) {
+            public void operationComplete(final int rc, Void result) {
 
                 bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
                     @Override
                     public void safeRun() {
-                        if (rc == KeeperException.Code.BadVersion) {
+                        if (rc == BKException.Code.MetadataVersionException) {
                             rereadMetadata(new GenericCallback<LedgerMetadata>() {
                                 @Override
                                 public void operationComplete(int newrc, LedgerMetadata newMeta) {
                                     if (newrc != BKException.Code.OK) {
                                         LOG.error("Error reading new metadata from ledger after changing ensemble, code=" + newrc);
-                                        handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+                                        handleUnrecoverableErrorDuringAdd(rc);
                                     } else {
                                         // a new ensemble is added only when the start entry is larger than zero
                                         if (newEnsembleStartEntry > 0) {
@@ -708,27 +698,25 @@ public class LedgerHandle {
                                         }
                                         if (metadata.resolveConflict(newMeta)) {
                                             metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
-                                            writeLedgerConfig(new ChangeEnsembleCb(), null);
+                                            writeLedgerConfig(new ChangeEnsembleCb());
                                             return;
                                         } else {
                                             LOG.error("Could not resolve ledger metadata conflict while changing ensemble to: "
                                                       + newEnsemble + ", old meta data is \n" + new String(metadata.serialize())
                                                       + "\n, new meta data is \n" + new String(newMeta.serialize()) + "\n ,closing ledger");
-                                            handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+                                            handleUnrecoverableErrorDuringAdd(rc);
                                         }
                                     }
                                 }
                             });
                             return;
-                        } else if (rc != KeeperException.Code.OK.intValue()) {
-                            LOG
-                            .error("Could not persist ledger metadata while changing ensemble to: "
-                                   + newEnsemble + " , closing ledger");
-                            handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+                        } else if (rc != BKException.Code.OK) {
+                            LOG.error("Could not persist ledger metadata while changing ensemble to: "
+                                    + newEnsemble + " , closing ledger");
+                            handleUnrecoverableErrorDuringAdd(rc);
                             return;
                         }
 
-                        metadata.updateZnodeStatus(stat);
                         for (PendingAddOp pendingAddOp : pendingAddOps) {
                             pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
                         }
@@ -738,31 +726,11 @@ public class LedgerHandle {
             }
         };
 
-        writeLedgerConfig(new ChangeEnsembleCb(), null);
-
+        writeLedgerConfig(new ChangeEnsembleCb());
     }
 
     void rereadMetadata(final GenericCallback<LedgerMetadata> cb) {
-        bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false,
-            new DataCallback() {
-                public void processResult(int rc, String path,
-                                          Object ctx, byte[] data, Stat stat) {
-                    if (rc != KeeperException.Code.OK.intValue()) {
-                        LOG.error("Error reading metadata from ledger, code =" + rc);
-                        cb.operationComplete(BKException.Code.ZKException, null);
-                        return;
-                    }
-
-                    try {
-                        LedgerMetadata newMeta = LedgerMetadata.parseConfig(data, stat.getVersion());
-                        cb.operationComplete(BKException.Code.OK, newMeta);
-                    } catch (IOException e) {
-                        LOG.error("Error parsing ledger metadata for ledger", e);
-                        cb.operationComplete(BKException.Code.ZKException, null);
-                        return;
-                    }
-                }
-        }, null);
+        bk.getLedgerManager().readLedgerMetadata(ledgerId, cb);
     }
 
     synchronized void recover(final GenericCallback<Void> cb) {
@@ -784,31 +752,29 @@ public class LedgerHandle {
 
         metadata.markLedgerInRecovery();
 
-        writeLedgerConfig(new StatCallback() {
+        writeLedgerConfig(new GenericCallback<Void>() {
             @Override
-            public void processResult(final int rc, String path, Object ctx, Stat stat) {
-                if (rc == KeeperException.Code.BadVersion) {
+            public void operationComplete(final int rc, Void result) {
+                if (rc == BKException.Code.MetadataVersionException) {
                     rereadMetadata(new GenericCallback<LedgerMetadata>() {
-                            @Override
-                            public void operationComplete(int rc, LedgerMetadata newMeta) {
-                                if (rc != BKException.Code.OK) {
-                                    cb.operationComplete(rc, null);
-                                } else {
-                                    metadata = newMeta;
-                                    recover(cb);
-                                }
+                        @Override
+                        public void operationComplete(int rc, LedgerMetadata newMeta) {
+                            if (rc != BKException.Code.OK) {
+                                cb.operationComplete(rc, null);
+                            } else {
+                                metadata = newMeta;
+                                recover(cb);
                             }
-                        });
-                } else if (rc == KeeperException.Code.OK.intValue()) {
-                    metadata.znodeVersion = stat.getVersion();
+                        }
+                    });
+                } else if (rc == BKException.Code.OK) {
                     new LedgerRecoveryOp(LedgerHandle.this, cb).initiate();
                 } else {
-                    LOG.error("Error writing ledger config " +  rc 
-                              + " path = " + path);
-                    cb.operationComplete(BKException.Code.ZKException, null);
+                    LOG.error("Error writing ledger config " + rc + " of ledger " + ledgerId);
+                    cb.operationComplete(rc, null);
                 }
             }
-        }, null);
+        });
     }
 
     static class NoopCloseCallback implements CloseCallback {
@@ -816,7 +782,7 @@ public class LedgerHandle {
 
         @Override
         public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-            if (rc != KeeperException.Code.OK.intValue()) {
+            if (rc != BKException.Code.OK) {
                 LOG.warn("Close failed: " + BKException.getMessage(rc));
             }
             // noop

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Tue Jun 19 10:39:37 2012
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -60,8 +61,8 @@ public class LedgerMetadata {
     long close;
     private SortedMap<Long, ArrayList<InetSocketAddress>> ensembles = new TreeMap<Long, ArrayList<InetSocketAddress>>();
     ArrayList<InetSocketAddress> currentEnsemble;
-    volatile int znodeVersion = -1;
-    
+    volatile Version version = null;
+
     public LedgerMetadata(int ensembleSize, int quorumSize) {
         this.ensembleSize = ensembleSize;
         this.quorumSize = quorumSize;
@@ -171,12 +172,13 @@ public class LedgerMetadata {
      *
      * @param array
      *            byte array to parse
+     * @param version
+     *            version of the ledger metadata
      * @return LedgerConfig
      * @throws IOException
      *             if the given byte[] cannot be parsed
      */
-
-    static LedgerMetadata parseConfig(byte[] bytes, int version) throws IOException {
+    public static LedgerMetadata parseConfig(byte[] bytes, Version version) throws IOException {
 
         LedgerMetadata lc = new LedgerMetadata();
         String config = new String(bytes);
@@ -207,7 +209,7 @@ public class LedgerMetadata {
                 throw new IOException("Quorum size or ensemble size absent from config: " + config);
             }
 
-            lc.znodeVersion = version;
+            lc.version = version;
             lc.quorumSize = new Integer(lines[i++]);
             lc.ensembleSize = new Integer(lines[i++]);
             lc.length = new Long(lines[i++]);
@@ -234,31 +236,21 @@ public class LedgerMetadata {
     
 
     /**
-     * Updates the status of this metadata in ZooKeeper.
+     * Updates the version of this metadata.
      * 
-     * @param stat
-     */
-    public void updateZnodeStatus(Stat stat) {
-        this.znodeVersion = stat.getVersion();
-    }
-
-    /**
-     * Update the znode version of this metadata
-     *
-     * @param znodeVersion
-     *        Znode version of this metadata
+     * @param v Version
      */
-    public void updateZnodeStatus(int znodeVersion) {
-        this.znodeVersion = znodeVersion;
+    public void setVersion(Version v) {
+        this.version = v;
     }
 
     /**
-     * Returns the last znode version.
+     * Returns the last version.
      * 
-     * @return int znode version
+     * @return version
      */
-    public int getZnodeVersion() {
-        return this.znodeVersion;
+    public Version getVersion() {
+        return this.version;
     }
 
     /**
@@ -282,7 +274,8 @@ public class LedgerMetadata {
             return false;
         }
         // new meta znode version should be larger than old one
-        if (znodeVersion > newMeta.znodeVersion) {
+        if (null != version &&
+            Version.Occurred.AFTER == version.compare(newMeta.version)) {
             return false;
         }
         // ensemble size should be same
@@ -306,7 +299,7 @@ public class LedgerMetadata {
          *  ensemble and znode version
          */
         ensembles = newMeta.ensembles;
-        znodeVersion = newMeta.znodeVersion;
+        version = newMeta.version;
         return true;
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java Tue Jun 19 10:39:37 2012
@@ -23,21 +23,20 @@ package org.apache.bookkeeper.client;
 
 import java.io.IOException;
 import java.security.GeneralSecurityException;
+
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.zookeeper.data.Stat;
 
 /**
  * Encapsulates the ledger open operation
  *
  */
-class LedgerOpenOp implements DataCallback {
+class LedgerOpenOp implements GenericCallback<LedgerMetadata> {
     static final Logger LOG = LoggerFactory.getLogger(LedgerOpenOp.class);
 
     final BookKeeper bk;
@@ -78,8 +77,7 @@ class LedgerOpenOp implements DataCallba
         /**
          * Asynchronously read the ledger metadata node.
          */
-
-        bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false, this, ctx);
+        bk.getLedgerManager().readLedgerMetadata(ledgerId, this);
     }
 
     /**
@@ -91,34 +89,15 @@ class LedgerOpenOp implements DataCallba
     }
 
     /**
-     * Implements ZooKeeper data callback.
-     * @see org.apache.zookeeper.AsyncCallback.DataCallback#processResult(int, String, Object, byte[], Stat)
+     * Implements Open Ledger Callback.
      */
-    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-
-        if (rc == KeeperException.Code.NONODE.intValue()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("No such ledger: " + ledgerId, KeeperException.create(KeeperException.Code.get(rc), path));
-            }
-            cb.openComplete(BKException.Code.NoSuchLedgerExistsException, null, this.ctx);
+    public void operationComplete(int rc, LedgerMetadata metadata) {
+        if (BKException.Code.OK != rc) {
+            // open ledger failed.
+            cb.openComplete(rc, null, this.ctx);
             return;
         }
-        if (rc != KeeperException.Code.OK.intValue()) {
-            LOG.error("Could not read metadata for ledger: " + ledgerId, KeeperException.create(KeeperException.Code
-                      .get(rc), path));
-            cb.openComplete(BKException.Code.ZKException, null, this.ctx);
-            return;
-        }
-
-        LedgerMetadata metadata;
-        try {
-            metadata = LedgerMetadata.parseConfig(data, stat.getVersion());
-        } catch (IOException e) {
-            LOG.error("Could not parse ledger metadata for ledger: " + ledgerId, e);
-            cb.openComplete(BKException.Code.ZKException, null, this.ctx);
-            return;
-        }
-
+        // get the ledger metadata back
         try {
             lh = new ReadOnlyLedgerHandle(bk, ledgerId, metadata, digestType, passwd);
         } catch (GeneralSecurityException e) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Tue Jun 19 10:39:37 2012
@@ -26,14 +26,29 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.SystemConfiguration;
 
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.util.ReflectionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract configuration
  */
 public abstract class AbstractConfiguration extends CompositeConfiguration {
 
+    static final Logger LOG = LoggerFactory.getLogger(AbstractConfiguration.class);
+
+    private static ClassLoader defaultLoader;
+    static {
+        defaultLoader = Thread.currentThread().getContextClassLoader();
+        if (null == defaultLoader) {
+            defaultLoader = AbstractConfiguration.class.getClassLoader();
+        }
+    }
+
     // Ledger Manager
     protected final static String LEDGER_MANAGER_TYPE = "ledgerManagerType";
+    protected final static String LEDGER_MANAGER_FACTORY_CLASS = "ledgerManagerFactoryClass";
     protected final static String ZK_LEDGERS_ROOT_PATH = "zkLedgersRootPath";
     protected final static String AVAILABLE_NODE = "available";
     protected AbstractConfiguration() {
@@ -79,7 +94,10 @@ public abstract class AbstractConfigurat
      *
      * @param lmType
      *          Ledger Manager Type
+     * @return void
+     * @deprecated replaced by {@link #setLedgerManagerFactoryClass(Class)}
      */
+    @Deprecated
     public void setLedgerManagerType(String lmType) {
         setProperty(LEDGER_MANAGER_TYPE, lmType); 
     }
@@ -89,12 +107,48 @@ public abstract class AbstractConfigurat
      *
      * @return ledger manager type
      * @throws ConfigurationException
+     * @deprecated replaced by {@link #getLedgerManagerFactoryClass()}
      */
+    @Deprecated
     public String getLedgerManagerType() {
         return getString(LEDGER_MANAGER_TYPE);
     }
 
     /**
+     * Set Ledger Manager Factory Class Name.
+     *
+     * @param factoryClassName
+     *          Ledger Manager Factory Class Name
+     * @return void
+     */
+    public void setLedgerManagerFactoryClassName(String factoryClassName) {
+        setProperty(LEDGER_MANAGER_FACTORY_CLASS, factoryClassName);
+    }
+
+    /**
+     * Set Ledger Manager Factory Class.
+     *
+     * @param factoryClass
+     *          Ledger Manager Factory Class
+     * @return void
+     */
+    public void setLedgerManagerFactoryClass(Class<? extends LedgerManagerFactory> factoryClass) {
+        setProperty(LEDGER_MANAGER_FACTORY_CLASS, factoryClass.getName());
+    }
+
+    /**
+     * Get ledger manager factory class.
+     *
+     * @return ledger manager factory class
+     */
+    public Class<? extends LedgerManagerFactory> getLedgerManagerFactoryClass()
+        throws ConfigurationException {
+        return ReflectionUtils.getClass(this, LEDGER_MANAGER_FACTORY_CLASS,
+                                        null, LedgerManagerFactory.class,
+                                        defaultLoader);
+    }
+
+    /**
      * Set Zk Ledgers Root Path.
      *
      * @param zkLedgersPath zk ledgers root path

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Tue Jun 19 10:39:37 2012
@@ -25,22 +25,29 @@ import java.util.Set;
 import java.util.Map;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.versioning.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * Abstract ledger manager based on zookeeper, which provides common methods such as query zk nodes.
  */
-abstract class AbstractZkLedgerManager implements LedgerManager {
+abstract class AbstractZkLedgerManager implements LedgerManager, ActiveLedgerManager {
 
     static Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
 
@@ -53,6 +60,9 @@ abstract class AbstractZkLedgerManager i
     protected final ZooKeeper zk;
     protected final String ledgerRootPath;
 
+    // A sorted map to store all active ledger ids
+    protected final SnapshotMap<Long, Boolean> activeLedgers;
+
     /**
      * ZooKeeper-based Ledger Manager Constructor
      *
@@ -60,14 +70,112 @@ abstract class AbstractZkLedgerManager i
      *          Configuration object
      * @param zk
      *          ZooKeeper Client Handle
-     * @param ledgerRootPath
-     *          ZooKeeper Path to store ledger metadata
      */
-    protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
-                                      String ledgerRootPath) {
+    protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
         this.conf = conf;
         this.zk = zk;
-        this.ledgerRootPath = ledgerRootPath;
+        this.ledgerRootPath = conf.getZkLedgersRootPath();
+
+        activeLedgers = new SnapshotMap<Long, Boolean>();
+    }
+
+    /**
+     * Get the znode path that is used to store ledger metadata
+     *
+     * @param ledgerId
+     *          Ledger ID
+     * @return ledger node path
+     */
+    protected abstract String getLedgerPath(long ledgerId);
+
+    /**
+     * Get ledger id from its znode ledger path
+     *
+     * @param ledgerPath
+     *          Ledger path to store metadata
+     * @return ledger id
+     * @throws IOException when the ledger path is invalid
+     */
+    protected abstract long getLedgerId(String ledgerPath) throws IOException;
+
+    @Override
+    public void deleteLedger(final long ledgerId, final GenericCallback<Void> cb) {
+        zk.delete(getLedgerPath(ledgerId), -1, new VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                int bkRc;
+                if (rc == KeeperException.Code.NONODE.intValue()) {
+                    LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}", ledgerId);
+                    bkRc = BKException.Code.NoSuchLedgerExistsException;
+                } else if (rc == KeeperException.Code.OK.intValue()) {
+                    bkRc = BKException.Code.OK;
+                } else {
+                    bkRc = BKException.Code.ZKException;
+                }
+                cb.operationComplete(bkRc, (Void)null);
+            }
+        }, null);
+    }
+
+    @Override
+    public void readLedgerMetadata(final long ledgerId, final GenericCallback<LedgerMetadata> readCb) {
+        zk.getData(getLedgerPath(ledgerId), false, new DataCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+                if (rc == KeeperException.Code.NONODE.intValue()) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("No such ledger: " + ledgerId,
+                                  KeeperException.create(KeeperException.Code.get(rc), path));
+                    }
+                    readCb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null);
+                    return;
+                }
+                if (rc != KeeperException.Code.OK.intValue()) {
+                    LOG.error("Could not read metadata for ledger: " + ledgerId,
+                              KeeperException.create(KeeperException.Code.get(rc), path));
+                    readCb.operationComplete(BKException.Code.ZKException, null);
+                    return;
+                }
+
+                LedgerMetadata metadata;
+                try {
+                    metadata = LedgerMetadata.parseConfig(data, new ZkVersion(stat.getVersion()));
+                } catch (IOException e) {
+                    LOG.error("Could not parse ledger metadata for ledger: " + ledgerId, e);
+                    readCb.operationComplete(BKException.Code.ZKException, null);
+                    return;
+                }
+                readCb.operationComplete(BKException.Code.OK, metadata);
+            }
+        }, null);
+    }
+
+    @Override
+    public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
+                                    final GenericCallback<Void> cb) {
+        Version v = metadata.getVersion();
+        if (null == v || !(v instanceof ZkVersion)) {
+            cb.operationComplete(BKException.Code.MetadataVersionException, null);
+            return;
+        }
+        final ZkVersion zv = (ZkVersion) v;
+        zk.setData(getLedgerPath(ledgerId),
+                   metadata.serialize(), zv.getZnodeVersion(),
+                   new StatCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, Stat stat) {
+                if (KeeperException.Code.BadVersion == rc) {
+                    cb.operationComplete(BKException.Code.MetadataVersionException, null);
+                } else if (KeeperException.Code.OK.intValue() == rc) {
+                    // update metadata version
+                    metadata.setVersion(zv.setZnodeVersion(stat.getVersion()));
+                    cb.operationComplete(BKException.Code.OK, null);
+                } else {
+                    LOG.warn("Conditional update ledger metadata failed: ", KeeperException.Code.get(rc));
+                    cb.operationComplete(BKException.Code.ZKException, null);
+                }
+            }
+        }, null);
     }
 
     /**
@@ -260,6 +368,21 @@ abstract class AbstractZkLedgerManager i
     public void close() {
     }
 
+    @Override
+    public void addActiveLedger(long ledgerId, boolean active) {
+        activeLedgers.put(ledgerId, active); // store the ledger id together with its active flag
+    }
+
+    @Override
+    public void removeActiveLedger(long ledgerId) {
+        activeLedgers.remove(ledgerId); // drop the entry kept for this ledger id
+    }
+
+    @Override
+    public boolean containsActiveLedger(long ledgerId) {
+        return activeLedgers.containsKey(ledgerId); // key presence only; the stored flag is not read
+    }
+
     /**
      * Do garbage collecting comparing hosted ledgers and zk ledgers
      *

Copied: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java (from r1351374, zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java?p2=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java&p1=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java&r1=1351374&r2=1351646&rev=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java Tue Jun 19 10:39:37 2012
@@ -18,76 +18,17 @@ package org.apache.bookkeeper.meta;
  * limitations under the License.
  */
 
-import java.io.IOException;
-
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import java.io.Closeable;
 
 /**
- * LedgerManager takes responsibility of ledger management
+ * ActiveLedgerManager takes responsibility for active ledger management on the bookie side.
  *
  * <ul>
- * <li>How to store ledger meta (e.g. in ZooKeeper or other key/value store)
  * <li>How to manager active ledgers (so know how to do garbage collection)
  * <li>How to garbage collect inactive/deleted ledgers
  * </ul>
  */
-public interface LedgerManager {
-
-    /**
-     * Get the path that is used to store ledger metadata
-     *
-     * @param ledgerId
-     *          Ledger ID
-     * @return ledger node path
-     */
-    public String getLedgerPath(long ledgerId);
-
-    /**
-     * Get ledger id from its ledger path
-     *
-     * @param ledgerPath
-     *          Ledger path to store metadata
-     * @return ledger id
-     * @throws IOException when the ledger path is invalid
-     */
-    public long getLedgerId(String ledgerPath) throws IOException;
-
-    /**
-     * Create a new zk ledger path with provided metadata
-     *
-     * @param cb
-     *        Callback when getting new zk ledger path to create.
-     * @param metadata
-     *        Metadata provided when creating a new ledger
-     */
-    public abstract void newLedgerPath(GenericCallback<String> cb, LedgerMetadata metadata);
-
-    /**
-     * Loop to process all ledgers.
-     * <p>
-     * <ul>
-     * After all ledgers were processed, finalCb will be triggerred:
-     * <li> if all ledgers are processed done with OK, success rc will be passed to finalCb.
-     * <li> if some ledgers are prcoessed failed, failure rc will be passed to finalCb.
-     * </ul>
-     * </p>
-     *
-     * @param processor
-     *          Ledger Processor to process a specific ledger
-     * @param finalCb
-     *          Callback triggered after all ledgers are processed
-     * @param context
-     *          Context of final callback
-     * @param successRc
-     *          Success RC code passed to finalCb when callback
-     * @param failureRc
-     *          Failure RC code passed to finalCb when exceptions occured.
-     */
-    public void asyncProcessLedgers(Processor<Long> processor, AsyncCallback.VoidCallback finalCb,
-                                    Object context, int successRc, int failureRc);
+public interface ActiveLedgerManager extends Closeable {
 
     /**
      * Add active ledger
@@ -140,8 +81,4 @@ public interface LedgerManager {
      */
     public void garbageCollectLedgers(GarbageCollector gc);
 
-    /**
-     * Close ledger manager
-     */
-    public void close();
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java Tue Jun 19 10:39:37 2012
@@ -19,10 +19,10 @@ package org.apache.bookkeeper.meta;
  */
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.util.ZkUtil
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
@@ -61,13 +62,8 @@ import org.slf4j.LoggerFactory;
 class FlatLedgerManager extends AbstractZkLedgerManager {
 
     static final Logger LOG = LoggerFactory.getLogger(FlatLedgerManager.class);
-    public static final String NAME = "flat";
-    public static final int CUR_VERSION = 1;
-
     // path prefix to store ledger znodes
     private final String ledgerPrefix;
-    // hash map to store all active ledger ids
-    private SnapshotMap<Long, Boolean> activeLedgers;
 
     /**
      * Constructor
@@ -80,32 +76,32 @@ class FlatLedgerManager extends Abstract
      *          ZooKeeper Path to store ledger metadata
      * @throws IOException when version is not compatible
      */
-    public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
-                             String ledgerRootPath, int layoutVersion)
-        throws IOException {
-        super(conf, zk, ledgerRootPath);
-
-        if (layoutVersion != CUR_VERSION) {
-            throw new IOException("Incompatible layout version found : " 
-                                  + layoutVersion);
-        }
+    public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+        super(conf, zk);
 
         ledgerPrefix = ledgerRootPath + "/" + LEDGER_NODE_PREFIX;
-        activeLedgers = new SnapshotMap<Long, Boolean>();
     }
 
     @Override
-    public void newLedgerPath(final GenericCallback<String> cb, final LedgerMetadata metadata) {
+    public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> cb) {
         StringCallback scb = new StringCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx,
                     String name) {
                 if (Code.OK.intValue() != rc) {
+                    LOG.error("Could not create node for ledger",
+                              KeeperException.create(KeeperException.Code.get(rc), path));
                     cb.operationComplete(rc, null);
                 } else {
                     // update znode status
-                    metadata.updateZnodeStatus(0);
-                    cb.operationComplete(rc, name);
+                    metadata.setVersion(new ZkVersion(0));
+                    try {
+                        long ledgerId = getLedgerId(name);
+                        cb.operationComplete(rc, ledgerId);
+                    } catch (IOException ie) {
+                        LOG.error("Could not extract ledger-id from path:" + name, ie);
+                        cb.operationComplete(BKException.Code.ZKException, null);
+                    }
                 }
             }
         };
@@ -141,21 +137,6 @@ class FlatLedgerManager extends Abstract
     }
 
     @Override
-    public void addActiveLedger(long ledgerId, boolean active) {
-        activeLedgers.put(ledgerId, active);
-    }
-
-    @Override
-    public void removeActiveLedger(long ledgerId) {
-        activeLedgers.remove(ledgerId);
-    }
-
-    @Override
-    public boolean containsActiveLedger(long ledgerId) {
-        return activeLedgers.containsKey(ledgerId);
-    }
-
-    @Override
     public void garbageCollectLedgers(GarbageCollector gc) {
         try {
             // create a snapshot first

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java?rev=1351646&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java Tue Jun 19 10:39:37 2012
@@ -0,0 +1,72 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Factory handing out {@link FlatLedgerManager} instances; accepts only {@link #CUR_VERSION}.
+ */
+class FlatLedgerManagerFactory extends LedgerManagerFactory {
+
+    public static final String NAME = "flat";
+    public static final int CUR_VERSION = 1;
+
+    // both are supplied by initialize() and passed on to every manager created here
+    AbstractConfiguration conf;
+    ZooKeeper zk;
+
+    /** @return {@link #CUR_VERSION}, the only version initialize() accepts */
+    @Override
+    public int getCurrentVersion() {
+        return CUR_VERSION;
+    }
+
+    /** Stores {@code conf} and {@code zk} once {@code factoryVersion} is known to match. */
+    @Override
+    public LedgerManagerFactory initialize(final AbstractConfiguration conf, final ZooKeeper zk,
+                                           final int factoryVersion) throws IOException {
+        if (factoryVersion != CUR_VERSION) {
+            throw new IOException("Incompatible layout version found : " + factoryVersion);
+        }
+        this.conf = conf;
+        this.zk = zk;
+        return this;
+    }
+
+    /** Nothing to release: the zookeeper instance is passed in from outside, so it is not closed here. */
+    @Override
+    public void uninitialize() throws IOException {
+    }
+
+    /** @return a new {@link FlatLedgerManager} exposed as a {@link LedgerManager} */
+    @Override
+    public LedgerManager newLedgerManager() {
+        return new FlatLedgerManager(conf, zk);
+    }
+
+    /** @return a new {@link FlatLedgerManager} exposed as an {@link ActiveLedgerManager} */
+    @Override
+    public ActiveLedgerManager newActiveLedgerManager() {
+        return new FlatLedgerManager(conf, zk);
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java Tue Jun 19 10:39:37 2012
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.Set;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.HashSet;
 import java.util.List;
 
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -81,9 +80,6 @@ import org.slf4j.LoggerFactory;
 class HierarchicalLedgerManager extends AbstractZkLedgerManager {
 
     static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class);
-    public static final String NAME = "hierarchical";
-
-    public static final int CUR_VERSION = 1;
 
     static final String IDGEN_ZNODE = "idgen";
     static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
@@ -92,8 +88,6 @@ class HierarchicalLedgerManager extends 
 
     // Path to generate global id
     private final String idGenPath;
-    // A sorted map to stored all active ledger ids
-    private SnapshotMap<Long, Boolean> activeLedgers;
 
     // we use this to prevent long stack chains from building up in callbacks
     ScheduledExecutorService scheduler;
@@ -105,23 +99,11 @@ class HierarchicalLedgerManager extends 
      *          Configuration object
      * @param zk
      *          ZooKeeper Client Handle
-     * @param ledgerRootPath
-     *          ZooKeeper Path to store ledger metadata
-     * @throws IOException when version is not compatible
      */
-    public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
-                                     String ledgerRootPath, int layoutVersion)
-        throws IOException {
-        super(conf, zk, ledgerRootPath);
-
-        if (layoutVersion != CUR_VERSION) {
-            throw new IOException("Incompatible layout version found : " 
-                                  + layoutVersion);
-        }
+    public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+        super(conf, zk);
 
         this.idGenPath = ledgerRootPath + IDGENERATION_PREFIX;
-        activeLedgers = new SnapshotMap<Long, Boolean>();
-
         this.scheduler = Executors.newSingleThreadScheduledExecutor();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Using HierarchicalLedgerManager with root path : " + ledgerRootPath);
@@ -139,7 +121,7 @@ class HierarchicalLedgerManager extends 
     }
 
     @Override
-    public void newLedgerPath(final GenericCallback<String> ledgerCb, final LedgerMetadata metadata) {
+    public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
         ZkUtils.createFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
             CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
             @Override
@@ -161,20 +143,23 @@ class HierarchicalLedgerManager extends 
                     ledgerCb.operationComplete(KeeperException.Code.SYSTEMERROR.intValue(), null);
                     return;
                 }
+                String ledgerPath = getLedgerPath(ledgerId);
+                final long lid = ledgerId;
                 StringCallback scb = new StringCallback() {
                     @Override
                     public void processResult(int rc, String path,
                             Object ctx, String name) {
                         if (rc != KeeperException.Code.OK.intValue()) {
+                            LOG.error("Could not create node for ledger",
+                                      KeeperException.create(KeeperException.Code.get(rc), path));
                             ledgerCb.operationComplete(rc, null);
                         } else {
-                            // update znode status
-                            metadata.updateZnodeStatus(0);
-                            ledgerCb.operationComplete(rc, name);
+                            // update version
+                            metadata.setVersion(new ZkVersion(0));
+                            ledgerCb.operationComplete(rc, lid);
                         }
                     }
                 };
-                String ledgerPath = getLedgerPath(ledgerId);
                 ZkUtils.createFullPathOptimistic(zk, ledgerPath, metadata.serialize(),
                     Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, scb, null);
                 // delete the znode for id generation
@@ -351,21 +336,6 @@ class HierarchicalLedgerManager extends 
     }
 
     @Override
-    public void addActiveLedger(long ledgerId, boolean active) {
-        activeLedgers.put(ledgerId, active);
-    }
-
-    @Override
-    public void removeActiveLedger(long ledgerId) {
-        activeLedgers.remove(ledgerId);
-    }
-
-    @Override
-    public boolean containsActiveLedger(long ledgerId) {
-        return activeLedgers.containsKey(ledgerId);
-    }
-
-    @Override
     public void garbageCollectLedgers(GarbageCollector gc) {
         // create a snapshot before garbage collection
         NavigableMap<Long, Boolean> snapshot = activeLedgers.snapshot();

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java?rev=1351646&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java Tue Jun 19 10:39:37 2012
@@ -0,0 +1,72 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Factory handing out {@link HierarchicalLedgerManager} instances; accepts only {@link #CUR_VERSION}.
+ */
+class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+
+    public static final String NAME = "hierarchical";
+    public static final int CUR_VERSION = 1;
+
+    // both are supplied by initialize() and passed on to every manager created here
+    AbstractConfiguration conf;
+    ZooKeeper zk;
+
+    /** @return {@link #CUR_VERSION}, the only version initialize() accepts */
+    @Override
+    public int getCurrentVersion() {
+        return CUR_VERSION;
+    }
+
+    /** Stores {@code conf} and {@code zk} once {@code factoryVersion} is known to match. */
+    @Override
+    public LedgerManagerFactory initialize(final AbstractConfiguration conf, final ZooKeeper zk,
+                                           final int factoryVersion) throws IOException {
+        if (factoryVersion != CUR_VERSION) {
+            throw new IOException("Incompatible layout version found : " + factoryVersion);
+        }
+        this.conf = conf;
+        this.zk = zk;
+        return this;
+    }
+
+    /** Nothing to release: the zookeeper instance is passed in from outside, so it is not closed here. */
+    @Override
+    public void uninitialize() throws IOException {
+    }
+
+    /** @return a new {@link HierarchicalLedgerManager} exposed as a {@link LedgerManager} */
+    @Override
+    public LedgerManager newLedgerManager() {
+        return new HierarchicalLedgerManager(conf, zk);
+    }
+
+    /** @return a new {@link HierarchicalLedgerManager} exposed as an {@link ActiveLedgerManager} */
+    @Override
+    public ActiveLedgerManager newActiveLedgerManager() {
+        return new HierarchicalLedgerManager(conf, zk);
+    }
+}