You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/01/27 00:16:49 UTC

svn commit: r903483 [2/6] - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.client;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,11 +21,10 @@
  * 
  */
 
-
 import java.lang.Exception;
 
 /**
- * Implements BookKeeper exceptions. 
+ * Class that enumerates all the possible error conditions
  * 
  */
 
@@ -32,12 +32,18 @@
 public abstract class BKException extends Exception {
 
     private int code;
-    public BKException(int code){
+
+    BKException(int code) {
         this.code = code;
     }
-    
-    public static BKException create(int code){
-        switch(code){
+
+    /**
+     * Create an exception from an error code
+     * @param code return error code
+     * @return corresponding exception
+     */
+    public static BKException create(int code) {
+        switch (code) {
         case Code.ReadException:
             return new BKReadException();
         case Code.QuorumException:
@@ -54,11 +60,25 @@
             return new BKNoSuchLedgerExistsException();
         case Code.BookieHandleNotAvailableException:
             return new BKBookieHandleNotAvailableException();
+        case Code.ZKException:
+            return new ZKException();
+        case Code.LedgerRecoveryException:
+            return new BKLedgerRecoveryException();
+        case Code.LedgerClosedException:
+            return new BKLedgerClosedException();
+        case Code.WriteException:
+            return new BKWriteException();
+        case Code.NoSuchEntryException:
+            return new BKNoSuchEntryException();
         default:
             return new BKIllegalOpException();
         }
     }
-    
+
+    /**
+     * List of return codes
+     *
+     */
     public interface Code {
         int OK = 0;
         int ReadException = -1;
@@ -69,20 +89,25 @@
         int NotEnoughBookiesException = -6;
         int NoSuchLedgerExistsException = -7;
         int BookieHandleNotAvailableException = -8;
-        
+        int ZKException = -9;
+        int LedgerRecoveryException = -10;
+        int LedgerClosedException = -11;
+        int WriteException = -12;
+        int NoSuchEntryException = -13;
+
         int IllegalOpException = -100;
     }
-    
-    public void setCode(int code){
+
+    public void setCode(int code) {
         this.code = code;
     }
-    
-    public int getCode(){
+
+    public int getCode() {
         return this.code;
     }
-    
-    public String getMessage(int code){
-        switch(code){
+
+    public static String getMessage(int code) {
+        switch (code) {
         case Code.OK:
             return "No problem";
         case Code.ReadException:
@@ -101,63 +126,102 @@
             return "No such ledger exists";
         case Code.BookieHandleNotAvailableException:
             return "Bookie handle is not available";
+        case Code.ZKException:
+            return "Error while using ZooKeeper";
+        case Code.LedgerRecoveryException:
+            return "Error while recovering ledger";
+        case Code.LedgerClosedException:
+            return "Attempt to write to a closed ledger";
+        case Code.WriteException:
+            return "Write failed on bookie";
+        case Code.NoSuchEntryException:
+            return "No such entry";
         default:
             return "Invalid operation";
         }
     }
-    
+
     public static class BKReadException extends BKException {
-        public BKReadException(){
+        public BKReadException() {
             super(Code.ReadException);
-        }   
+        }
     }
-    
+
+    public static class BKNoSuchEntryException extends BKException {
+        public BKNoSuchEntryException() {
+            super(Code.NoSuchEntryException);
+        }
+    }
+
     public static class BKQuorumException extends BKException {
-        public BKQuorumException(){
+        public BKQuorumException() {
             super(Code.QuorumException);
-        }   
+        }
     }
-     
+
     public static class BKBookieException extends BKException {
-        public BKBookieException(){
+        public BKBookieException() {
             super(Code.NoBookieAvailableException);
-        }   
+        }
     }
-    
+
     public static class BKDigestNotInitializedException extends BKException {
-        public BKDigestNotInitializedException(){
+        public BKDigestNotInitializedException() {
             super(Code.DigestNotInitializedException);
-        }   
+        }
     }
-    
+
     public static class BKDigestMatchException extends BKException {
-        public BKDigestMatchException(){
+        public BKDigestMatchException() {
             super(Code.DigestMatchException);
-        }   
+        }
     }
-    
+
     public static class BKIllegalOpException extends BKException {
-        public BKIllegalOpException(){
+        public BKIllegalOpException() {
             super(Code.IllegalOpException);
-        }   
+        }
     }
-    
+
     public static class BKNotEnoughBookiesException extends BKException {
-        public BKNotEnoughBookiesException(){
+        public BKNotEnoughBookiesException() {
             super(Code.NotEnoughBookiesException);
         }
     }
 
+    public static class BKWriteException extends BKException {
+        public BKWriteException() {
+            super(Code.WriteException);
+        }
+    }
+
     public static class BKNoSuchLedgerExistsException extends BKException {
-        public BKNoSuchLedgerExistsException(){
+        public BKNoSuchLedgerExistsException() {
             super(Code.NoSuchLedgerExistsException);
-        }   
+        }
     }
-    
+
     public static class BKBookieHandleNotAvailableException extends BKException {
-        public BKBookieHandleNotAvailableException(){
+        public BKBookieHandleNotAvailableException() {
             super(Code.BookieHandleNotAvailableException);
-        }   
+        }
+    }
+
+    public static class ZKException extends BKException {
+        public ZKException() {
+            super(Code.ZKException);
+        }
+    }
+
+    public static class BKLedgerRecoveryException extends BKException {
+        public BKLedgerRecoveryException() {
+            super(Code.LedgerRecoveryException);
+        }
+    }
+
+    public static class BKLedgerClosedException extends BKException {
+        public BKLedgerClosedException() {
+            super(Code.LedgerClosedException);
+        }
     }
 }
-    

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
-package org. apache.bookkeeper.client;
+package org.apache.bookkeeper.client;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,597 +21,332 @@
  * 
  */
 
-
 import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Random;
-import java.net.InetSocketAddress;
-
+import java.util.concurrent.Executors;
 import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException.Code;
-import org.apache.bookkeeper.client.LedgerHandle.QMode;
-import org.apache.bookkeeper.client.LedgerManagementProcessor.CreateLedgerOp;
-import org.apache.bookkeeper.client.LedgerManagementProcessor.OpenLedgerOp;
+import org.apache.bookkeeper.client.SyncCounter;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.log4j.Logger;
 
-import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.WatchedEvent;
-
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 /**
- * BookKeeper client. We assume there is one single writer 
- * to a ledger at any time. 
+ * BookKeeper client. We assume there is one single writer to a ledger at any
+ * time.
  * 
- * There are three possible operations: start a new ledger, 
- * write to a ledger, and read from a ledger.
+ * There are three possible operations: start a new ledger, write to a ledger,
+ * and read from a ledger.
+ * 
+ * The exceptions resulting from synchronous calls and error codes resulting from
+ * asynchronous calls can be found in the class {@link BKException}.
  * 
- * For the ZooKeeper layout, please refer to BKDefs.java.
  * 
  */
 
-public class BookKeeper 
-implements Watcher {
- 
-    Logger LOG = Logger.getLogger(BookKeeper.class);
-    
-    ZooKeeper zk = null;
-    
+public class BookKeeper implements OpenCallback, CreateCallback {
+
+  static final Logger LOG = Logger.getLogger(BookKeeper.class);
+
+  ZooKeeper zk = null;
+  // whether the zk handle is one we created, or is owned by whoever
+  // instantiated us
+  boolean ownZKHandle = false;
+
+  ClientSocketChannelFactory channelFactory;
+  // whether the socket factory is one we created, or is owned by whoever
+  // instantiated us
+  boolean ownChannelFactory = false;
+
+  BookieClient bookieClient;
+  BookieWatcher bookieWatcher;
+
+  OrderedSafeExecutor callbackWorker = new OrderedSafeExecutor(Runtime
+      .getRuntime().availableProcessors());
+  OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
+      .getRuntime().availableProcessors());
+
+  /**
+   * Create a bookkeeper client. A zookeeper client and a client socket factory
+   * will be instantiated as part of this constructor.
+   * 
+   * @param servers
+   *          A list of one or more servers on which zookeeper is running. The
+   *          client assumes that the running bookies have been registered with
+   *          zookeeper under the path
+   *          {@link BookieWatcher#BOOKIE_REGISTRATION_PATH}
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public BookKeeper(String servers) throws IOException, InterruptedException,
+      KeeperException {
+    this(new ZooKeeper(servers, 10000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // TODO: handle session disconnects and expires
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Process: " + event.getType() + " " + event.getPath());
+        }
+      }
+    }), new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+        Executors.newCachedThreadPool()));
+
+    ownZKHandle = true;
+    ownChannelFactory = true;
+  }
+
+  /**
+   * Create a bookkeeper client but use the passed in zookeeper client instead
+   * of instantiating one.
+   * 
+   * @param zk
+   *          Zookeeper client instance connected to the zookeeper with which
+   *          the bookies have registered
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public BookKeeper(ZooKeeper zk) throws InterruptedException, KeeperException {
+    this(zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+        Executors.newCachedThreadPool()));
+    ownChannelFactory = true;
+  }
+
+  /**
+   * Create a bookkeeper client but use the passed in zookeeper client and
+   * client socket channel factory instead of instantiating those.
+   * 
+   * @param zk
+   *          Zookeeper client instance connected to the zookeeper with which
+   *          the bookies have registered
+   * @param channelFactory
+   *          A factory that will be used to create connections to the bookies
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public BookKeeper(ZooKeeper zk, ClientSocketChannelFactory channelFactory)
+      throws InterruptedException, KeeperException {
+    if (zk == null || channelFactory == null) {
+      throw new NullPointerException();
+    }
+    this.zk = zk;
+    this.channelFactory = channelFactory;
+    bookieWatcher = new BookieWatcher(this);
+    bookieWatcher.readBookiesBlocking();
+    bookieClient = new BookieClient(channelFactory, mainWorkerPool);
+  }
+
+  /**
+   * There are 2 digest types that can be used for verification. The CRC32 is
+   * cheap to compute but does not protect against byzantine bookies (i.e., a
+   * bookie might report fake bytes and a matching CRC32). The MAC code is more
+   * expensive to compute, but is protected by a password, i.e., a bookie can't
+   * report fake bytes with a matching MAC unless it knows the password
+   */
+  public enum DigestType {
+    MAC, CRC32
+  };
+
+  public ZooKeeper getZkHandle() {
+    return zk;
+  }
+
+  /**
+   * Creates a new ledger asynchronously. To create a ledger, we need to specify
+   * the ensemble size, the quorum size, the digest type, a password, a callback
+   * implementation, and an optional control object. The ensemble size is how
+   * many bookies the entries should be striped among and the quorum size is the
+   * degree of replication of each entry. The digest type is either a MAC or a
+   * CRC. Note that the CRC option is not able to protect a client against a
+   * bookie that replaces an entry. The password is used not only to
+   * authenticate access to a ledger, but also to verify entries in ledgers.
+   * 
+   * @param ensSize
+   *          ensemble size
+   * @param qSize
+   *          quorum size
+   * @param digestType
+   *          digest type, either MAC or CRC32
+   * @param passwd
+   *          password
+   * @param cb
+   *          createCallback implementation
+   * @param ctx
+   *          optional control object
+   */
+  public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType,
+      byte[] passwd, CreateCallback cb, Object ctx) {
+
+    new LedgerCreateOp(this, ensSize, qSize, digestType, passwd, cb, ctx)
+        .initiate();
+
+  }
+
+  /**
+   * Create callback implementation for synchronous create call.
+   * 
+   * @param rc
+   *          return code
+   * @param lh
+   *          ledger handle object
+   * @param ctx
+   *          optional control object
+   */
+  public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+    SyncCounter counter = (SyncCounter) ctx;
+    counter.setLh(lh);
+    counter.setrc(rc);
+    counter.dec();
+  }
+
+  /**
+   * Creates a new ledger. Default of 3 servers, and quorum of 2 servers.
+   * 
+   * @param digestType
+   *          digest type, either MAC or CRC32
+   * @param passwd
+   *          password
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   * @throws BKException
+   */
+  public LedgerHandle createLedger(DigestType digestType, byte passwd[])
+      throws KeeperException, BKException, InterruptedException, IOException {
+    return createLedger(3, 2, digestType, passwd);
+  }
+
+  /**
+   * Synchronous call to create ledger. Parameters match those of
+   * {@link #asyncCreateLedger(int, int, DigestType, byte[], CreateCallback, Object)}
+   * 
+   * @param ensSize
+   * @param qSize
+   * @param digestType
+   * @param passwd
+   * @return
+   * @throws KeeperException
+   * @throws InterruptedException
+   * @throws IOException
+   * @throws BKException
+   */
+  public LedgerHandle createLedger(int ensSize, int qSize,
+      DigestType digestType, byte passwd[]) throws KeeperException,
+      InterruptedException, IOException, BKException {
+    SyncCounter counter = new SyncCounter();
+    counter.inc();
     /*
-     * The ledgerMngProcessor is a thread that processes
-     * asynchronously requests that handle ledgers, such
-     * as create, open, and close.
+     * Calls asynchronous version
      */
-    private static LedgerManagementProcessor ledgerMngProcessor;
-    
+    asyncCreateLedger(ensSize, qSize, digestType, passwd, this, counter);
+
     /*
-     * Blacklist of bookies
-     */
-    HashSet<InetSocketAddress> bookieBlackList;
-    
-    LedgerSequence responseRead;
-    Long responseLong;
-    
-    public BookKeeper(String servers) 
-    throws KeeperException, IOException{
-    	LOG.debug("Creating BookKeeper for servers " + servers);
-        //Create ZooKeeper object
-        this.zk = new ZooKeeper(servers, 10000, this);
-        
-        //List to enable clients to blacklist bookies
-        this.bookieBlackList = new HashSet<InetSocketAddress>();
-    }
-    
-    /**
-     * Watcher method. 
-     */
-    synchronized public void process(WatchedEvent event) {
-        LOG.debug("Process: " + event.getType() + " " + event.getPath());
-    }
-    
-    /**
-     * Formats ledger ID according to ZooKeeper rules
-     * 
-     * @param id	znode id
-     */
-    String getZKStringId(long id){
-        return String.format("%010d", id);        
-    }
-    
-    /**
-     * return the zookeeper instance
-     * @return return the zookeeper instance
+     * Wait
      */
-    ZooKeeper getZooKeeper() {
-        return zk;
-    }
-    
-    LedgerManagementProcessor getMngProcessor(){
-        if (ledgerMngProcessor == null){
-            ledgerMngProcessor = new LedgerManagementProcessor(this);
-            ledgerMngProcessor.start();
-        }
-        return ledgerMngProcessor;
-    }
-    
-    /**
-     * Creates a new ledger. To create a ledger, we need to specify the ensemble
-     * size, the quorum size, the operation mode, and a password. The ensemble size
-     * and the quorum size depend upon the operation mode. The operation mode can be
-     * GENERIC, VERIFIABLE, or FREEFORM (debugging). The password is used not only
-     * to authenticate access to a ledger, but also to verify entries in verifiable
-     * ledgers.
-     * 
-     * @param ensSize   ensemble size
-     * @param qSize     quorum size
-     * @param mode      quorum mode: VERIFIABLE (default), GENERIC, or FREEFORM
-     * @param passwd    password
-     */
-    public LedgerHandle createLedger(int ensSize, int qSize, QMode mode,  byte passwd[])
-        throws KeeperException, InterruptedException, 
-        IOException, BKException {
-        // Check that quorum size follows the minimum
-        long t;
-        LedgerHandle lh = null;
-        
-        switch(mode){
-        case VERIFIABLE:
-            t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/2));
-            if(t == 0){
-                LOG.error("Tolerates 0 bookie failures"); 
-                throw BKException.create(Code.QuorumException);
-            }
-            break;
-        case GENERIC:
-            t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/3));
-            if(t == 0){
-                LOG.error("Tolerates 0 bookie failures"); 
-                throw BKException.create(Code.QuorumException);
-            }
-            break;
-        case FREEFORM:
-            break;
-        }
-        /*
-         * Create ledger node on ZK.
-         * We get the id from the sequence number on the node.
-         */
-        String path = zk.create(BKDefs.prefix, new byte[0], 
-                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
-        /* 
-         * Extract ledger id.
-         */
-        String parts[] = path.split("/");
-        String subparts[] = parts[2].split("L");
-        try{
-            long lId = Long.parseLong(subparts[1]);
-       
-            /* 
-             * Get children from "/ledgers/available" on zk
-             */
-            List<String> list = 
-                zk.getChildren("/ledgers/available", false);
-            ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
-            /* 
-             * Select ensSize servers to form the ensemble
-             */
-            path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0], 
-                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-         
-            /* 
-             * Add quorum size to ZK metadata
-             */
-            ByteBuffer bb = ByteBuffer.allocate(4);
-            bb.putInt(qSize);
-            zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(), 
-                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            /* 
-             * Quorum mode
-             */
-            bb = ByteBuffer.allocate(4);
-            bb.putInt(mode.ordinal());
-            zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(), 
-                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            /* 
-             * Create QuorumEngine
-             */
-            lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
-            
-            /*
-             * Adding bookies to ledger handle
-             */
-            Random r = new Random();
-        
-            for(int i = 0; i < ensSize; i++){
-                int index = 0;
-                if(list.size() > 1) 
-                    index = r.nextInt(list.size() - 1);
-                else if(list.size() == 1)
-                    index = 0;
-                else {
-                    LOG.error("Not enough bookies available");
-        	    
-                    return null;
-                }
-            
-                try{
-                    String bookie = list.remove(index);
-                    LOG.info("Bookie: " + bookie);
-                    InetSocketAddress tAddr = parseAddr(bookie);
-                    int bindex = lh.addBookieForWriting(tAddr); 
-                    ByteBuffer bindexBuf = ByteBuffer.allocate(4);
-                    bindexBuf.putInt(bindex);
-        	    
-                    String pBookie = "/" + bookie;
-                    zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(), 
-                            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                } catch (IOException e) {
-                    LOG.error(e);
-                    i--;
-                } 
-            }
-            LOG.debug("Created new ledger");
-        } catch (NumberFormatException e) {
-            LOG.error("Error when parsing the ledger identifier", e);
-        }
-        // Return ledger handler
-        return lh; 
-    }
+    counter.block(0);
+    if (counter.getLh() == null) {
+      LOG.error("ZooKeeper error: " + counter.getrc());
+      throw BKException.create(Code.ZKException);
+    }
+
+    return counter.getLh();
+  }
+
+  /**
+   * Open existing ledger asynchronously for reading.
+   * 
+   * @param lId
+   *          ledger identifier
+   * @param digestType
+   *          digest type, either MAC or CRC32
+   * @param passwd
+   *          password
+   * @param ctx
+   *          optional control object
+   */
+  public void asyncOpenLedger(long lId, DigestType digestType, byte passwd[],
+      OpenCallback cb, Object ctx) {
+
+    new LedgerOpenOp(this, lId, digestType, passwd, cb, ctx).initiate();
+
+  }
+
+  /**
+   * Callback method for synchronous open operation
+   * 
+   * @param rc
+   *          return code
+   * @param lh
+   *          ledger handle
+   * @param ctx
+   *          optional control object
+   */
+  public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+    SyncCounter counter = (SyncCounter) ctx;
+    counter.setLh(lh);
+
+    LOG.debug("Open complete: " + rc);
+
+    counter.setrc(rc);
+    counter.dec();
+  }
+
+  /**
+   * Synchronous open ledger call
+   * 
+   * @param lId
+   *          ledger identifier
+   * @param digestType
+   *          digest type, either MAC or CRC32
+   * @param passwd
+   *          password
+   * @return
+   * @throws InterruptedException
+   * @throws BKException
+   */
+
+  public LedgerHandle openLedger(long lId, DigestType digestType, byte passwd[])
+      throws BKException, InterruptedException {
+    SyncCounter counter = new SyncCounter();
+    counter.inc();
 
-    /**
-     * Creates a new ledger. Default of 3 servers, and quorum of 2 servers,
-     * verifiable ledger.
-     * 
-     * @param passwd	password
+    /*
+     * Calls async open ledger
      */
-    public LedgerHandle createLedger(byte passwd[])
-    throws KeeperException, BKException, 
-    InterruptedException, IOException {
-        return createLedger(3, 2, QMode.VERIFIABLE, passwd);
-    }
+    asyncOpenLedger(lId, digestType, passwd, this, counter);
 
-    /**
-     * Asychronous call to create ledger
-     * 
-     * @param ensSize
-     * @param qSize
-     * @param mode
-     * @param passwd
-     * @param cb
-     * @param ctx
-     * @throws KeeperException
-     * @throws InterruptedException
-     * @throws IOException
-     * @throws BKException
-     */
-    public void asyncCreateLedger(int ensSize, 
-            int qSize, 
-            QMode mode,  
-            byte passwd[],
-            CreateCallback cb,
-            Object ctx
-            )
-    throws KeeperException, InterruptedException, 
-    IOException, BKException {
-        CreateLedgerOp op = new CreateLedgerOp(ensSize, 
-                qSize, 
-                mode, 
-                passwd, 
-                cb, 
-                ctx);
-        LedgerManagementProcessor lmp = getMngProcessor();
-        lmp.addOp(op);
-        
-    }
-    
-    /**
-     * Open existing ledger for reading. Default for quorum size is 2.
-     * 
-     * @param long  the long corresponding to the ledger id
-     * @param byte[]    byte array corresponding to the password to access a ledger
-     * @param int   the quorum size, it has to be at least ceil(n+1/2)
-     */
-    public LedgerHandle openLedger(long lId, byte passwd[])
-    throws KeeperException, InterruptedException, IOException, BKException {
-        
-        Stat stat = null;
-        
-        /*
-         * Check if ledger exists
-         */
-        if(zk.exists(BKDefs.prefix + getZKStringId(lId), false) == null){
-            LOG.error("Ledger " + getZKStringId(lId) + " doesn't exist.");
-            throw BKException.create(Code.NoSuchLedgerExistsException);
-        }
-        
-        /*
-         * Get quorum size.
-         */
-        ByteBuffer bb = ByteBuffer.wrap(zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, false, stat));
-        int qSize = bb.getInt();
-         
-        /*
-         * Get last entry written from ZK 
-         */
-        
-        long last = 0;
-        LOG.debug("Close path: " + BKDefs.prefix + getZKStringId(lId) + BKDefs.close);
-        if(zk.exists(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, false) == null){
-            recoverLedger(lId, passwd);
-        }
-            
-        stat = null;
-        byte[] data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, false, stat);
-        ByteBuffer buf = ByteBuffer.wrap(data);
-        last = buf.getLong();
-        //zk.delete(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, -1);
-        
-        /*
-         * Quorum mode 
-         */
-        data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
-        buf = ByteBuffer.wrap(data);
-        
-        QMode qMode;
-        switch(buf.getInt()){
-        case 1:
-            qMode = QMode.GENERIC;
-            LOG.info("Generic ledger");
-            break;
-        case 2:
-            qMode = QMode.FREEFORM;
-            break;
-        default:
-            qMode = QMode.VERIFIABLE;
-            LOG.info("Verifiable ledger");
-        }
-        
-        /*
-         *  Create QuorumEngine
-         */
-        LedgerHandle lh = new LedgerHandle(this, lId, last, qSize, qMode, passwd);
-        
-        /*
-         * Get children of "/ledgers/id/ensemble" 
-         */
-        
-        List<String> list = 
-            zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
-        
-        LOG.debug("Length of list of bookies: " + list.size());
-        for(int i = 0 ; i < list.size() ; i++){
-            for(String s : list){
-                LOG.debug("Extracting bookie: " + s);
-                byte[] bindex = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + "/" + s, false, stat);
-                ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
-                if(bindexBuf.getInt() == i){                      
-                    try{
-                        lh.addBookieForReading(parseAddr(s));
-                    } catch (IOException e){
-                        LOG.error(e);
-                    }
-                }
-            }
-        }
-        
-        /*
-         * Read changes to quorum over time. To determine if there has been changes during
-         * writes to the ledger, check if there is a znode called quorumEvolution.
-         */
-        if(zk.exists(BKDefs.prefix + 
-                getZKStringId(lh.getId()) +  
-                BKDefs.quorumEvolution, false) != null){
-                    String path = BKDefs.prefix + 
-                    getZKStringId(lh.getId()) +  
-                    BKDefs.quorumEvolution;
-                    
-                    List<String> faultList = zk.getChildren(path, false);
-                    try{
-                        for(String s : faultList){
-                            LOG.debug("Faulty list child: " + s);
-                            long entry = Long.parseLong(s);
-                            String addresses = new String(zk.getData(path + "/" + s, false, stat));
-                            String parts[] = addresses.split(" ");
-
-                            ArrayList<BookieHandle> newBookieSet = new ArrayList<BookieHandle>();
-                            for(int i = 0 ; i < parts.length ; i++){
-                                LOG.debug("Address: " + parts[i]);
-                                InetSocketAddress faultyBookie =  
-                                    parseAddr(parts[i].substring(1));                           
-                        
-                                newBookieSet.add(lh.getBookieHandleDup(faultyBookie));
-                            }
-                            lh.setNewBookieConfig(entry, newBookieSet);
-                            LOG.debug("NewBookieSet size: " + newBookieSet.size());
-                        }
-
-                        lh.prepareEntryChange();
-                    } catch (NumberFormatException e) {
-                        LOG.error("Error when parsing the ledger identifier", e);
-                    }
-                }
-      
-        /*
-         *  Return ledger handler
-         */
-        return lh;
-    }    
-    
-    public void asyncOpenLedger(long lId, byte passwd[], OpenCallback cb, Object ctx)
-    throws InterruptedException{
-        OpenLedgerOp op = new OpenLedgerOp(lId, 
-                passwd,  
-                cb, 
-                ctx);
-        LedgerManagementProcessor lmp = getMngProcessor();
-        lmp.addOp(op);
-    }
-    
-    /**
-     * Parses address into IP and port.
-     * 
-     *  @param addr	String
-     */
-    
-    InetSocketAddress parseAddr(String s){
-        String parts[] = s.split(":");
-        if (parts.length != 2) {
-            System.out.println(s
-                    + " does not have the form host:port");
-        }
-        InetSocketAddress addr = new InetSocketAddress(parts[0],
-                Integer.parseInt(parts[1]));
-        return addr;
-    }
-    
- 
-    /**
-     * Check if close node exists. 
-     * 
-     * @param ledgerId	id of the ledger to check
-     */
-    public boolean hasClosed(long ledgerId)
-    throws KeeperException, InterruptedException{
-        String closePath = BKDefs.prefix + getZKStringId(ledgerId) + BKDefs.close;
-        if(zk.exists(closePath, false) == null) return false;
-        else return true;
-    }
-    
-    /**
-     * Recover a ledger that was not closed properly.
-     * 
-     * @param lId	ledger identifier
-     * @param passwd	password
-     */
-    
-    boolean recoverLedger(long lId, byte passwd[])
-    throws KeeperException, InterruptedException, IOException, BKException {
-        
-        Stat stat = null;
-       
-        LOG.info("Recovering ledger");
-        
-        /*
-         * Get quorum size.
-         */
-        ByteBuffer bb = ByteBuffer.wrap(zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, false, stat));
-        int qSize = bb.getInt();
-                
-        
-        /*
-         * Get children of "/ledgers/id/ensemble" 
-         */
-        
-        List<String> list = 
-            zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
-        
-        ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
-        for(String s : list){
-            addresses.add(parseAddr(s));
-        }
-        
-        /*
-         * Quorum mode 
-         */
-        byte[] data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
-        ByteBuffer buf = ByteBuffer.wrap(data);
-        //int ordinal = buf.getInt();
-            
-        QMode qMode = QMode.VERIFIABLE;
-        switch(buf.getInt()){
-        case 0:
-            qMode = QMode.VERIFIABLE;
-            break;
-        case 1:
-            qMode = QMode.GENERIC;
-            break;
-        case 2:
-            qMode = QMode.FREEFORM;
-            break;
-        }
-        
-        /*
-         * Create ledger recovery monitor object
-         */
-        
-        LedgerRecoveryMonitor lrm = new LedgerRecoveryMonitor(this, lId, qSize, addresses, qMode);
-        
-        return lrm.recover(passwd);
-    }
-    
-    /**
-     * Get new bookies
-     * 
-     * @param addrList	list of bookies to replace
-     */
-    InetSocketAddress getNewBookie(ArrayList<InetSocketAddress> addrList)
-    throws InterruptedException {
-        try{
-            // Get children from "/ledgers/available" on zk
-            List<String> list = 
-                zk.getChildren("/ledgers/available", false);
-            ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
-    
-            for(String addr : list){
-                InetSocketAddress nAddr = parseAddr(addr); 
-                if(!addrList.contains(nAddr) &&
-                        !bookieBlackList.contains(nAddr))
-                    return nAddr;
-            }
-        } catch (KeeperException e){
-            LOG.error("Problem accessing ZooKeeper: " + e);
-        }
-        
-        return null;
-    }
-    
-    HashMap<InetSocketAddress, BookieHandle> bhMap = 
-    	new HashMap<InetSocketAddress, BookieHandle>();
-    
-    /**
-     *  Keeps a list of available BookieHandle objects and returns
-     *  the corresponding object given an address.
-     *  
-     *  @param	a	InetSocketAddress
-     */
-    
-    synchronized BookieHandle getBookieHandle(LedgerHandle lh, InetSocketAddress a)
-    throws ConnectException, IOException {
-    	if(!bhMap.containsKey(a)){
-    	    BookieHandle bh = new BookieHandle(a, true); 
-    		bhMap.put(a, bh);
-    		bh.start();
-    	}
-    	bhMap.get(a).incRefCount(lh);
-    	
-    	return bhMap.get(a);
-    }
-    
-    /**
-     * When there are no more references to a BookieHandle,
-     * remove it from the list. 
-     */
-    
-    synchronized void haltBookieHandles(LedgerHandle lh, ArrayList<BookieHandle> bookies){
-        while(bookies.size() > 0){
-            BookieHandle bh = bookies.remove(0);
-            if(bh.halt(lh) <= 0)
-                bhMap.remove(bh.addr);
-        }
-    }
-    
-    /**
-     * Blacklists bookies.
-     * 
-     * @param addr 	address of bookie
-     */
-    void blackListBookie(InetSocketAddress addr){
-        bookieBlackList.add(addr);
-    }
-    
-    /**
-     * Halts all bookie handles
-     * 
+    /*
+     * Wait
      */
-    public void halt() throws InterruptedException{
-        
-        for(BookieHandle bh: bhMap.values()){
-            bh.shutdown();
-        }
-        zk.close();
-    }
+    counter.block(0);
+    if (counter.getrc() != BKException.Code.OK)
+      throw BKException.create(counter.getrc());
+
+    return counter.getLh();
+  }
+
+  /**
+   * Shuts down client.
+   * 
+   */
+  public void halt() throws InterruptedException {
+    bookieClient.close();
+    bookieWatcher.halt();
+    if (ownChannelFactory) {
+      channelFactory.releaseExternalResources();
+    }
+    if (ownZKHandle) {
+      zk.close();
+    }
+    callbackWorker.shutdown();
+    mainWorkerPool.shutdown();
+  }
 }

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,204 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.KeeperException.Code;
+
+/**
+ * This class is responsible for maintaining a consistent view of what bookies
+ * are available by reading Zookeeper (and setting watches on the bookie nodes).
+ * When a bookie fails, the other parts of the code turn to this class to find a
+ * replacement
+ * 
+ */
+class BookieWatcher implements Watcher, ChildrenCallback {
+    static final Logger logger = Logger.getLogger(BookieWatcher.class);
+    
+    public static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available";
+    static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
+    public static int ZK_CONNECT_BACKOFF_SEC = 1;
+
+    BookKeeper bk;
+    ScheduledExecutorService scheduler;
+
+    Set<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
+
+    SafeRunnable reReadTask = new SafeRunnable() {
+        @Override
+        public void safeRun() {
+            readBookies();
+        }
+    };
+
+    public BookieWatcher(BookKeeper bk) {
+        this.bk = bk;
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+    }
+    
+    public void halt(){
+        scheduler.shutdown();
+    }
+
+    public void readBookies() {
+        readBookies(this);
+    }
+
+    public void readBookies(ChildrenCallback callback) {
+        bk.getZkHandle().getChildren( BOOKIE_REGISTRATION_PATH, this, callback, null);
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        readBookies();
+    }
+
+    @Override
+    public void processResult(int rc, String path, Object ctx, List<String> children) {
+
+        if (rc != KeeperException.Code.OK.intValue()) {
+            //logger.error("Error while reading bookies", KeeperException.create(Code.get(rc), path));
+            // retry the read after ZK_CONNECT_BACKOFF_SEC seconds
+            scheduler.schedule(reReadTask, ZK_CONNECT_BACKOFF_SEC, TimeUnit.SECONDS);
+            return;
+        }
+
+        // Read the bookie addresses into a set for efficient lookup
+        Set<InetSocketAddress> newBookieAddrs = new HashSet<InetSocketAddress>();
+        for (String bookieAddrString : children) {
+            InetSocketAddress bookieAddr;
+            try {
+                bookieAddr = StringUtils.parseAddr(bookieAddrString);
+            } catch (IOException e) {
+                logger.error("Could not parse bookie address: " + bookieAddrString + ", ignoring this bookie");
+                continue;
+            }
+            newBookieAddrs.add(bookieAddr);
+        }
+
+        synchronized (this) {
+            knownBookies = newBookieAddrs;
+        }
+    }
+
+    /**
+     * Blocks until bookies are read from zookeeper, used in the {@link BookKeeper} constructor.
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    public void readBookiesBlocking() throws InterruptedException, KeeperException {
+        final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
+        readBookies(new ChildrenCallback() {
+            public void processResult(int rc, String path, Object ctx, List<String> children) {
+                try {
+                    BookieWatcher.this.processResult(rc, path, ctx, children);
+                    queue.put(rc);
+                } catch (InterruptedException e) {
+                    logger.error("Interruped when trying to read bookies in a blocking fashion");
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        int rc = queue.take();
+
+        if (rc != KeeperException.Code.OK.intValue()) {
+            throw KeeperException.create(Code.get(rc));
+        }
+    }
+
+    /**
+     * Wrapper over the {@link #getAdditionalBookies(Set, int)} method when there is no exclusion list (or existing bookies)
+     * @param numBookiesNeeded
+     * @return
+     * @throws BKNotEnoughBookiesException
+     */
+    public ArrayList<InetSocketAddress> getNewBookies(int numBookiesNeeded) throws BKNotEnoughBookiesException {
+        return getAdditionalBookies(EMPTY_SET, numBookiesNeeded);
+    }
+
+    /**
+     * Wrapper over the {@link #getAdditionalBookies(Set, int)} method when you just need 1 extra bookie
+     * @param existingBookies
+     * @return
+     * @throws BKNotEnoughBookiesException
+     */
+    public InetSocketAddress getAdditionalBookie(List<InetSocketAddress> existingBookies)
+            throws BKNotEnoughBookiesException {
+        return getAdditionalBookies(new HashSet<InetSocketAddress>(existingBookies), 1).get(0);
+    }
+
+    /**
+     * Returns additional bookies given an exclusion list and how many are needed
+     * @param existingBookies
+     * @param numAdditionalBookiesNeeded
+     * @return
+     * @throws BKNotEnoughBookiesException
+     */
+    public ArrayList<InetSocketAddress> getAdditionalBookies(Set<InetSocketAddress> existingBookies,
+            int numAdditionalBookiesNeeded) throws BKNotEnoughBookiesException {
+
+        ArrayList<InetSocketAddress> newBookies = new ArrayList<InetSocketAddress>();
+
+        if (numAdditionalBookiesNeeded <= 0) {
+            return newBookies;
+        }
+
+        List<InetSocketAddress> allBookies;
+
+        synchronized (this) {
+            allBookies = new ArrayList<InetSocketAddress>(knownBookies);
+        }
+
+        Collections.shuffle(allBookies);
+
+        for (InetSocketAddress bookie : allBookies) {
+            if (existingBookies.contains(bookie)) {
+                continue;
+            }
+
+            newBookies.add(bookie);
+            numAdditionalBookiesNeeded--;
+
+            if (numAdditionalBookiesNeeded == 0) {
+                return newBookies;
+            }
+        }
+
+        throw new BKNotEnoughBookiesException();
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,50 @@
+package org.apache.bookkeeper.client;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+
+import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
+
+class CRC32DigestManager extends DigestManager {
+    CRC32 crc = new CRC32();
+    
+    public CRC32DigestManager(long ledgerId) {
+        super(ledgerId);
+    }
+
+    @Override
+    int getMacCodeLength() {
+        return 8;
+    }
+    
+    @Override
+    byte[] getValueAndReset() {
+        byte[] value = new byte[8];
+        ByteBuffer buf = ByteBuffer.wrap(value);
+        buf.putLong(crc.getValue());
+        crc.reset();
+        return value;
+    }
+    
+    @Override
+    void update(byte[] data, int offset, int length) {
+        crc.update(data, offset, length);
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,162 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * This class takes an entry, attaches a digest to it and packages it with relevant
+ * data so that it can be shipped to the bookie. On the return side, it also
+ * gets a packet, checks that the digest matches, and extracts the original entry
+ * from the packet. Currently 2 types of digests are supported: MAC (based on SHA-1) and CRC32
+ */
+
+abstract class DigestManager {
+    static final Logger logger = Logger.getLogger(DigestManager.class);
+
+    long ledgerId;
+    
+    abstract int getMacCodeLength();
+    
+    void update(byte[] data){
+        update(data, 0, data.length);
+    }
+    
+    abstract void update(byte[] data, int offset, int length);
+    abstract byte[] getValueAndReset();
+    
+    final int macCodeLength;
+
+    public DigestManager(long ledgerId) {
+        this.ledgerId = ledgerId;
+        macCodeLength = getMacCodeLength();
+    }
+    
+    static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType) throws GeneralSecurityException{
+        switch(digestType){
+        case MAC:
+            return new MacDigestManager(ledgerId, passwd);
+        case CRC32:
+            return new CRC32DigestManager(ledgerId);
+        default:
+            throw new GeneralSecurityException("Unknown checksum type: " + digestType);
+        }
+    }
+
+    ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, byte[] data) {
+
+        byte[] bufferArray = new byte[24+macCodeLength];
+        ByteBuffer buffer = ByteBuffer.wrap(bufferArray);
+        buffer.putLong(ledgerId);
+        buffer.putLong(entryId);
+        buffer.putLong(lastAddConfirmed);
+        buffer.flip();
+
+        update(buffer.array(), 0, 24);
+        update(data);
+        byte[] digest = getValueAndReset();
+
+        buffer.limit(buffer.capacity());
+        buffer.position(24);
+        buffer.put(digest);
+        buffer.flip();
+
+        return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data));
+    }
+
+    private void verifyDigest(ChannelBuffer dataReceived) throws BKDigestMatchException {
+        verifyDigest(-1, dataReceived, true);
+    }
+
+    private void verifyDigest(long entryId, ChannelBuffer dataReceived) throws BKDigestMatchException {
+        verifyDigest(entryId, dataReceived, false);
+    }
+
+    private void verifyDigest(long entryId, ChannelBuffer dataReceived, boolean skipEntryIdCheck)
+            throws BKDigestMatchException {
+
+        ByteBuffer dataReceivedBuffer = dataReceived.toByteBuffer();
+        byte[] digest;
+
+        update(dataReceivedBuffer.array(), dataReceivedBuffer.position(), 24);
+
+        int offset = 24 + macCodeLength;
+        update(dataReceivedBuffer.array(), dataReceivedBuffer.position() + offset, dataReceived.readableBytes() - offset);
+        digest = getValueAndReset();
+
+        for (int i = 0; i < digest.length; i++) {
+            if (digest[i] != dataReceived.getByte(24 + i)) {
+                logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
+                throw new BKDigestMatchException();
+            }
+        }
+
+        long actualLedgerId = dataReceived.readLong();
+        long actualEntryId = dataReceived.readLong();
+
+        if (actualLedgerId != ledgerId) {
+            logger.error("Ledger-id mismatch in authenticated message, expected: " + ledgerId + " , actual: "
+                    + actualLedgerId);
+            throw new BKDigestMatchException();
+        }
+
+        if (!skipEntryIdCheck && actualEntryId != entryId) {
+            logger.error("Entry-id mismatch in authenticated message, expected: " + entryId + " , actual: "
+                    + actualEntryId);
+            throw new BKDigestMatchException();
+        }
+
+    }
+
+    ChannelBufferInputStream verifyDigestAndReturnData(long entryId, ChannelBuffer dataReceived)
+            throws BKDigestMatchException {
+        verifyDigest(entryId, dataReceived);
+        dataReceived.readerIndex(24 + macCodeLength);
+        return new ChannelBufferInputStream(dataReceived);
+    }
+
+    static class RecoveryData {
+        long lastAddConfirmed;
+        long entryId;
+
+        public RecoveryData(long lastAddConfirmed, long entryId) {
+            this.lastAddConfirmed = lastAddConfirmed;
+            this.entryId = entryId;
+        }
+
+    }
+
+    RecoveryData verifyDigestAndReturnLastConfirmed(ChannelBuffer dataReceived) throws BKDigestMatchException {
+        verifyDigest(dataReceived);
+        dataReceived.readerIndex(8);
+
+        long entryId = dataReceived.readLong();
+        long lastAddConfirmed = dataReceived.readLong();
+        return new RecoveryData(lastAddConfirmed, entryId);
+
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,61 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This interface determines how entries are distributed among bookies.
+ * 
+ * Every entry gets replicated to some number of replicas. The first replica for
+ * an entry is given a replicaIndex of 0, and so on. To distribute write load,
+ * not all entries go to all bookies. Given an entry-id and replica index, an
+ * {@link DistributionSchedule} determines which bookie that replica should go
+ * to.
+ */
+
+interface DistributionSchedule {
+
+    /**
+     * 
+     * @param entryId
+     * @param replicaIndex
+     * @return index of bookie that should get this replica
+     */
+    public int getBookieIndex(long entryId, int replicaIndex);
+
+    /**
+     * 
+     * @param entryId
+     * @param bookieIndex
+     * @return -1 if the given bookie index is not a replica for the given
+     *         entryId
+     */
+    public int getReplicaIndex(long entryId, int bookieIndex);
+
+    /**
+     * Specifies whether it's ok to proceed with recovery given that we have
+     * heard back from the given bookie index. These calls will be made in
+     * sequence and an implementation of this interface should accumulate
+     * history about which bookie indexes we have heard from. Once this method
+     * has returned true, it won't be called again on the same instance.
+     * 
+     * @param bookieIndexHeardFrom
+     * @return true if it's ok to proceed with recovery
+     */
+    public boolean canProceedWithRecovery(int bookieIndexHeardFrom);
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,163 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Encapsulates asynchronous ledger create operation
+ * 
+ */
+class LedgerCreateOp implements StringCallback, StatCallback {
+
+    static final Logger LOG = Logger.getLogger(LedgerCreateOp.class);
+
+    CreateCallback cb;
+    LedgerMetadata metadata;
+    LedgerHandle lh;
+    Object ctx;
+    byte[] passwd;
+    BookKeeper bk;
+    DigestType digestType;
+
+   /**
+    * Constructor
+    * 
+    * @param bk
+    *       BookKeeper object
+    * @param ensembleSize
+    *       ensemble size
+    * @param quorumSize
+    *       quorum size
+    * @param digestType
+    *       digest type, either MAC or CRC32
+    * @param passwd
+    *       password
+    * @param cb
+    *       callback implementation
+    * @param ctx
+    *       optional control object
+    */
+
+    LedgerCreateOp(BookKeeper bk, int ensembleSize, int quorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx) {
+        this.bk = bk;
+        this.metadata = new LedgerMetadata(ensembleSize, quorumSize);
+        this.digestType = digestType;
+        this.passwd = passwd;
+        this.cb = cb;
+        this.ctx = ctx;
+    }
+
+    /**
+     * Initiates the operation
+     */
+    public void initiate() {
+        /*
+         * Create ledger node on ZK. We get the id from the sequence number on
+         * the node.
+         */
+
+        bk.getZkHandle().create(StringUtils.prefix, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT_SEQUENTIAL, this, null);
+
+        // the result is delivered to the StringCallback processResult method below
+    }
+
+
+    /**
+     * Implements ZooKeeper string callback.
+     * 
+     * @see org.apache.zookeeper.AsyncCallback.StringCallback#processResult(int, java.lang.String, java.lang.Object, java.lang.String)
+     */
+    public void processResult(int rc, String path, Object ctx, String name) {
+
+        if (rc != KeeperException.Code.OK.intValue()) {
+            LOG.error("Could not create node for ledger", KeeperException.create(KeeperException.Code.get(rc), path));
+            cb.createComplete(BKException.Code.ZKException, null, this.ctx);
+            return;
+        }
+
+        /*
+         * Extract ledger id.
+         */
+        long ledgerId;
+        try {
+            ledgerId = StringUtils.getLedgerId(name);
+        } catch (IOException e) {
+            LOG.error("Could not extract ledger-id from path:" + path, e);
+            cb.createComplete(BKException.Code.ZKException, null, this.ctx);
+            return;
+        }
+
+        /*
+         * Adding bookies to ledger handle
+         */
+
+        ArrayList<InetSocketAddress> ensemble;
+        try {
+            ensemble = bk.bookieWatcher.getNewBookies(metadata.ensembleSize);
+        } catch (BKNotEnoughBookiesException e) {
+            LOG.error("Not enough bookies to create ledger" + ledgerId);
+            cb.createComplete(e.getCode(), null, this.ctx);
+            return;
+        }
+
+        /*
+         * Add ensemble to the configuration
+         */
+        metadata.addEnsemble(new Long(0), ensemble);
+        try {
+            lh = new LedgerHandle(bk, ledgerId, metadata, digestType, passwd);
+        } catch (GeneralSecurityException e) {
+            LOG.error("Security exception while creating ledger: " + ledgerId, e);
+            cb.createComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
+            return;
+        }
+
+        lh.writeLedgerConfig(this, null);
+
+    }
+
+    /**
+     * Implements ZooKeeper stat callback.
+     * 
+     * @see org.apache.zookeeper.AsyncCallback.StatCallback#processResult(int, String, Object, Stat)
+     */
+    public void processResult(int rc, String path, Object ctx, Stat stat) {
+        cb.createComplete(rc, lh, this.ctx);
+    }
+
+}

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.client;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,39 +21,58 @@
  * 
  */
 
-
+import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
 
 /**
- * Ledger entry. Currently only holds the necessary
- * fields to identify a ledger entry, and the entry
- * content.
+ * Ledger entry. It is a simple tuple containing the ledger id, the entry id, and
+ * the entry content.
  * 
  */
 
 public class LedgerEntry {
-    Logger LOG = Logger.getLogger(LedgerEntry.class);
-    
-    private long lId;
-    private long eId;
-    private byte[] entry;
-    
-    LedgerEntry(long lId, long eId, byte[] entry){
-        this.lId = lId;
-        this.eId = eId;
-        this.entry = entry;
-    }
-    
-    public long getLedgerId(){
-        return lId;
-    }
-    
-    public long getEntryId(){
-        return eId;
-    }
-    
-    public byte[] getEntry(){
-        return entry;
+  Logger LOG = Logger.getLogger(LedgerEntry.class);
+
+  long ledgerId; // id of the ledger this entry belongs to; set by the constructor
+  long entryId; // id of this entry; set by the constructor
+  ChannelBufferInputStream entryDataStream; // entry content; not assigned in this class, so it must be set before getEntry() is called
+
+  int nextReplicaIndexToReadFrom = 0; // presumably the replica a read should try next -- TODO confirm against the reading code
+
+  LedgerEntry(long lId, long eId) { // package-private; records only the two ids and leaves entryDataStream unset
+    this.ledgerId = lId;
+    this.entryId = eId;
+  }
+
+  public long getLedgerId() { // returns the current value of the ledgerId field
+    return ledgerId;
+  }
+
+  public long getEntryId() { // returns the current value of the entryId field
+    return entryId;
+  }
+
+  public byte[] getEntry() { // copies the bytes currently readable from entryDataStream into a new array
+    try {
+      // In general, you can't rely on the available() method of an input
+      // stream, but ChannelBufferInputStream is backed by a byte[] so it
+      // accurately knows the # bytes available
+      byte[] ret = new byte[entryDataStream.available()];
+      entryDataStream.readFully(ret); // NOTE(review): this consumes the stream, so a repeat call presumably yields an empty array -- confirm
+      return ret;
+    } catch (IOException e) {
+      // ChannelBufferInputStream does not really throw IOException; it only
+      // appears in the signature because InputStream declares it. Hence this
+      // code should never be reached; if it is, an empty array is returned.
+      LOG.fatal("Unexpected IOException while reading from channel buffer", e);
+      return new byte[0];
     }
+  }
+
+  public InputStream getEntryInputStream() { // hands out the entryDataStream field itself, not a copy
+    return entryDataStream;
+  }
 }