You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/09/05 19:39:03 UTC

svn commit: r1165369 [2/9] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/clie...

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Mon Sep  5 17:38:57 2011
@@ -1,7 +1,7 @@
 package org.apache.bookkeeper.client;
 
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.client;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 import java.io.IOException;
@@ -41,429 +41,429 @@ import org.jboss.netty.channel.socket.ni
 /**
  * BookKeeper client. We assume there is one single writer to a ledger at any
  * time.
- * 
+ *
  * There are four possible operations: start a new ledger, write to a ledger,
  * read from a ledger and delete a ledger.
- * 
+ *
  * The exceptions resulting from synchronous calls and error code resulting from
  * asynchronous calls can be found in the class {@link BKException}.
- * 
- * 
+ *
+ *
  */
 
 public class BookKeeper implements OpenCallback, CreateCallback, DeleteCallback {
 
-  static final Logger LOG = Logger.getLogger(BookKeeper.class);
+    static final Logger LOG = Logger.getLogger(BookKeeper.class);
 
-  ZooKeeper zk = null;
-  // whether the zk handle is one we created, or is owned by whoever
-  // instantiated us
-  boolean ownZKHandle = false;
-
-  ClientSocketChannelFactory channelFactory;
-  // whether the socket factory is one we created, or is owned by whoever
-  // instantiated us
-  boolean ownChannelFactory = false;
-
-  BookieClient bookieClient;
-  BookieWatcher bookieWatcher;
-
-  OrderedSafeExecutor callbackWorker = new OrderedSafeExecutor(Runtime
-      .getRuntime().availableProcessors());
-  OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
-      .getRuntime().availableProcessors());
-
-  /**
-   * Create a bookkeeper client. A zookeeper client and a client socket factory
-   * will be instantiated as part of this constructor.
-   * 
-   * @param servers
-   *          A list of one of more servers on which zookeeper is running. The
-   *          client assumes that the running bookies have been registered with
-   *          zookeeper under the path
-   *          {@link BookieWatcher#BOOKIE_REGISTRATION_PATH}
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws KeeperException
-   */
-  public BookKeeper(String servers) throws IOException, InterruptedException,
-      KeeperException {
-    this(new ZooKeeper(servers, 10000, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        // TODO: handle session disconnects and expires
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Process: " + event.getType() + " " + event.getPath());
+    ZooKeeper zk = null;
+    // whether the zk handle is one we created, or is owned by whoever
+    // instantiated us
+    boolean ownZKHandle = false;
+
+    ClientSocketChannelFactory channelFactory;
+    // whether the socket factory is one we created, or is owned by whoever
+    // instantiated us
+    boolean ownChannelFactory = false;
+
+    BookieClient bookieClient;
+    BookieWatcher bookieWatcher;
+
+    OrderedSafeExecutor callbackWorker = new OrderedSafeExecutor(Runtime
+            .getRuntime().availableProcessors());
+    OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
+            .getRuntime().availableProcessors());
+
+    /**
+     * Create a bookkeeper client. A zookeeper client and a client socket factory
+     * will be instantiated as part of this constructor.
+     *
+     * @param servers
+     *          A list of one or more servers on which zookeeper is running. The
+     *          client assumes that the running bookies have been registered with
+     *          zookeeper under the path
+     *          {@link BookieWatcher#BOOKIE_REGISTRATION_PATH}
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    public BookKeeper(String servers) throws IOException, InterruptedException,
+        KeeperException {
+        this(new ZooKeeper(servers, 10000, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                // TODO: handle session disconnects and expires
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Process: " + event.getType() + " " + event.getPath());
+                }
+            }
+        }), new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                                              Executors.newCachedThreadPool()));
+
+        ownZKHandle = true;
+        ownChannelFactory = true;
+    }
+
+    /**
+     * Create a bookkeeper client but use the passed in zookeeper client instead
+     * of instantiating one.
+     *
+     * @param zk
+     *          Zookeeper client instance connected to the zookeeper with which
+     *          the bookies have registered
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    public BookKeeper(ZooKeeper zk) throws InterruptedException, KeeperException {
+        this(zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool()));
+        ownChannelFactory = true;
+    }
+
+    /**
+     * Create a bookkeeper client but use the passed in zookeeper client and
+     * client socket channel factory instead of instantiating those.
+     *
+     * @param zk
+     *          Zookeeper client instance connected to the zookeeper with which
+     *          the bookies have registered
+     * @param channelFactory
+     *          A factory that will be used to create connections to the bookies
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    public BookKeeper(ZooKeeper zk, ClientSocketChannelFactory channelFactory)
+            throws InterruptedException, KeeperException {
+        if (zk == null || channelFactory == null) {
+            throw new NullPointerException();
         }
-      }
-    }), new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-        Executors.newCachedThreadPool()));
-
-    ownZKHandle = true;
-    ownChannelFactory = true;
-  }
-
-  /**
-   * Create a bookkeeper client but use the passed in zookeeper client instead
-   * of instantiating one.
-   * 
-   * @param zk
-   *          Zookeeper client instance connected to the zookeeper with which
-   *          the bookies have registered
-   * @throws InterruptedException
-   * @throws KeeperException
-   */
-  public BookKeeper(ZooKeeper zk) throws InterruptedException, KeeperException {
-    this(zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-        Executors.newCachedThreadPool()));
-    ownChannelFactory = true;
-  }
-
-  /**
-   * Create a bookkeeper client but use the passed in zookeeper client and
-   * client socket channel factory instead of instantiating those.
-   * 
-   * @param zk
-   *          Zookeeper client instance connected to the zookeeper with which
-   *          the bookies have registered
-   * @param channelFactory
-   *          A factory that will be used to create connections to the bookies
-   * @throws InterruptedException
-   * @throws KeeperException
-   */
-  public BookKeeper(ZooKeeper zk, ClientSocketChannelFactory channelFactory)
-      throws InterruptedException, KeeperException {
-    if (zk == null || channelFactory == null) {
-      throw new NullPointerException();
-    }
-    this.zk = zk;
-    this.channelFactory = channelFactory;
-    bookieWatcher = new BookieWatcher(this);
-    bookieWatcher.readBookiesBlocking();
-    bookieClient = new BookieClient(channelFactory, mainWorkerPool);
-  }
-
-  /**
-   * There are 2 digest types that can be used for verification. The CRC32 is
-   * cheap to compute but does not protect against byzantine bookies (i.e., a
-   * bookie might report fake bytes and a matching CRC32). The MAC code is more
-   * expensive to compute, but is protected by a password, i.e., a bookie can't
-   * report fake bytes with a mathching MAC unless it knows the password
-   */
-  public enum DigestType {
-    MAC, CRC32
-  };
-
-  public ZooKeeper getZkHandle() {
-    return zk;
-  }
-
-  /**
-   * Get the BookieClient, currently used for doing bookie recovery.
-   * 
-   * @return BookieClient for the BookKeeper instance.
-   */
-  public BookieClient getBookieClient() {
-      return bookieClient;
-  }
-  
-  /**
-   * Creates a new ledger asynchronously. To create a ledger, we need to specify
-   * the ensemble size, the quorum size, the digest type, a password, a callback
-   * implementation, and an optional control object. The ensemble size is how
-   * many bookies the entries should be striped among and the quorum size is the
-   * degree of replication of each entry. The digest type is either a MAC or a
-   * CRC. Note that the CRC option is not able to protect a client against a
-   * bookie that replaces an entry. The password is used not only to
-   * authenticate access to a ledger, but also to verify entries in ledgers.
-   * 
-   * @param ensSize
-   *          ensemble size
-   * @param qSize
-   *          quorum size
-   * @param digestType
-   *          digest type, either MAC or CRC32
-   * @param passwd
-   *          password
-   * @param cb
-   *          createCallback implementation
-   * @param ctx
-   *          optional control object
-   */
-  public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType,
-      byte[] passwd, CreateCallback cb, Object ctx) {
+        this.zk = zk;
+        this.channelFactory = channelFactory;
+        bookieWatcher = new BookieWatcher(this);
+        bookieWatcher.readBookiesBlocking();
+        bookieClient = new BookieClient(channelFactory, mainWorkerPool);
+    }
+
+    /**
+     * There are 2 digest types that can be used for verification. The CRC32 is
+     * cheap to compute but does not protect against byzantine bookies (i.e., a
+     * bookie might report fake bytes and a matching CRC32). The MAC code is more
+     * expensive to compute, but is protected by a password, i.e., a bookie can't
+     * report fake bytes with a matching MAC unless it knows the password
+     */
+    public enum DigestType {
+        MAC, CRC32
+    };
+
+    public ZooKeeper getZkHandle() {
+        return zk;
+    }
+
+    /**
+     * Get the BookieClient, currently used for doing bookie recovery.
+     *
+     * @return BookieClient for the BookKeeper instance.
+     */
+    public BookieClient getBookieClient() {
+        return bookieClient;
+    }
+
+    /**
+     * Creates a new ledger asynchronously. To create a ledger, we need to specify
+     * the ensemble size, the quorum size, the digest type, a password, a callback
+     * implementation, and an optional control object. The ensemble size is how
+     * many bookies the entries should be striped among and the quorum size is the
+     * degree of replication of each entry. The digest type is either a MAC or a
+     * CRC. Note that the CRC option is not able to protect a client against a
+     * bookie that replaces an entry. The password is used not only to
+     * authenticate access to a ledger, but also to verify entries in ledgers.
+     *
+     * @param ensSize
+     *          ensemble size
+     * @param qSize
+     *          quorum size
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @param cb
+     *          createCallback implementation
+     * @param ctx
+     *          optional control object
+     */
+    public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType,
+                                  byte[] passwd, CreateCallback cb, Object ctx) {
 
-    new LedgerCreateOp(this, ensSize, qSize, digestType, passwd, cb, ctx)
+        new LedgerCreateOp(this, ensSize, qSize, digestType, passwd, cb, ctx)
         .initiate();
 
-  }
+    }
+
+    /**
+     * Create callback implementation for synchronous create call.
+     *
+     * @param rc
+     *          return code
+     * @param lh
+     *          ledger handle object
+     * @param ctx
+     *          optional control object
+     */
+    public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+        SyncCounter counter = (SyncCounter) ctx;
+        counter.setLh(lh);
+        counter.setrc(rc);
+        counter.dec();
+    }
+
+    /**
+     * Creates a new ledger. Default of 3 servers, and quorum of 2 servers.
+     *
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @return
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public LedgerHandle createLedger(DigestType digestType, byte passwd[])
+            throws KeeperException, BKException, InterruptedException, IOException {
+        return createLedger(3, 2, digestType, passwd);
+    }
+
+    /**
+     * Synchronous call to create ledger. Parameters match those of
+     * {@link #asyncCreateLedger(int, int, DigestType, byte[], CreateCallback, Object)}
+     *
+     * @param ensSize
+     * @param qSize
+     * @param digestType
+     * @param passwd
+     * @return
+     * @throws KeeperException
+     * @throws InterruptedException
+     * @throws IOException
+     * @throws BKException
+     */
+    public LedgerHandle createLedger(int ensSize, int qSize,
+                                     DigestType digestType, byte passwd[]) throws KeeperException,
+        InterruptedException, IOException, BKException {
+        SyncCounter counter = new SyncCounter();
+        counter.inc();
+        /*
+         * Calls asynchronous version
+         */
+        asyncCreateLedger(ensSize, qSize, digestType, passwd, this, counter);
+
+        /*
+         * Wait
+         */
+        counter.block(0);
+        if (counter.getLh() == null) {
+            LOG.error("ZooKeeper error: " + counter.getrc());
+            throw BKException.create(Code.ZKException);
+        }
+
+        return counter.getLh();
+    }
+
+    /**
+     * Open existing ledger asynchronously for reading.
+     *
+     * @param lId
+     *          ledger identifier
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @param ctx
+     *          optional control object
+     */
+    public void asyncOpenLedger(long lId, DigestType digestType, byte passwd[],
+                                OpenCallback cb, Object ctx) {
+
+        new LedgerOpenOp(this, lId, digestType, passwd, false, cb, ctx).initiate();
+
+    }
+
+    /**
+     * Open existing ledger asynchronously for reading, but it does not try to
+     * recover the ledger if it is not yet closed. The application needs to use
+     * it carefully, since the writer might have crashed and the ledger will remain
+     * unsealed forever if there is no external mechanism to detect the failure
+     * of the writer and the ledger is not opened in a safe manner, invoking the
+     * recovery procedure.
+     *
+     * @param lId
+     *          ledger identifier
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @param ctx
+     *          optional control object
+     */
+
+    public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte passwd[],
+                                          OpenCallback cb, Object ctx) {
+
+        new LedgerOpenOp(this, lId, digestType, passwd, true, cb, ctx).initiate();
+
+    }
+
+    /**
+     * Callback method for synchronous open operation
+     *
+     * @param rc
+     *          return code
+     * @param lh
+     *          ledger handle
+     * @param ctx
+     *          optional control object
+     */
+    public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+        SyncCounter counter = (SyncCounter) ctx;
+        counter.setLh(lh);
+
+        LOG.debug("Open complete: " + rc);
+
+        counter.setrc(rc);
+        counter.dec();
+    }
+
+    /**
+     * Synchronous open ledger call
+     *
+     * @param lId
+     *          ledger identifier
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @return
+     * @throws InterruptedException
+     * @throws BKException
+     */
+
+    public LedgerHandle openLedger(long lId, DigestType digestType, byte passwd[])
+            throws BKException, InterruptedException {
+        SyncCounter counter = new SyncCounter();
+        counter.inc();
+
+        /*
+         * Calls async open ledger
+         */
+        asyncOpenLedger(lId, digestType, passwd, this, counter);
+
+        /*
+         * Wait
+         */
+        counter.block(0);
+        if (counter.getrc() != BKException.Code.OK)
+            throw BKException.create(counter.getrc());
+
+        return counter.getLh();
+    }
+
+    /**
+     * Synchronous, unsafe open ledger call
+     *
+     * @param lId
+     *          ledger identifier
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @return
+     * @throws InterruptedException
+     * @throws BKException
+     */
 
-  /**
-   * Create callback implementation for synchronous create call.
-   * 
-   * @param rc
-   *          return code
-   * @param lh
-   *          ledger handle object
-   * @param ctx
-   *          optional control object
-   */
-  public void createComplete(int rc, LedgerHandle lh, Object ctx) {
-    SyncCounter counter = (SyncCounter) ctx;
-    counter.setLh(lh);
-    counter.setrc(rc);
-    counter.dec();
-  }
-
-  /**
-   * Creates a new ledger. Default of 3 servers, and quorum of 2 servers.
-   * 
-   * @param digestType
-   *          digest type, either MAC or CRC32
-   * @param passwd
-   *          password
-   * @return
-   * @throws KeeperException
-   * @throws InterruptedException
-   * @throws BKException
-   */
-  public LedgerHandle createLedger(DigestType digestType, byte passwd[])
-      throws KeeperException, BKException, InterruptedException, IOException {
-    return createLedger(3, 2, digestType, passwd);
-  }
-
-  /**
-   * Synchronous call to create ledger. Parameters match those of
-   * {@link #asyncCreateLedger(int, int, DigestType, byte[], CreateCallback, Object)}
-   * 
-   * @param ensSize
-   * @param qSize
-   * @param digestType
-   * @param passwd
-   * @return
-   * @throws KeeperException
-   * @throws InterruptedException
-   * @throws IOException
-   * @throws BKException
-   */
-  public LedgerHandle createLedger(int ensSize, int qSize,
-      DigestType digestType, byte passwd[]) throws KeeperException,
-      InterruptedException, IOException, BKException {
-    SyncCounter counter = new SyncCounter();
-    counter.inc();
-    /*
-     * Calls asynchronous version
-     */
-    asyncCreateLedger(ensSize, qSize, digestType, passwd, this, counter);
-
-    /*
-     * Wait
-     */
-    counter.block(0);
-    if (counter.getLh() == null) {
-      LOG.error("ZooKeeper error: " + counter.getrc());
-      throw BKException.create(Code.ZKException);
-    }
-
-    return counter.getLh();
-  }
-
-  /**
-   * Open existing ledger asynchronously for reading.
-   * 
-   * @param lId
-   *          ledger identifier
-   * @param digestType
-   *          digest type, either MAC or CRC32
-   * @param passwd
-   *          password
-   * @param ctx
-   *          optional control object
-   */
-  public void asyncOpenLedger(long lId, DigestType digestType, byte passwd[],
-      OpenCallback cb, Object ctx) {
-
-      new LedgerOpenOp(this, lId, digestType, passwd, false, cb, ctx).initiate();
-
-  }
-
-  /**
-   * Open existing ledger asynchronously for reading, but it does not try to
-   * recover the ledger if it is not yet closed. The application needs to use
-   * it carefully, since the writer might have crash and ledger will remain 
-   * unsealed forever if there is no external mechanism to detect the failure 
-   * of the writer and the ledger is not open in a safe manner, invoking the
-   * recovery procedure.
-   * 
-   * @param lId
-   *          ledger identifier
-   * @param digestType
-   *          digest type, either MAC or CRC32
-   * @param passwd
-   *          password
-   * @param ctx
-   *          optional control object
-   */
-  
-  public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte passwd[],
-          OpenCallback cb, Object ctx) {
-
-      new LedgerOpenOp(this, lId, digestType, passwd, true, cb, ctx).initiate();
-
-  }
-  
-  /**
-   * Callback method for synchronous open operation
-   * 
-   * @param rc
-   *          return code
-   * @param lh
-   *          ledger handle
-   * @param ctx
-   *          optional control object
-   */
-  public void openComplete(int rc, LedgerHandle lh, Object ctx) {
-    SyncCounter counter = (SyncCounter) ctx;
-    counter.setLh(lh);
-
-    LOG.debug("Open complete: " + rc);
-
-    counter.setrc(rc);
-    counter.dec();
-  }
-
-  /**
-   * Synchronous open ledger call
-   * 
-   * @param lId
-   *          ledger identifier
-   * @param digestType
-   *          digest type, either MAC or CRC32
-   * @param passwd
-   *          password
-   * @return
-   * @throws InterruptedException
-   * @throws BKException
-   */
-
-  public LedgerHandle openLedger(long lId, DigestType digestType, byte passwd[])
-      throws BKException, InterruptedException {
-    SyncCounter counter = new SyncCounter();
-    counter.inc();
-
-    /*
-     * Calls async open ledger
-     */
-    asyncOpenLedger(lId, digestType, passwd, this, counter);
-
-    /*
-     * Wait
-     */
-    counter.block(0);
-    if (counter.getrc() != BKException.Code.OK)
-      throw BKException.create(counter.getrc());
-
-    return counter.getLh();
-  }
-
-  /**
-   * Synchronous, unsafe open ledger call
-   * 
-   * @param lId
-   *          ledger identifier
-   * @param digestType
-   *          digest type, either MAC or CRC32
-   * @param passwd
-   *          password
-   * @return
-   * @throws InterruptedException
-   * @throws BKException
-   */
-
-  public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte passwd[])
-  throws BKException, InterruptedException {
-      SyncCounter counter = new SyncCounter();
-      counter.inc();
-
-      /*
-       * Calls async open ledger
-       */
-      asyncOpenLedgerNoRecovery(lId, digestType, passwd, this, counter);
-
-      /*
-       * Wait
-       */
-      counter.block(0);
-      if (counter.getrc() != BKException.Code.OK)
-          throw BKException.create(counter.getrc());
-
-      return counter.getLh();
-  }
-  
-  /**
-   * Deletes a ledger asynchronously.
-   * 
-   * @param lId
-   *            ledger Id
-   * @param cb
-   *            deleteCallback implementation
-   * @param ctx
-   *            optional control object
-   */
-  public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
-      new LedgerDeleteOp(this, lId, cb, ctx).initiate();
-  }
-  
-  /**
-   * Delete callback implementation for synchronous delete call.
-   * 
-   * @param rc
-   *            return code
-   * @param ctx
-   *            optional control object
-   */
-  public void deleteComplete(int rc, Object ctx) {
-      SyncCounter counter = (SyncCounter) ctx;
-      counter.setrc(rc);
-      counter.dec();
-  }
-
-  /**
-   * Synchronous call to delete a ledger. Parameters match those of
-   * {@link #asyncDeleteLedger(long, DeleteCallback, Object)}
-   * 
-   * @param lId
-   *            ledgerId
-   * @throws InterruptedException
-   * @throws BKException
-   */
-  public void deleteLedger(long lId) throws InterruptedException, BKException {
-      SyncCounter counter = new SyncCounter();
-      counter.inc();
-      // Call asynchronous version
-      asyncDeleteLedger(lId, this, counter);
-      // Wait
-      counter.block(0);
-      if (counter.getrc() != KeeperException.Code.OK.intValue()) { 
-          LOG.error("ZooKeeper error deleting ledger node: " + counter.getrc());
-          throw BKException.create(Code.ZKException);
-      }
-  }
-  
-  /**
-   * Shuts down client.
-   * 
-   */
-  public void halt() throws InterruptedException {
-    bookieClient.close();
-    bookieWatcher.halt();
-    if (ownChannelFactory) {
-      channelFactory.releaseExternalResources();
-    }
-    if (ownZKHandle) {
-      zk.close();
-    }
-    callbackWorker.shutdown();
-    mainWorkerPool.shutdown();
-  }
+    public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte passwd[])
+            throws BKException, InterruptedException {
+        SyncCounter counter = new SyncCounter();
+        counter.inc();
+
+        /*
+         * Calls async open ledger
+         */
+        asyncOpenLedgerNoRecovery(lId, digestType, passwd, this, counter);
+
+        /*
+         * Wait
+         */
+        counter.block(0);
+        if (counter.getrc() != BKException.Code.OK)
+            throw BKException.create(counter.getrc());
+
+        return counter.getLh();
+    }
+
+    /**
+     * Deletes a ledger asynchronously.
+     *
+     * @param lId
+     *            ledger Id
+     * @param cb
+     *            deleteCallback implementation
+     * @param ctx
+     *            optional control object
+     */
+    public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
+        new LedgerDeleteOp(this, lId, cb, ctx).initiate();
+    }
+
+    /**
+     * Delete callback implementation for synchronous delete call.
+     *
+     * @param rc
+     *            return code
+     * @param ctx
+     *            optional control object
+     */
+    public void deleteComplete(int rc, Object ctx) {
+        SyncCounter counter = (SyncCounter) ctx;
+        counter.setrc(rc);
+        counter.dec();
+    }
+
+    /**
+     * Synchronous call to delete a ledger. Parameters match those of
+     * {@link #asyncDeleteLedger(long, DeleteCallback, Object)}
+     *
+     * @param lId
+     *            ledgerId
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public void deleteLedger(long lId) throws InterruptedException, BKException {
+        SyncCounter counter = new SyncCounter();
+        counter.inc();
+        // Call asynchronous version
+        asyncDeleteLedger(lId, this, counter);
+        // Wait
+        counter.block(0);
+        if (counter.getrc() != KeeperException.Code.OK.intValue()) {
+            LOG.error("ZooKeeper error deleting ledger node: " + counter.getrc());
+            throw BKException.create(Code.ZKException);
+        }
+    }
+
+    /**
+     * Shuts down client.
+     *
+     */
+    public void halt() throws InterruptedException {
+        bookieClient.close();
+        bookieWatcher.halt();
+        if (ownChannelFactory) {
+            channelFactory.releaseExternalResources();
+        }
+        if (ownZKHandle) {
+            zk.close();
+        }
+        callbackWorker.shutdown();
+        mainWorkerPool.shutdown();
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Mon Sep  5 17:38:57 2011
@@ -44,11 +44,11 @@ import org.apache.zookeeper.KeeperExcept
  * are available by reading Zookeeper (and setting watches on the bookie nodes).
  * When a bookie fails, the other parts of the code turn to this class to find a
  * replacement
- * 
+ *
  */
 class BookieWatcher implements Watcher, ChildrenCallback {
     static final Logger logger = Logger.getLogger(BookieWatcher.class);
-    
+
     public static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available";
     static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
     public static int ZK_CONNECT_BACKOFF_SEC = 1;
@@ -69,8 +69,8 @@ class BookieWatcher implements Watcher, 
         this.bk = bk;
         this.scheduler = Executors.newSingleThreadScheduledExecutor();
     }
-    
-    public void halt(){
+
+    public void halt() {
         scheduler.shutdown();
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java Mon Sep  5 17:38:57 2011
@@ -24,7 +24,7 @@ import java.util.zip.CRC32;
 
 class CRC32DigestManager extends DigestManager {
     CRC32 crc = new CRC32();
-    
+
     public CRC32DigestManager(long ledgerId) {
         super(ledgerId);
     }
@@ -33,7 +33,7 @@ class CRC32DigestManager extends DigestM
     int getMacCodeLength() {
         return 8;
     }
-    
+
     @Override
     byte[] getValueAndReset() {
         byte[] value = new byte[8];
@@ -42,7 +42,7 @@ class CRC32DigestManager extends DigestM
         crc.reset();
         return value;
     }
-    
+
     @Override
     void update(byte[] data, int offset, int length) {
         crc.update(data, offset, length);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java Mon Sep  5 17:38:57 2011
@@ -37,29 +37,29 @@ import org.jboss.netty.buffer.ChannelBuf
 
 public abstract class DigestManager {
     static final Logger logger = Logger.getLogger(DigestManager.class);
-    
+
     static final int METADATA_LENGTH = 32;
-    
+
     long ledgerId;
-    
+
     abstract int getMacCodeLength();
-    
-    void update(byte[] data){
+
+    void update(byte[] data) {
         update(data, 0, data.length);
     }
-    
+
     abstract void update(byte[] data, int offset, int length);
     abstract byte[] getValueAndReset();
-    
+
     final int macCodeLength;
 
     public DigestManager(long ledgerId) {
         this.ledgerId = ledgerId;
         macCodeLength = getMacCodeLength();
     }
-    
-    static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType) throws GeneralSecurityException{
-        switch(digestType){
+
+    static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType) throws GeneralSecurityException {
+        switch(digestType) {
         case MAC:
             return new MacDigestManager(ledgerId, passwd);
         case CRC32:
@@ -71,14 +71,14 @@ public abstract class DigestManager {
 
     /**
      * Computes the digest for an entry and put bytes together for sending.
-     *  
+     *
      * @param entryId
      * @param lastAddConfirmed
      * @param length
      * @param data
      * @return
      */
-    
+
     public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data, int doffset, int dlength) {
 
         byte[] bufferArray = new byte[METADATA_LENGTH + macCodeLength];
@@ -133,21 +133,21 @@ public abstract class DigestManager {
 
         if (actualLedgerId != ledgerId) {
             logger.error("Ledger-id mismatch in authenticated message, expected: " + ledgerId + " , actual: "
-                    + actualLedgerId);
+                         + actualLedgerId);
             throw new BKDigestMatchException();
         }
 
         if (!skipEntryIdCheck && actualEntryId != entryId) {
             logger.error("Entry-id mismatch in authenticated message, expected: " + entryId + " , actual: "
-                    + actualEntryId);
+                         + actualEntryId);
             throw new BKDigestMatchException();
         }
 
     }
-    
+
     /**
      * Verify that the digest matches and returns the data in the entry.
-     * 
+     *
      * @param entryId
      * @param dataReceived
      * @return

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java Mon Sep  5 17:38:57 2011
@@ -20,7 +20,7 @@ package org.apache.bookkeeper.client;
 
 /**
  * This interface determins how entries are distributed among bookies.
- * 
+ *
  * Every entry gets replicated to some number of replicas. The first replica for
  * an entry is given a replicaIndex of 0, and so on. To distribute write load,
  * not all entries go to all bookies. Given an entry-id and replica index, an
@@ -31,7 +31,7 @@ package org.apache.bookkeeper.client;
 public interface DistributionSchedule {
 
     /**
-     * 
+     *
      * @param entryId
      * @param replicaIndex
      * @return index of bookie that should get this replica
@@ -39,7 +39,7 @@ public interface DistributionSchedule {
     public int getBookieIndex(long entryId, int replicaIndex);
 
     /**
-     * 
+     *
      * @param entryId
      * @param bookieIndex
      * @return -1 if the given bookie index is not a replica for the given
@@ -53,7 +53,7 @@ public interface DistributionSchedule {
      * sequence and an implementation of this interface should accumulate
      * history about which bookie indexes we have heard from. Once this method
      * has returned true, it wont be called again on the same instance
-     * 
+     *
      * @param bookieIndexHeardFrom
      * @return true if its ok to proceed with recovery
      */

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Mon Sep  5 17:38:57 2011
@@ -1,5 +1,5 @@
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,16 +7,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 package org.apache.bookkeeper.client;
@@ -39,7 +39,7 @@ import org.apache.zookeeper.data.Stat;
 
 /**
  * Encapsulates asynchronous ledger create operation
- * 
+ *
  */
 class LedgerCreateOp implements StringCallback, StatCallback {
 
@@ -53,24 +53,24 @@ class LedgerCreateOp implements StringCa
     BookKeeper bk;
     DigestType digestType;
 
-   /**
-    * Constructor
-    * 
-    * @param bk
-    *       BookKeeper object
-    * @param ensembleSize
-    *       ensemble size
-    * @param quorumSize
-    *       quorum size
-    * @param digestType
-    *       digest type, either MAC or CRC32
-    * @param passwd
-    *       passowrd
-    * @param cb
-    *       callback implementation
-    * @param ctx
-    *       optional control object
-    */
+    /**
+     * Constructor
+     *
+     * @param bk
+     *       BookKeeper object
+     * @param ensembleSize
+     *       ensemble size
+     * @param quorumSize
+     *       quorum size
+     * @param digestType
+     *       digest type, either MAC or CRC32
+     * @param passwd
+     *       password
+     * @param cb
+     *       callback implementation
+     * @param ctx
+     *       optional control object
+     */
 
     LedgerCreateOp(BookKeeper bk, int ensembleSize, int quorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx) {
         this.bk = bk;
@@ -91,7 +91,7 @@ class LedgerCreateOp implements StringCa
          */
 
         bk.getZkHandle().create(StringUtils.prefix, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT_SEQUENTIAL, this, null);
+                                CreateMode.PERSISTENT_SEQUENTIAL, this, null);
 
         // calls the children callback method below
     }
@@ -99,7 +99,7 @@ class LedgerCreateOp implements StringCa
 
     /**
      * Implements ZooKeeper string callback.
-     * 
+     *
      * @see org.apache.zookeeper.AsyncCallback.StringCallback#processResult(int, java.lang.String, java.lang.Object, java.lang.String)
      */
     public void processResult(int rc, String path, Object ctx, String name) {
@@ -157,7 +157,7 @@ class LedgerCreateOp implements StringCa
 
     /**
      * Implements ZooKeeper stat callback.
-     * 
+     *
      * @see org.apache.zookeeper.AsyncCallback.StatCallback#processResult(int, String, Object, Stat)
      */
     public void processResult(int rc, String path, Object ctx, Stat stat) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Mon Sep  5 17:38:57 2011
@@ -1,5 +1,5 @@
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,16 +7,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 package org.apache.bookkeeper.client;
@@ -28,7 +28,7 @@ import org.apache.zookeeper.AsyncCallbac
 
 /**
  * Encapsulates asynchronous ledger delete operation
- * 
+ *
  */
 class LedgerDeleteOp implements VoidCallback {
 
@@ -41,7 +41,7 @@ class LedgerDeleteOp implements VoidCall
 
     /**
      * Constructor
-     * 
+     *
      * @param bk
      *            BookKeeper object
      * @param ledgerId
@@ -69,7 +69,7 @@ class LedgerDeleteOp implements VoidCall
 
     /**
      * Implements ZooKeeper Void Callback.
-     * 
+     *
      * @see org.apache.zookeeper.AsyncCallback.VoidCallback#processResult(int,
      *      java.lang.String, java.lang.Object)
      */

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerEntry.java Mon Sep  5 17:38:57 2011
@@ -1,7 +1,7 @@
 package org.apache.bookkeeper.client;
 
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.client;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 import java.io.IOException;
@@ -30,54 +30,54 @@ import org.jboss.netty.buffer.ChannelBuf
 /**
  * Ledger entry. Its a simple tuple containing the ledger id, the entry-id, and
  * the entry content.
- * 
+ *
  */
 
 public class LedgerEntry {
-  Logger LOG = Logger.getLogger(LedgerEntry.class);
+    Logger LOG = Logger.getLogger(LedgerEntry.class);
 
-  long ledgerId;
-  long entryId;
-  long length;
-  ChannelBufferInputStream entryDataStream;
-
-  int nextReplicaIndexToReadFrom = 0;
-
-  LedgerEntry(long lId, long eId) {
-    this.ledgerId = lId;
-    this.entryId = eId;
-  }
-
-  public long getLedgerId() {
-    return ledgerId;
-  }
-
-  public long getEntryId() {
-    return entryId;
-  }
-  
-  public long getLength() {
-      return length;
-  }
-
-  public byte[] getEntry() {
-    try {
-      // In general, you can't rely on the available() method of an input
-      // stream, but ChannelBufferInputStream is backed by a byte[] so it
-      // accurately knows the # bytes available
-      byte[] ret = new byte[entryDataStream.available()];
-      entryDataStream.readFully(ret);
-      return ret;
-    } catch (IOException e) {
-      // The channelbufferinput stream doesnt really throw the
-      // ioexceptions, it just has to be in the signature because
-      // InputStream says so. Hence this code, should never be reached.
-      LOG.fatal("Unexpected IOException while reading from channel buffer", e);
-      return new byte[0];
-    }
-  }
-
-  public InputStream getEntryInputStream() {
-    return entryDataStream;
-  }
+    long ledgerId;
+    long entryId;
+    long length;
+    ChannelBufferInputStream entryDataStream;
+
+    int nextReplicaIndexToReadFrom = 0;
+
+    LedgerEntry(long lId, long eId) {
+        this.ledgerId = lId;
+        this.entryId = eId;
+    }
+
+    public long getLedgerId() {
+        return ledgerId;
+    }
+
+    public long getEntryId() {
+        return entryId;
+    }
+
+    public long getLength() {
+        return length;
+    }
+
+    public byte[] getEntry() {
+        try {
+            // In general, you can't rely on the available() method of an input
+            // stream, but ChannelBufferInputStream is backed by a byte[] so it
+            // accurately knows the # bytes available
+            byte[] ret = new byte[entryDataStream.available()];
+            entryDataStream.readFully(ret);
+            return ret;
+        } catch (IOException e) {
+            // The channelbufferinput stream doesnt really throw the
+            // ioexceptions, it just has to be in the signature because
+            // InputStream says so. Hence this code, should never be reached.
+            LOG.fatal("Unexpected IOException while reading from channel buffer", e);
+            return new byte[0];
+        }
+    }
+
+    public InputStream getEntryInputStream() {
+        return entryDataStream;
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Mon Sep  5 17:38:57 2011
@@ -1,7 +1,7 @@
 package org.apache.bookkeeper.client;
 
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -9,16 +9,16 @@ package org.apache.bookkeeper.client;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 import java.net.InetSocketAddress;
@@ -53,275 +53,275 @@ import org.jboss.netty.buffer.ChannelBuf
  * write operations to a ledger.
  */
 public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback, ReadLastConfirmedCallback {
-  final static Logger LOG = Logger.getLogger(LedgerHandle.class);
-  final static long LAST_ADD_CONFIRMED = -1;
-  
-  final byte[] ledgerKey;
-  final LedgerMetadata metadata;
-  final BookKeeper bk;
-  final long ledgerId;
-  long lastAddPushed;
-  long lastAddConfirmed;
-  long length;
-  final DigestManager macManager;
-  final DistributionSchedule distributionSchedule;
-
-  final Semaphore opCounterSem;
-  private Integer throttling = 5000;
-  
-  final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
-  
-  LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
-      DigestType digestType, byte[] password)
-      throws GeneralSecurityException, NumberFormatException {
-    this.bk = bk;
-    this.metadata = metadata;
-    if (metadata.isClosed()) {
-      lastAddConfirmed = lastAddPushed = metadata.close;
-      length = metadata.length;
-    } else {
-      lastAddConfirmed = lastAddPushed = -1;
-      length = 0;
-    }
-    
-    this.ledgerId = ledgerId;
-    
-    String throttleValue = System.getProperty("throttle");
-    if(throttleValue != null){
-        this.throttling = new Integer(throttleValue); 
-    }
-    this.opCounterSem = new Semaphore(throttling);
-    
-    macManager = DigestManager.instantiate(ledgerId, password, digestType);
-    this.ledgerKey = MacDigestManager.genDigest("ledger", password);
-    distributionSchedule = new RoundRobinDistributionSchedule(
-        metadata.quorumSize, metadata.ensembleSize);
-  }
-  
-  /**
-   * Get the id of the current ledger
-   * 
-   * @return
-   */
-  public long getId() {
-    return ledgerId;
-  }
-
-  /**
-   * Get the last confirmed entry id on this ledger
-   * 
-   * @return
-   */
-  public long getLastAddConfirmed() {
-    return lastAddConfirmed;
-  }
-
-  /**
-   * Get the entry id of the last entry that has been enqueued for addition (but
-   * may not have possibly been persited to the ledger)
-   * 
-   * @return
-   */
-  public long getLastAddPushed() {
-    return lastAddPushed;
-  }
-
-  /**
-   * Get the Ledger's key/password.
-   * 
-   * @return byte array for the ledger's key/password.
-   */
-  public byte[] getLedgerKey() {
-      return ledgerKey;
-  }
-  
-  /**
-   * Get the LedgerMetadata
-   * 
-   * @return LedgerMetadata for the LedgerHandle
-   */
-  public LedgerMetadata getLedgerMetadata() {
-      return metadata;
-  }
-  
-  /**
-   * Get the DigestManager
-   * 
-   * @return DigestManager for the LedgerHandle
-   */
-  public DigestManager getDigestManager() {
-      return macManager;
-  }
-  
-  /**
-   * Return total number of available slots.
-   * 
-   * @return int    available slots
-   */
-  Semaphore getAvailablePermits(){
-      return this.opCounterSem;
-  }
-  
-  /**
-   *  Add to the length of the ledger in bytes.
-   *  
-   * @param delta
-   * @return
-   */
-  long addToLength(long delta){
-      this.length += delta;
-      return this.length;
-  }
-  
-  /**
-   * Returns the length of the ledger in bytes. 
-   * 
-   * @return
-   */
-  public long getLength(){
-      return this.length;
-  }
-  
-  /**
-   * Get the Distribution Schedule
-   * 
-   * @return DistributionSchedule for the LedgerHandle
-   */
-  public DistributionSchedule getDistributionSchedule() {
-      return distributionSchedule;
-  }
-  
-  public void writeLedgerConfig(StatCallback callback, Object ctx) {
-    bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId),
-        metadata.serialize(), -1, callback, ctx);
-  }
-
-  /**
-   * Close this ledger synchronously.
-   * 
-   */
-  public void close() throws InterruptedException {
-    SyncCounter counter = new SyncCounter();
-    counter.inc();
-
-    asyncClose(this, counter);
-
-    counter.block(0);
-  }
-
-  /**
-   * Asynchronous close, any adds in flight will return errors
-   * 
-   * @param cb
-   *          callback implementation
-   * @param ctx
-   *          control object
-   * @throws InterruptedException
-   */
-  public void asyncClose(CloseCallback cb, Object ctx) {
-    asyncClose(cb, ctx, BKException.Code.LedgerClosedException);
-  }
-
-  /**
-   * Same as public version of asynClose except that this one takes an
-   * additional parameter which is the return code to hand to all the pending
-   * add ops
-   * 
-   * @param cb
-   * @param ctx
-   * @param rc
-   */
-  private void asyncClose(final CloseCallback cb, final Object ctx, final int rc) {
-
-    bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
-
-      @Override
-      public void safeRun() {
-        metadata.length = length;
-        // Close operation is idempotent, so no need to check if we are
-        // already closed
-        metadata.close(lastAddConfirmed);
-        errorOutPendingAdds(rc);
-        lastAddPushed = lastAddConfirmed;
+    final static Logger LOG = Logger.getLogger(LedgerHandle.class);
+    final static long LAST_ADD_CONFIRMED = -1;
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
-              + metadata.close + " with this many bytes: " + metadata.length);
+    final byte[] ledgerKey;
+    final LedgerMetadata metadata;
+    final BookKeeper bk;
+    final long ledgerId;
+    long lastAddPushed;
+    long lastAddConfirmed;
+    long length;
+    final DigestManager macManager;
+    final DistributionSchedule distributionSchedule;
+
+    final Semaphore opCounterSem;
+    private Integer throttling = 5000;
+
+    final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
+
+    LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
+                 DigestType digestType, byte[] password)
+            throws GeneralSecurityException, NumberFormatException {
+        this.bk = bk;
+        this.metadata = metadata;
+        if (metadata.isClosed()) {
+            lastAddConfirmed = lastAddPushed = metadata.close;
+            length = metadata.length;
+        } else {
+            lastAddConfirmed = lastAddPushed = -1;
+            length = 0;
         }
 
-        writeLedgerConfig(new StatCallback() {
-          @Override
-          public void processResult(int rc, String path, Object subctx,
-              Stat stat) {
-            if (rc != KeeperException.Code.OK.intValue()) {
-              cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this,
-                  ctx);
-            } else {
-              cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
+        this.ledgerId = ledgerId;
+
+        String throttleValue = System.getProperty("throttle");
+        if(throttleValue != null) {
+            this.throttling = new Integer(throttleValue);
+        }
+        this.opCounterSem = new Semaphore(throttling);
+
+        macManager = DigestManager.instantiate(ledgerId, password, digestType);
+        this.ledgerKey = MacDigestManager.genDigest("ledger", password);
+        distributionSchedule = new RoundRobinDistributionSchedule(
+            metadata.quorumSize, metadata.ensembleSize);
+    }
+
+    /**
+     * Get the id of the current ledger
+     *
+     * @return
+     */
+    public long getId() {
+        return ledgerId;
+    }
+
+    /**
+     * Get the last confirmed entry id on this ledger
+     *
+     * @return
+     */
+    public long getLastAddConfirmed() {
+        return lastAddConfirmed;
+    }
+
+    /**
+     * Get the entry id of the last entry that has been enqueued for addition (but
+     * may not yet have been persisted to the ledger)
+     *
+     * @return
+     */
+    public long getLastAddPushed() {
+        return lastAddPushed;
+    }
+
+    /**
+     * Get the Ledger's key/password.
+     *
+     * @return byte array for the ledger's key/password.
+     */
+    public byte[] getLedgerKey() {
+        return ledgerKey;
+    }
+
+    /**
+     * Get the LedgerMetadata
+     *
+     * @return LedgerMetadata for the LedgerHandle
+     */
+    public LedgerMetadata getLedgerMetadata() {
+        return metadata;
+    }
+
+    /**
+     * Get the DigestManager
+     *
+     * @return DigestManager for the LedgerHandle
+     */
+    public DigestManager getDigestManager() {
+        return macManager;
+    }
+
+    /**
+     * Return the semaphore used to throttle operations on this ledger.
+     *
+     * @return Semaphore whose permits represent the available slots
+     */
+    Semaphore getAvailablePermits() {
+        return this.opCounterSem;
+    }
+
+    /**
+     *  Add to the length of the ledger in bytes.
+     *
+     * @param delta
+     * @return
+     */
+    long addToLength(long delta) {
+        this.length += delta;
+        return this.length;
+    }
+
+    /**
+     * Returns the length of the ledger in bytes.
+     *
+     * @return
+     */
+    public long getLength() {
+        return this.length;
+    }
+
+    /**
+     * Get the Distribution Schedule
+     *
+     * @return DistributionSchedule for the LedgerHandle
+     */
+    public DistributionSchedule getDistributionSchedule() {
+        return distributionSchedule;
+    }
+
+    public void writeLedgerConfig(StatCallback callback, Object ctx) {
+        bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId),
+                                 metadata.serialize(), -1, callback, ctx);
+    }
+
+    /**
+     * Close this ledger synchronously.
+     *
+     */
+    public void close() throws InterruptedException {
+        SyncCounter counter = new SyncCounter();
+        counter.inc();
+
+        asyncClose(this, counter);
+
+        counter.block(0);
+    }
+
+    /**
+     * Asynchronous close, any adds in flight will return errors
+     *
+     * @param cb
+     *          callback implementation
+     * @param ctx
+     *          control object
+     * @throws InterruptedException
+     */
+    public void asyncClose(CloseCallback cb, Object ctx) {
+        asyncClose(cb, ctx, BKException.Code.LedgerClosedException);
+    }
+
+    /**
+     * Same as public version of asyncClose except that this one takes an
+     * additional parameter which is the return code to hand to all the pending
+     * add ops
+     *
+     * @param cb
+     * @param ctx
+     * @param rc
+     */
+    private void asyncClose(final CloseCallback cb, final Object ctx, final int rc) {
+
+        bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+
+            @Override
+            public void safeRun() {
+                metadata.length = length;
+                // Close operation is idempotent, so no need to check if we are
+                // already closed
+                metadata.close(lastAddConfirmed);
+                errorOutPendingAdds(rc);
+                lastAddPushed = lastAddConfirmed;
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
+                              + metadata.close + " with this many bytes: " + metadata.length);
+                }
+
+                writeLedgerConfig(new StatCallback() {
+                    @Override
+                    public void processResult(int rc, String path, Object subctx,
+                    Stat stat) {
+                        if (rc != KeeperException.Code.OK.intValue()) {
+                            cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this,
+                                             ctx);
+                        } else {
+                            cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
+                        }
+                    }
+                }, null);
+
             }
-          }
-        }, null);
+        });
+    }
+
+    /**
+     * Read a sequence of entries synchronously.
+     *
+     * @param firstEntry
+     *          id of first entry of sequence (included)
+     * @param lastEntry
+     *          id of last entry of sequence (included)
+     *
+     */
+    public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
+            throws InterruptedException, BKException {
+        SyncCounter counter = new SyncCounter();
+        counter.inc();
+
+        asyncReadEntries(firstEntry, lastEntry, this, counter);
+
+        counter.block(0);
+        if (counter.getrc() != BKException.Code.OK) {
+            throw BKException.create(counter.getrc());
+        }
+
+        return counter.getSequence();
+    }
 
-      }
-    });
-  }
-
-  /**
-   * Read a sequence of entries synchronously.
-   * 
-   * @param firstEntry
-   *          id of first entry of sequence (included)
-   * @param lastEntry
-   *          id of last entry of sequence (included)
-   * 
-   */
-  public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
-      throws InterruptedException, BKException {
-    SyncCounter counter = new SyncCounter();
-    counter.inc();
-
-    asyncReadEntries(firstEntry, lastEntry, this, counter);
-
-    counter.block(0);
-    if (counter.getrc() != BKException.Code.OK) {
-      throw BKException.create(counter.getrc());
-    }
-
-    return counter.getSequence();
-  }
-
-  /**
-   * Read a sequence of entries asynchronously.
-   * 
-   * @param firstEntry
-   *          id of first entry of sequence
-   * @param lastEntry
-   *          id of last entry of sequence
-   * @param cb
-   *          object implementing read callback interface
-   * @param ctx
-   *          control object
-   */
-  public void asyncReadEntries(long firstEntry, long lastEntry,
-      ReadCallback cb, Object ctx) {
-    // Little sanity check
-    if (firstEntry < 0 || lastEntry > lastAddConfirmed
-        || firstEntry > lastEntry) {
-      cb.readComplete(BKException.Code.ReadException, this, null, ctx);
-      return;
-    }
-
-    try{
-        new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
-  
-    } catch (InterruptedException e) {
-        cb.readComplete(BKException.Code.InterruptedException, this, null, ctx);
+    /**
+     * Read a sequence of entries asynchronously.
+     *
+     * @param firstEntry
+     *          id of first entry of sequence
+     * @param lastEntry
+     *          id of last entry of sequence
+     * @param cb
+     *          object implementing read callback interface
+     * @param ctx
+     *          control object
+     */
+    public void asyncReadEntries(long firstEntry, long lastEntry,
+                                 ReadCallback cb, Object ctx) {
+        // Little sanity check
+        if (firstEntry < 0 || lastEntry > lastAddConfirmed
+                || firstEntry > lastEntry) {
+            cb.readComplete(BKException.Code.ReadException, this, null, ctx);
+            return;
+        }
+
+        try {
+            new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
+
+        } catch (InterruptedException e) {
+            cb.readComplete(BKException.Code.InterruptedException, this, null, ctx);
+        }
     }
-  }
 
     /**
      * Add entry synchronously to an open ledger.
-     * 
+     *
      * @param data
      *         array of bytes to be written to the ledger
      */
@@ -331,7 +331,7 @@ public class LedgerHandle implements Rea
 
     /**
      * Add entry synchronously to an open ledger.
-     * 
+     *
      * @param data
      *         array of bytes to be written to the ledger
      * @param offset
@@ -339,36 +339,36 @@ public class LedgerHandle implements Rea
      * @param length
      *          number of bytes to take from data
      */
-    public long addEntry(byte[] data, int offset, int length) 
+    public long addEntry(byte[] data, int offset, int length)
             throws InterruptedException, BKException {
         LOG.debug("Adding entry " + data);
         SyncCounter counter = new SyncCounter();
         counter.inc();
-        
+
         asyncAddEntry(data, offset, length, this, counter);
         counter.block(0);
 
         return counter.getrc();
     }
 
-  /**
-   * Add entry asynchronously to an open ledger.
-   * 
-   * @param data
-   *          array of bytes to be written
-   * @param cb
-   *          object implementing callbackinterface
-   * @param ctx
-   *          some control object
-   */
-    public void asyncAddEntry(final byte[] data, final AddCallback cb, 
+    /**
+     * Add entry asynchronously to an open ledger.
+     *
+     * @param data
+     *          array of bytes to be written
+     * @param cb
+     *          object implementing callback interface
+     * @param ctx
+     *          some control object
+     */
+    public void asyncAddEntry(final byte[] data, final AddCallback cb,
                               final Object ctx) {
         asyncAddEntry(data, 0, data.length, cb, ctx);
     }
 
     /**
      * Add entry asynchronously to an open ledger, using an offset and range.
-     * 
+     *
      * @param data
      *          array of bytes to be written
      * @param offset
@@ -379,285 +379,285 @@ public class LedgerHandle implements Rea
      *          object implementing callbackinterface
      * @param ctx
      *          some control object
-     * @throws ArrayIndexOutOfBoundsException if offset or length is negative or 
+     * @throws ArrayIndexOutOfBoundsException if offset or length is negative or
      *          offset and length sum to a value higher than the length of data.
      */
-    public void asyncAddEntry(final byte[] data, final int offset, final int length, 
+    public void asyncAddEntry(final byte[] data, final int offset, final int length,
                               final AddCallback cb, final Object ctx) {
         if (offset < 0 || length < 0
-            || (offset + length) > data.length) {
+                || (offset + length) > data.length) {
             throw new ArrayIndexOutOfBoundsException(
-                    "Invalid values for offset("+offset
-                    +") or length("+length+")");
+                "Invalid values for offset("+offset
+                +") or length("+length+")");
         }
-        try{
+        try {
             opCounterSem.acquire();
         } catch (InterruptedException e) {
             cb.addComplete(BKException.Code.InterruptedException,
-                    LedgerHandle.this, -1, ctx);
+                           LedgerHandle.this, -1, ctx);
         }
-        
-        try{
+
+        try {
             bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        if (metadata.isClosed()) {
-                            LOG.warn("Attempt to add to closed ledger: " + ledgerId);
-                            LedgerHandle.this.opCounterSem.release();
-                            cb.addComplete(BKException.Code.LedgerClosedException,
-                                           LedgerHandle.this, -1, ctx);
-                            return;
-                        }
-                        
-                        long entryId = ++lastAddPushed;
-                        long currentLength = addToLength(length);
-                        PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
-                        pendingAddOps.add(op);
-                        ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
-                                entryId, lastAddConfirmed, currentLength, data, offset, length);
-                        op.initiate(toSend);
+                @Override
+                public void safeRun() {
+                    if (metadata.isClosed()) {
+                        LOG.warn("Attempt to add to closed ledger: " + ledgerId);
+                        LedgerHandle.this.opCounterSem.release();
+                        cb.addComplete(BKException.Code.LedgerClosedException,
+                                       LedgerHandle.this, -1, ctx);
+                        return;
                     }
-                });
+
+                    long entryId = ++lastAddPushed;
+                    long currentLength = addToLength(length);
+                    PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
+                    pendingAddOps.add(op);
+                    ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
+                                               entryId, lastAddConfirmed, currentLength, data, offset, length);
+                    op.initiate(toSend);
+                }
+            });
         } catch (RuntimeException e) {
             opCounterSem.release();
             throw e;
         }
     }
 
-  /**
-   * Obtains last confirmed write from a quorum of bookies.
-   * 
-   * @param cb
-   * @param ctx
-   */
-  
-  public void asyncReadLastConfirmed(ReadLastConfirmedCallback cb, Object ctx){
-      new ReadLastConfirmedOp(this, cb, ctx).initiate();
-  }
-  
-  
-  /**
-   * Context objects for synchronous call to read last confirmed. 
-   */
-  class LastConfirmedCtx {
-      long response;
-      int rc;
-      
-      LastConfirmedCtx(){
-          this.response = -1;
-      }
-      
-      void setLastConfirmed(long lastConfirmed){
-          this.response = lastConfirmed;
-      }
-      
-      long getlastConfirmed(){
-          return this.response;
-      }
-      
-      void setRC(int rc){
-          this.rc = rc;
-      }
-      
-      int getRC(){
-          return this.rc;
-      }
-      
-      boolean ready(){
-          return (this.response != -1);
-      }
-  }
-  
-  public long readLastConfirmed()
-  throws InterruptedException, BKException {   
-      LastConfirmedCtx ctx = new LastConfirmedCtx();
-      asyncReadLastConfirmed(this, ctx);
-      synchronized(ctx){
-          while(!ctx.ready()){
-              ctx.wait();
-          }
-      }
-      
-      if(ctx.getRC() != BKException.Code.OK) throw BKException.create(ctx.getRC());
-      return ctx.getlastConfirmed();
-  }
-  
-  // close the ledger and send fails to all the adds in the pipeline
-  void handleUnrecoverableErrorDuringAdd(int rc) {
-    asyncClose(NoopCloseCallback.instance, null, rc);
-  }
-
-  void errorOutPendingAdds(int rc) {
-    PendingAddOp pendingAddOp;
-    while ((pendingAddOp = pendingAddOps.poll()) != null) {
-      pendingAddOp.submitCallback(rc);
-    }
-  }
-
-  void sendAddSuccessCallbacks() {
-    // Start from the head of the queue and proceed while there are
-    // entries that have had all their responses come back
-    PendingAddOp pendingAddOp;
-    while ((pendingAddOp = pendingAddOps.peek()) != null) {
-      if (pendingAddOp.numResponsesPending != 0) {
-        return;
-      }
-      pendingAddOps.remove();
-      lastAddConfirmed = pendingAddOp.entryId;
-      pendingAddOp.submitCallback(BKException.Code.OK);
-    }
-
-  }
-
-  void handleBookieFailure(InetSocketAddress addr, final int bookieIndex) {
-    InetSocketAddress newBookie;
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Handling failure of bookie: " + addr + " index: "
-          + bookieIndex);
-    }
-
-    try {
-      newBookie = bk.bookieWatcher
-          .getAdditionalBookie(metadata.currentEnsemble);
-    } catch (BKNotEnoughBookiesException e) {
-      LOG
-          .error("Could not get additional bookie to remake ensemble, closing ledger: "
-              + ledgerId);
-      handleUnrecoverableErrorDuringAdd(e.getCode());
-      return;
-    }
-
-    final ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>(
-        metadata.currentEnsemble);
-    newEnsemble.set(bookieIndex, newBookie);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Changing ensemble from: " + metadata.currentEnsemble + " to: "
-          + newEnsemble + " for ledger: " + ledgerId + " starting at entry: "
-          + (lastAddConfirmed + 1));
-    }
-
-    metadata.addEnsemble(lastAddConfirmed + 1, newEnsemble);
-
-    writeLedgerConfig(new StatCallback() {
-      @Override
-      public void processResult(final int rc, String path, Object ctx, Stat stat) {
+    /**
+     * Obtains last confirmed write from a quorum of bookies.
+     *
+     * @param cb
+     * @param ctx
+     */
+
+    public void asyncReadLastConfirmed(ReadLastConfirmedCallback cb, Object ctx) {
+        new ReadLastConfirmedOp(this, cb, ctx).initiate();
+    }
+
 
-        bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
-          @Override
-          public void safeRun() {
-            if (rc != KeeperException.Code.OK.intValue()) {
-              LOG
-                  .error("Could not persist ledger metadata while changing ensemble to: "
-                      + newEnsemble + " , closing ledger");
-              handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
-              return;
+    /**
+     * Context objects for synchronous call to read last confirmed.
+     */
+    class LastConfirmedCtx {
+        long response;
+        int rc;
+
+        LastConfirmedCtx() {
+            this.response = -1;
+        }
+
+        void setLastConfirmed(long lastConfirmed) {
+            this.response = lastConfirmed;
+        }
+
+        long getlastConfirmed() {
+            return this.response;
+        }
+
+        void setRC(int rc) {
+            this.rc = rc;
+        }
+
+        int getRC() {
+            return this.rc;
+        }
+
+        boolean ready() {
+            return (this.response != -1);
+        }
+    }
+
+    public long readLastConfirmed()
+            throws InterruptedException, BKException {
+        LastConfirmedCtx ctx = new LastConfirmedCtx();
+        asyncReadLastConfirmed(this, ctx);
+        synchronized(ctx) {
+            while(!ctx.ready()) {
+                ctx.wait();
             }
+        }
+
+        if(ctx.getRC() != BKException.Code.OK) throw BKException.create(ctx.getRC());
+        return ctx.getlastConfirmed();
+    }
+
+    // close the ledger and send fails to all the adds in the pipeline
+    void handleUnrecoverableErrorDuringAdd(int rc) {
+        asyncClose(NoopCloseCallback.instance, null, rc);
+    }
 
-            for (PendingAddOp pendingAddOp : pendingAddOps) {
-              pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
+    void errorOutPendingAdds(int rc) {
+        PendingAddOp pendingAddOp;
+        while ((pendingAddOp = pendingAddOps.poll()) != null) {
+            pendingAddOp.submitCallback(rc);
+        }
+    }
+
+    void sendAddSuccessCallbacks() {
+        // Start from the head of the queue and proceed while there are
+        // entries that have had all their responses come back
+        PendingAddOp pendingAddOp;
+        while ((pendingAddOp = pendingAddOps.peek()) != null) {
+            if (pendingAddOp.numResponsesPending != 0) {
+                return;
             }
-          }
-        });
+            pendingAddOps.remove();
+            lastAddConfirmed = pendingAddOp.entryId;
+            pendingAddOp.submitCallback(BKException.Code.OK);
+        }
+
+    }
+
+    void handleBookieFailure(InetSocketAddress addr, final int bookieIndex) {
+        InetSocketAddress newBookie;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Handling failure of bookie: " + addr + " index: "
+                      + bookieIndex);
+        }
+
+        try {
+            newBookie = bk.bookieWatcher
+                        .getAdditionalBookie(metadata.currentEnsemble);
+        } catch (BKNotEnoughBookiesException e) {
+            LOG
+            .error("Could not get additional bookie to remake ensemble, closing ledger: "
+                   + ledgerId);
+            handleUnrecoverableErrorDuringAdd(e.getCode());
+            return;
+        }
+
+        final ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>(
+            metadata.currentEnsemble);
+        newEnsemble.set(bookieIndex, newBookie);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Changing ensemble from: " + metadata.currentEnsemble + " to: "
+                      + newEnsemble + " for ledger: " + ledgerId + " starting at entry: "
+                      + (lastAddConfirmed + 1));
+        }
+
+        metadata.addEnsemble(lastAddConfirmed + 1, newEnsemble);
+
+        writeLedgerConfig(new StatCallback() {
+            @Override
+            public void processResult(final int rc, String path, Object ctx, Stat stat) {
+
+                bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        if (rc != KeeperException.Code.OK.intValue()) {
+                            LOG
+                            .error("Could not persist ledger metadata while changing ensemble to: "
+                                   + newEnsemble + " , closing ledger");
+                            handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+                            return;
+                        }
 
-      }
-    }, null);
+                        for (PendingAddOp pendingAddOp : pendingAddOps) {
+                            pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
+                        }
+                    }
+                });
 
-  }
+            }
+        }, null);
 
-  void recover(GenericCallback<Void> cb) {
-    if (metadata.isClosed()) {
-      // We are already closed, nothing to do
-      cb.operationComplete(BKException.Code.OK, null);
-      return;
     }
 
-    new LedgerRecoveryOp(this, cb).initiate();
-  }
+    void recover(GenericCallback<Void> cb) {
+        if (metadata.isClosed()) {
+            // We are already closed, nothing to do
+            cb.operationComplete(BKException.Code.OK, null);
+            return;
+        }
 
-  static class NoopCloseCallback implements CloseCallback {
-    static NoopCloseCallback instance = new NoopCloseCallback();
+        new LedgerRecoveryOp(this, cb).initiate();
+    }
 
-    @Override
-    public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-      // noop
+    static class NoopCloseCallback implements CloseCallback {
+        static NoopCloseCallback instance = new NoopCloseCallback();
+
+        @Override
+        public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+            // noop
+        }
+    }
+
+    /**
+     * Implementation of callback interface for synchronous read method.
+     *
+     * @param rc
+     *          return code
+     * @param lh
+     *          ledger handle
+     * @param seq
+     *          sequence of entries
+     * @param ctx
+     *          control object
+     */
+    public void readComplete(int rc, LedgerHandle lh,
+                             Enumeration<LedgerEntry> seq, Object ctx) {
+
+        SyncCounter counter = (SyncCounter) ctx;
+        synchronized (counter) {
+            counter.setSequence(seq);
+            counter.setrc(rc);
+            counter.dec();
+            counter.notify();
+        }
     }
-  }
 
-  /**
-   * Implementation of callback interface for synchronous read method.
-   * 
-   * @param rc
-   *          return code
-   * @param leder
-   *          ledger identifier
-   * @param seq
-   *          sequence of entries
-   * @param ctx
-   *          control object
-   */
-  public void readComplete(int rc, LedgerHandle lh,
-      Enumeration<LedgerEntry> seq, Object ctx) {
-
-    SyncCounter counter = (SyncCounter) ctx;
-    synchronized (counter) {
-      counter.setSequence(seq);
-      counter.setrc(rc);
-      counter.dec();
-      counter.notify();
-    }
-  }
-
-  /**
-   * Implementation of callback interface for synchronous read method.
-   * 
-   * @param rc
-   *          return code
-   * @param leder
-   *          ledger identifier
-   * @param entry
-   *          entry identifier
-   * @param ctx
-   *          control object
-   */
-  public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
-    SyncCounter counter = (SyncCounter) ctx;
-
-    counter.setrc(rc);
-    counter.dec();
-  }
-
-  
-
-  /**
-   * Implementation of  callback interface for synchronous read last confirmed method.
-   */
-  public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
-      LastConfirmedCtx lcCtx = (LastConfirmedCtx) ctx;
-      
-      synchronized(lcCtx){
-          lcCtx.setRC(rc);
-          lcCtx.setLastConfirmed(lastConfirmed);
-          lcCtx.notify();
-      }
-  }
-  
-  /**
-   * Close callback method
-   * 
-   * @param rc
-   * @param lh
-   * @param ctx
-   */
-  public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-
-    SyncCounter counter = (SyncCounter) ctx;
-    counter.setrc(rc);
-    synchronized (counter) {
-      counter.dec();
-      counter.notify();
+    /**
+     * Implementation of callback interface for synchronous add method.
+     *
+     * @param rc
+     *          return code
+     * @param lh
+     *          ledger handle
+     * @param entry
+     *          entry identifier
+     * @param ctx
+     *          control object
+     */
+    public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
+        SyncCounter counter = (SyncCounter) ctx;
+
+        counter.setrc(rc);
+        counter.dec();
+    }
+
+
+
+    /**
+     * Implementation of callback interface for synchronous read last confirmed method.
+     */
+    public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+        LastConfirmedCtx lcCtx = (LastConfirmedCtx) ctx;
+
+        synchronized(lcCtx) {
+            lcCtx.setRC(rc);
+            lcCtx.setLastConfirmed(lastConfirmed);
+            lcCtx.notify();
+        }
     }
 
-  }
+    /**
+     * Close callback method
+     *
+     * @param rc
+     * @param lh
+     * @param ctx
+     */
+    public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+
+        SyncCounter counter = (SyncCounter) ctx;
+        counter.setrc(rc);
+        synchronized (counter) {
+            counter.dec();
+            counter.notify();
+        }
+
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1165369&r1=1165368&r2=1165369&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Mon Sep  5 17:38:57 2011
@@ -31,7 +31,7 @@ import org.apache.log4j.Logger;
 /**
  * This class encapsulates all the ledger metadata that is persistently stored
  * in zookeeper. It provides parsing and serialization methods of such metadata.
- * 
+ *
  */
 public class LedgerMetadata {
     static final Logger LOG = Logger.getLogger(LedgerMetadata.class);
@@ -53,9 +53,9 @@ public class LedgerMetadata {
     public LedgerMetadata(int ensembleSize, int quorumSize) {
         this.ensembleSize = ensembleSize;
         this.quorumSize = quorumSize;
-        
+
         /*
-         * It is set in PendingReadOp.readEntryComplete, and 
+         * It is set in PendingReadOp.readEntryComplete, and
          * we read it in LedgerRecoveryOp.readComplete.
          */
         this.length = 0;
@@ -67,16 +67,16 @@ public class LedgerMetadata {
     }
 
     /**
-     * Get the Map of bookie ensembles for the various ledger fragments 
+     * Get the Map of bookie ensembles for the various ledger fragments
      * that make up the ledger.
-     * 
-     * @return SortedMap of Ledger Fragments and the corresponding 
+     *
+     * @return SortedMap of Ledger Fragments and the corresponding
      * bookie ensembles that store the entries.
      */
     public SortedMap<Long, ArrayList<InetSocketAddress>> getEnsembles() {
         return ensembles;
     }
-    
+
     boolean isClosed() {
         return close != NOTCLOSED;
     }
@@ -84,7 +84,7 @@ public class LedgerMetadata {
     void close(long entryId) {
         close = entryId;
     }
-    
+
     void addEnsemble(long startEntryId, ArrayList<InetSocketAddress> ensemble) {
         assert ensembles.isEmpty() || startEntryId >= ensembles.lastKey();
 
@@ -101,7 +101,7 @@ public class LedgerMetadata {
     /**
      * the entry id > the given entry-id at which the next ensemble change takes
      * place ( -1 if no further ensemble changes)
-     * 
+     *
      * @param entryId
      * @return
      */
@@ -117,7 +117,7 @@ public class LedgerMetadata {
 
     /**
      * Generates a byte array based on a LedgerConfig object received.
-     * 
+     *
      * @param config
      *            LedgerConfig object
      * @return byte[]
@@ -133,7 +133,7 @@ public class LedgerMetadata {
                 StringUtils.addrToString(s, addr);
             }
         }
-        
+
         if (close != NOTCLOSED) {
             s.append(lSplitter).append(close).append(tSplitter).append(closed);
         }
@@ -147,7 +147,7 @@ public class LedgerMetadata {
 
     /**
      * Parses a given byte array and transforms into a LedgerConfig object
-     * 
+     *
      * @param array
      *            byte array to parse
      * @return LedgerConfig
@@ -173,8 +173,8 @@ public class LedgerMetadata {
         try {
             lc.quorumSize = new Integer(lines[0]);
             lc.ensembleSize = new Integer(lines[1]);
-            lc.length = new Long(lines[2]); 
-            
+            lc.length = new Long(lines[2]);
+
             for (int i = 3; i < lines.length; i++) {
                 String parts[] = lines[i].split(tSplitter);