You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2009/06/24 07:07:24 UTC

svn commit: r787907 [1/2] - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...

Author: breed
Date: Wed Jun 24 05:07:23 2009
New Revision: 787907

URL: http://svn.apache.org/viewvc?rev=787907&view=rev
Log:
ZOOKEEPER-356. Masking bookie failure during writes to a ledger

Added:
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Jun 24 05:07:23 2009
@@ -232,6 +232,8 @@
   ZOOKEEPER-329. document how to integrate 3rd party authentication into ZK
 server ACLs. (breed via mahadev)
 
+  ZOOKEEPER-356. Masking bookie failure during writes to a ledger (flavio via breed)
+
 NEW FEATURES:
 
   ZOOKEEPER-371. jdiff documentation included in build/release (giri via phunt)

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java Wed Jun 24 05:07:23 2009
@@ -44,7 +44,7 @@
 
 public class Bookie extends Thread {
     HashMap<Long, LedgerDescriptor> ledgers = new HashMap<Long, LedgerDescriptor>();
-    Logger LOG = Logger.getLogger(Bookie.class);
+    static Logger LOG = Logger.getLogger(Bookie.class);
     /**
      * 4 byte signature followed by 2-byte major and 2-byte minor versions
      */

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java Wed Jun 24 05:07:23 2009
@@ -78,5 +78,4 @@
          */
         void readComplete(int rc, LedgerHandle lh, LedgerSequence seq, Object ctx);
     }
-    
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java Wed Jun 24 05:07:23 2009
@@ -26,12 +26,50 @@
      * String used to construct znode paths. They are used in BookKeeper
      *  and LedgerManagementProcessor.
      */
+    
+    /*
+     * Path to ledger metadata. ZooKeeper appends a sequence number to L.
+     */
     static public final String prefix = "/ledgers/L";
+    
+    /*
+     * Parent node to store ensemble composition. Each child corresponds to
+     * one bookie.
+     */
     static public final String ensemble = "/ensemble"; 
+    
+    /*
+     * Quorum size.
+     */
     static public final String quorumSize = "/quorum";
+    
+    /*
+     * Close node.
+     */
     static public final String close = "/close";
+    
+    /*
+     * Quorum mode: VERIFYING or GENERIC
+     */
     static public final String quorumMode = "/mode";
     
+    /*
+     * Marks failure points during writes to the ledger.
+     */
+    static public final String quorumEvolution = "/quorum_evolution";
+    
+    /*
+     * Ledger is in write mode
+     */
+    
+    static public final int WRITE = 0;
+
+    /*
+     * Ledger is in read mode
+     */
+
+    static public final int READ = 1;
+    
     /**
      * Status ok
      */

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java Wed Jun 24 05:07:23 2009
@@ -48,8 +48,12 @@
             return new BKDigestNotInitializedException();
         case Code.DigestMatchException:
             return new BKDigestMatchException();
+        case Code.NotEnoughBookiesException:
+            return new BKNotEnoughBookiesException();
         case Code.NoSuchLedgerExistsException:
             return new BKNoSuchLedgerExistsException();
+        case Code.BookieHandleNotAvailableException:
+            return new BKBookieHandleNotAvailableException();
         default:
             return new BKIllegalOpException();
         }
@@ -62,7 +66,9 @@
         int NoBookieAvailableException = -3;
         int DigestNotInitializedException = -4;
         int DigestMatchException = -5;
-        int NoSuchLedgerExistsException = -6;
+        int NotEnoughBookiesException = -6;
+        int NoSuchLedgerExistsException = -7;
+        int BookieHandleNotAvailableException = -8;
         
         int IllegalOpException = -100;
     }
@@ -89,8 +95,12 @@
             return "Digest engine not initialized";
         case Code.DigestMatchException:
             return "Entry digest does not match";
+        case Code.NotEnoughBookiesException:
+            return "Not enough non-faulty bookies available";
         case Code.NoSuchLedgerExistsException:
             return "No such ledger exists";
+        case Code.BookieHandleNotAvailableException:
+            return "Bookie handle is not available";
         default:
             return "Invalid operation";
         }
@@ -132,10 +142,22 @@
         }   
     }
     
+    public static class BKNotEnoughBookiesException extends BKException {
+        public BKNotEnoughBookiesException(){
+            super(Code.NotEnoughBookiesException);
+        }
+    }
+
     public static class BKNoSuchLedgerExistsException extends BKException {
         public BKNoSuchLedgerExistsException(){
             super(Code.NoSuchLedgerExistsException);
         }   
     }
+    
+    public static class BKBookieHandleNotAvailableException extends BKException {
+        public BKBookieHandleNotAvailableException(){
+            super(Code.BookieHandleNotAvailableException);
+        }   
+    }
 }
     

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Wed Jun 24 05:07:23 2009
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
+import java.nio.channels.UnresolvedAddressException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -58,6 +59,8 @@
  * There are three possible operations: start a new ledger, 
  * write to a ledger, and read from a ledger.
  * 
+ * For the ZooKeeper layout, please refer to BKDefs.java.
+ * 
  */
 
 public class BookKeeper 
@@ -142,6 +145,8 @@
         IOException, BKException {
         // Check that quorum size follows the minimum
         long t;
+        LedgerHandle lh = null;
+        
         switch(mode){
         case VERIFIABLE:
             t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/2));
@@ -171,71 +176,77 @@
          */
         String parts[] = path.split("/");
         String subparts[] = parts[2].split("L");
-        long lId = Long.parseLong(subparts[1]);
-        /* 
-         * Get children from "/ledgers/available" on zk
-         */
-        List<String> list = 
-            zk.getChildren("/ledgers/available", false);
-        ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
-        /* 
-         * Select ensSize servers to form the ensemble
-         */
-        path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0], 
-                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        /* 
-         * Add quorum size to ZK metadata
-         */
-        ByteBuffer bb = ByteBuffer.allocate(4);
-        bb.putInt(qSize);
-        zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(), 
-                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        /* 
-         * Quorum mode
-         */
-        bb = ByteBuffer.allocate(4);
-        bb.putInt(mode.ordinal());
-        zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(), 
-                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        /* 
-         * Create QuorumEngine
-         */
-        LedgerHandle lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
-        //qeMap.put(lId, queue);
-        /*
-         * Adding bookies to ledger handle
-         */
-        Random r = new Random();
-        
-        for(int i = 0; i < ensSize; i++){
-        	int index = 0;
-        	if(list.size() > 1) 
-        		index = r.nextInt(list.size() - 1);
-        	else if(list.size() == 1)
-        	    index = 0;
-        	else {
-        	    LOG.error("Not enough bookies available");
+        try{
+            long lId = Long.parseLong(subparts[1]);
+       
+            /* 
+             * Get children from "/ledgers/available" on zk
+             */
+            List<String> list = 
+                zk.getChildren("/ledgers/available", false);
+            ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
+            /* 
+             * Select ensSize servers to form the ensemble
+             */
+            path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0], 
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+         
+            /* 
+             * Add quorum size to ZK metadata
+             */
+            ByteBuffer bb = ByteBuffer.allocate(4);
+            bb.putInt(qSize);
+            zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(), 
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            /* 
+             * Quorum mode
+             */
+            bb = ByteBuffer.allocate(4);
+            bb.putInt(mode.ordinal());
+            zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(), 
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            /* 
+             * Create QuorumEngine
+             */
+            lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
+            
+            /*
+             * Adding bookies to ledger handle
+             */
+            Random r = new Random();
+        
+            for(int i = 0; i < ensSize; i++){
+                int index = 0;
+                if(list.size() > 1) 
+                    index = r.nextInt(list.size() - 1);
+                else if(list.size() == 1)
+                    index = 0;
+                else {
+                    LOG.error("Not enough bookies available");
         	    
-        	    return null;
-        	}
+                    return null;
+                }
             
-        	try{
-        	    String bookie = list.remove(index);
-        	    LOG.info("Bookie: " + bookie);
-        	    InetSocketAddress tAddr = parseAddr(bookie);
-        	    int bindex = lh.addBookie(tAddr); 
-        	    ByteBuffer bindexBuf = ByteBuffer.allocate(4);
-        	    bindexBuf.putInt(bindex);
+                try{
+                    String bookie = list.remove(index);
+                    LOG.info("Bookie: " + bookie);
+                    InetSocketAddress tAddr = parseAddr(bookie);
+                    int bindex = lh.addBookieForWriting(tAddr); 
+                    ByteBuffer bindexBuf = ByteBuffer.allocate(4);
+                    bindexBuf.putInt(bindex);
         	    
-        	    String pBookie = "/" + bookie;
-        	    zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(), 
-        	            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        	} catch (IOException e) {
-        	    LOG.error(e);
-        	    i--;
-        	} 
+                    String pBookie = "/" + bookie;
+                    zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(), 
+                            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                } catch (IOException e) {
+                    LOG.error(e);
+                    i--;
+                } 
+            }
+            LOG.debug("Created new ledger");
+        } catch (NumberFormatException e) {
+            LOG.error("Error when parsing the ledger identifier", e);
         }
-        LOG.debug("Created new ledger");
         // Return ledger handler
         return lh; 
     }
@@ -333,7 +344,6 @@
          */
         data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
         buf = ByteBuffer.wrap(data);
-        //int ordinal = buf.getInt();
         
         QMode qMode;
         switch(buf.getInt()){
@@ -361,22 +371,62 @@
         List<String> list = 
             zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
         
-        LOG.info("Length of list of bookies: " + list.size());
+        LOG.debug("Length of list of bookies: " + list.size());
         for(int i = 0 ; i < list.size() ; i++){
             for(String s : list){
+                LOG.debug("Extracting bookie: " + s);
                 byte[] bindex = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + "/" + s, false, stat);
                 ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
                 if(bindexBuf.getInt() == i){                      
                     try{
-                        lh.addBookie(parseAddr(s));
+                        lh.addBookieForReading(parseAddr(s));
                     } catch (IOException e){
                         LOG.error(e);
                     }
                 }
             }
         }
+        
+        /*
+         * Read changes to quorum over time. To determine if there have been changes during
+         * writes to the ledger, check if there is a znode called quorumEvolution.
+         */
+        if(zk.exists(BKDefs.prefix + 
+                getZKStringId(lh.getId()) +  
+                BKDefs.quorumEvolution, false) != null){
+                    String path = BKDefs.prefix + 
+                    getZKStringId(lh.getId()) +  
+                    BKDefs.quorumEvolution;
+                    
+                    List<String> faultList = zk.getChildren(path, false);
+                    try{
+                        for(String s : faultList){
+                            LOG.debug("Faulty list child: " + s);
+                            long entry = Long.parseLong(s);
+                            String addresses = new String(zk.getData(path + "/" + s, false, stat));
+                            String parts[] = addresses.split(" ");
+
+                            ArrayList<BookieHandle> newBookieSet = new ArrayList<BookieHandle>();
+                            for(int i = 0 ; i < parts.length ; i++){
+                                LOG.debug("Address: " + parts[i]);
+                                InetSocketAddress faultyBookie =  
+                                    parseAddr(parts[i].substring(1));                           
+                        
+                                newBookieSet.add(lh.getBookieHandleDup(faultyBookie));
+                            }
+                            lh.setNewBookieConfig(entry, newBookieSet);
+                            LOG.debug("NewBookieSet size: " + newBookieSet.size());
+                        }
+
+                        lh.prepareEntryChange();
+                    } catch (NumberFormatException e) {
+                        LOG.error("Error when parsing the ledger identifier", e);
+                    }
+                }
       
-        // Return ledger handler
+        /*
+         *  Return ledger handler
+         */
         return lh;
     }    
     
@@ -518,12 +568,14 @@
      *  @param	a	InetSocketAddress
      */
     
-    synchronized BookieHandle getBookieHandle(InetSocketAddress a)
+    synchronized BookieHandle getBookieHandle(LedgerHandle lh, InetSocketAddress a)
     throws ConnectException, IOException {
     	if(!bhMap.containsKey(a)){
-    		bhMap.put(a, new BookieHandle(a));
+    	    BookieHandle bh = new BookieHandle(a, true); 
+    		bhMap.put(a, bh);
+    		bh.start();
     	}
-    	bhMap.get(a).incRefCount();
+    	bhMap.get(a).incRefCount(lh);
     	
     	return bhMap.get(a);
     }
@@ -533,9 +585,10 @@
      * remove it from the list. 
      */
     
-    synchronized void haltBookieHandles(ArrayList<BookieHandle> bookies){
-        for(BookieHandle bh : bookies){
-            if(bh.halt() <= 0)
+    synchronized void haltBookieHandles(LedgerHandle lh, ArrayList<BookieHandle> bookies){
+        while(bookies.size() > 0){
+            BookieHandle bh = bookies.remove(0);
+            if(bh.halt(lh) <= 0)
                 bhMap.remove(bh.addr);
         }
     }
@@ -549,5 +602,15 @@
         bookieBlackList.add(addr);
     }
     
-   
+    /**
+     * Halts all bookie handles
+     * 
+     */
+    public void halt() throws InterruptedException{
+        
+        for(BookieHandle bh: bhMap.values()){
+            bh.shutdown();
+        }
+        zk.close();
+    }
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java Wed Jun 24 05:07:23 2009
@@ -24,6 +24,8 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.security.NoSuchAlgorithmException;
@@ -31,12 +33,15 @@
 import javax.crypto.Mac; 
 import javax.crypto.spec.SecretKeySpec;
 
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.LedgerHandle.QMode;
 import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.StopOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubStopOp;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.log4j.Logger;
 
@@ -47,15 +52,17 @@
  * 
  */
 
-class BookieHandle extends Thread{
-    Logger LOG = Logger.getLogger(BookieClient.class);
+public class BookieHandle extends Thread {
+    static Logger LOG = Logger.getLogger(BookieClient.class);
     
-    boolean stop = false;
+    volatile boolean stop = false;
+    boolean noreception = false;
     private BookieClient client;
     InetSocketAddress addr;
     static int recvTimeout = 2000;
     private ArrayBlockingQueue<ToSend> incomingQueue;
     private int refCount = 0;
+    HashSet<LedgerHandle> ledgers;
     
     /**
      * Objects of this class are queued waiting to be
@@ -79,13 +86,17 @@
      * @param addr	address of the bookkeeper server that this
      * handle should connect to.
      */
-    BookieHandle(InetSocketAddress addr) throws IOException {
-        this.client = new BookieClient(addr, recvTimeout);
+    BookieHandle(InetSocketAddress addr, boolean enabled) throws IOException {
+        this.stop = !enabled;
+        this.noreception = !enabled;
+        if(!stop)
+            this.client = new BookieClient(addr, recvTimeout);
+        else
+            this.client = null;
+        
         this.addr = addr;
         this.incomingQueue = new ArrayBlockingQueue<ToSend>(2000);
-        
-        //genSecurePadding();
-        start();
+        this.ledgers = new HashSet<LedgerHandle>();
     }
     
     
@@ -100,22 +111,39 @@
     }
 
     /**
-     * Sending add operation to bookie
+     * Sending add operation to bookie. We have to synchronize the send to guarantee
+     * that requests will either get a response or throw an exception. 
      * 
      * @param r
      * @param cb
      * @param ctx
      * @throws IOException
      */
-    public void sendAdd(LedgerHandle lh, SubAddOp r, long entry)
-    throws IOException {
+    public synchronized void sendAdd(LedgerHandle lh, SubAddOp r, long entry)
+    throws IOException, BKException {
         try{
-            incomingQueue.put(new ToSend(lh, r, entry));
+            if(!noreception){
+                ToSend ts = new ToSend(lh, r, entry);
+                if(!incomingQueue.offer(ts, 1000, TimeUnit.MILLISECONDS))
+                    throw BKException.create(Code.BookieHandleNotAvailableException);
+            } else {
+                throw BKException.create(Code.BookieHandleNotAvailableException);
+            }
         } catch(InterruptedException e){
             LOG.warn("Interrupted while waiting for room in the incoming queue");
         }
     }
     
+    private synchronized void sendStop(){
+        try{
+            noreception = true;
+            LOG.debug("Sending stop signal");
+            incomingQueue.put(new ToSend(null, new SubStopOp(new StopOp()), -1));
+            LOG.debug("Sent stop signal");
+        } catch(InterruptedException e) {
+            LOG.fatal("Interrupted while sending stop signal to bookie handle");
+        }       
+    }
     /**
      * MAC instance
      * 
@@ -142,29 +170,41 @@
      * @throws IOException
      */
     
-    public void sendRead(LedgerHandle lh, SubReadOp r, long entry)
-    throws IOException {
+    public synchronized void sendRead(LedgerHandle lh, SubReadOp r, long entry)
+    throws IOException, BKException {
         try{
-            incomingQueue.put(new ToSend(lh, r, entry));
+            if(!noreception){           
+                ToSend ts = new ToSend(lh, r, entry);
+                if(!incomingQueue.offer(ts, 1000, TimeUnit.MILLISECONDS))
+                    throw BKException.create(Code.BookieHandleNotAvailableException);
+            } else {
+                throw BKException.create(Code.BookieHandleNotAvailableException);
+            }
         } catch(InterruptedException e){
             LOG.warn("Interrupted while waiting for room in the incoming queue");
         }
     }
     
     public void run(){
-        while(!stop){
-            try{
-                ToSend ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS);
+        ToSend ts;
+        
+        try{
+            while(!stop){
+                ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS);
+                    
                 if(ts != null){
                 	LedgerHandle self = ts.lh;
                     switch(ts.type){
+                    case Operation.STOP:
+                        LOG.info("Stopping BookieHandle: " + addr);
+                        client.errorOut();                   
+                        cleanQueue();
+                        LOG.debug("Stopped");
+                        break;
                     case Operation.ADD:
                         SubAddOp aOp = (SubAddOp) ts.ctx;
                         AddOp op = ((AddOp) aOp.op);
                         
-                        /*
-                         * TODO: Really add the confirmed add to the op
-                         */
                         long confirmed = self.getAddConfirmed();
                         ByteBuffer extendedData;
     
@@ -179,7 +219,6 @@
                             extendedData.rewind();
                             byte[] toProcess = new byte[op.data.length + 24];
                             extendedData.get(toProcess, 0, op.data.length + 24);
-                            //extendedData.limit(extendedData.capacity() - 20);
                             extendedData.position(extendedData.capacity() - 20);
                             if(mac == null)
                                 getMac(self.getMacKey(), "HmacSHA1");
@@ -200,47 +239,133 @@
                                 ts.ctx);
                         break;
                     case Operation.READ:
-                        client.readEntry(self.getId(),
-                            ts.entry,
-                            ((SubReadOp) ts.ctx).rcb,
-                            ts.ctx);
+                        if(client != null)
+                            client.readEntry(self.getId(),
+                                    ts.entry,
+                                    ((SubReadOp) ts.ctx).rcb,
+                                    ts.ctx);
+                        else ((SubReadOp) ts.ctx).rcb.readEntryComplete(-1, ts.lh.getId(), ts.entry, null, ts.ctx);
                         break;
                     }
-                }
-            } catch (InterruptedException e){
-                LOG.error(e);
-            } catch (IOException e){
-                LOG.error(e);
-            } catch (NoSuchAlgorithmException e){
-                LOG.error(e);
-            } catch (InvalidKeyException e) {
-                LOG.error(e);
+                } else LOG.warn("Empty queue: " + addr);
             }
-        }
+        } catch (Exception e){
+            LOG.error("Handling exception before halting BookieHandle", e);
+            for(LedgerHandle lh : ledgers)
+                lh.removeBookie(this);
+            
+            /*
+             * We only need to synchronize when setting noreception, to prevent
+             * a client thread from adding another request to the incomingQueue
+             * after we have cleaned it.
+             */
+            synchronized(this){
+                noreception = true;
+            }
+            client.halt();
+            client.errorOut();
+            cleanQueue();
+        } 
+        
+        LOG.info("Exiting bookie handle thread: " + addr);
     }
+        
     
     /**
      * Multiple ledgers may use the same BookieHandle object, so we keep
      * a count on the number of references.
      */
-    int incRefCount(){
+    int incRefCount(LedgerHandle lh){
+        ledgers.add(lh);
         return ++refCount;
     }
     
     /**
      * Halts if there is no ledger using this object.
+     *
+     * @return  int reference counter
      */
-    int halt(){
+    synchronized int halt(LedgerHandle lh){
+        LOG.info("Calling halt");
+        ledgers.remove(lh);
         int currentCount = --refCount;
         if(currentCount <= 0){
-            stop = true;
+            shutdown();
         }
         
         if(currentCount < 0)
             LOG.warn("Miscalculated the number of reference counts: " + addr);
-        
+
         return currentCount;
     }
+    
+    /**
+     * Halt this bookie handle independent of the number of ledgers using it. Called upon a 
+     * failure to write. This method cannot be called by this thread because it may cause a
+     * deadlock as shutdown invokes sendStop. The deadlock comes from sendAdd blocking on
+     * incomingQueue when the queue is full and the thread also blocking on it when
+     * trying to send the stop marker. Because this thread is actually the consumer, if it
+     * does not make progress, then we have a deadlock. 
+     * 
+     * @return int  reference counter
+     */
+    synchronized public int halt(){
+        if(!stop){
+            LOG.info("Calling halt");
+            for(LedgerHandle lh : ledgers)
+                lh.removeBookie(this);
+            refCount = 0;
+            shutdown();
+        }
+     
+        return refCount;
+    }
+    
+    /**
+     * Stop this bookie handle completely.
+     * 
+     */
+    public void shutdown(){
+        if(!stop){
+            LOG.info("Calling shutdown");
+            LOG.debug("Halting client");
+            client.halt();
+            LOG.debug("Cleaning queue");
+            sendStop();
+            LOG.debug("Finished shutdown"); 
+        }
+    }
+    
+    /**
+     * Invokes the callback method for pending requests in the queue
+     * of this BookieHandle.
+     */
+    private void cleanQueue(){
+        stop = true;
+        ToSend ts = incomingQueue.poll();
+        while(ts != null){
+            switch(ts.type){
+            case Operation.ADD:
+                SubAddOp aOp = (SubAddOp) ts.ctx;
+                aOp.wcb.writeComplete(-1, ts.lh.getId(), ts.entry, ts.ctx);
+     
+                break;
+            case Operation.READ:                
+                ((SubReadOp) ts.ctx).rcb.readEntryComplete(-1, ts.lh.getId(), ts.entry, null, ts.ctx);
+                break;
+            }
+            ts = incomingQueue.poll();
+        }
+    }
+                
+    /**
+     * Returns the negated value of stop, which gives the status of the
+     * BookieHandle.
+     */
+    
+    boolean isEnabled(){
+        return !stop;
+    }
 }
 
     

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java Wed Jun 24 05:07:23 2009
@@ -38,17 +38,17 @@
  */
 
 class ClientCBWorker extends Thread{
-    Logger LOG = Logger.getLogger(ClientCBWorker.class);
+    static Logger LOG = Logger.getLogger(ClientCBWorker.class);
     static ClientCBWorker instance = null;
     
-    private boolean stop = false;
+    private volatile boolean stop;
     private static int instanceCounter= 0;
     
     ArrayBlockingQueue<Operation> pendingOps;
     QuorumOpMonitor monitor;
     
     
-    static synchronized ClientCBWorker getInstance(){
+    static ClientCBWorker getInstance(){
         if(instance == null){
             instance = new ClientCBWorker();
         }
@@ -63,9 +63,10 @@
      * 
      */
     ClientCBWorker(){
-       pendingOps = new ArrayBlockingQueue<Operation>(4000);  
+       pendingOps = new ArrayBlockingQueue<Operation>(6000);  
+       stop = false;
        start();
-       LOG.debug("Have started cbWorker");
+       LOG.info("Have started cbWorker");
     }
     
     
@@ -84,11 +85,11 @@
      * Gets thread out of its main loop.
      * 
      */
-    synchronized void shutdown(){
+    void shutdown(){
         if((--instanceCounter) == 0){
             stop = true;
             instance = null;
-            LOG.debug("Shutting down");
+            LOG.info("Shutting down CBWorker");
         }
     }
     
@@ -105,14 +106,14 @@
                 if(op != null){
                     synchronized(op){
                         while(!op.isReady()){
-                            op.wait();
+                            op.wait(1000);
                         }
                     }
-
+                    
                     switch(op.type){
                     case Operation.ADD:
                         AddOp aOp = (AddOp) op;
-                    
+                       
                         aOp.getLedger().setAddConfirmed(aOp.entry);
                         aOp.cb.addComplete(aOp.getErrorCode(),
                                 aOp.getLedger(),
@@ -122,14 +123,13 @@
                         break;
                     case Operation.READ:
                         ReadOp rOp = (ReadOp) op;
-                        //LOG.debug("Got one message from the queue: " + rOp.firstEntry);
                         rOp.cb.readComplete(rOp.getErrorCode(), 
                                 rOp.getLedger(),
                                 new LedgerSequence(rOp.seq), 
                                 rOp.ctx);
                         break;
                     }
-                }
+                } 
             }
         } catch (InterruptedException e){
            LOG.error("Exception while waiting on queue or operation"); 

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Wed Jun 24 05:07:23 2009
@@ -28,7 +28,11 @@
 import java.security.NoSuchAlgorithmException;
 import java.security.MessageDigest;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.TreeMap;
 
+import org.apache.bookkeeper.client.BKDefs;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -56,7 +60,7 @@
      * ledgerhandle->write->bookeeper->quorumengine->bookiehandle
      * ->bookieclient
      */
-    Logger LOG = Logger.getLogger(LedgerHandle.class);
+   static Logger LOG = Logger.getLogger(LedgerHandle.class);
     
     public enum QMode {VERIFIABLE, GENERIC, FREEFORM};
     
@@ -64,12 +68,16 @@
     private long ledger;
     private volatile long last;
     private volatile long lastAddConfirmed = 0;
-    private ArrayList<BookieHandle> bookies;
+    private HashMap<Integer, Long> lastRecvCorrectly;
+    private volatile ArrayList<BookieHandle> bookies;
     private ArrayList<InetSocketAddress> bookieAddrList;
+    private TreeMap<Long, ArrayList<BookieHandle> > bookieConfigMap;
+    private long[] entryChange;
     private BookKeeper bk;
     private QuorumEngine qe;
     private int qSize;
     private QMode qMode = QMode.VERIFIABLE;
+    private int lMode;
 
     private int threshold;
     private String digestAlg = "SHA1";
@@ -94,6 +102,7 @@
         this.ledger = ledger;
         this.last = last;
         this.bookies = new ArrayList<BookieHandle>();
+        this.lastRecvCorrectly = new HashMap<Integer, Long>();
         this.passwd = passwd;
         genLedgerKey(passwd);
         genMacKey(passwd);
@@ -122,6 +131,8 @@
         this.ledger = ledger;
         this.last = last;
         this.bookies = new ArrayList<BookieHandle>();
+        this.lastRecvCorrectly = new HashMap<Integer, Long>();
+
 
         this.qSize = qSize;
         this.qMode = mode;
@@ -150,6 +161,8 @@
         this.ledger = ledger;
         this.last = last;
         this.bookies = new ArrayList<BookieHandle>();
+        this.lastRecvCorrectly = new HashMap<Integer, Long>();
+
 
         this.qSize = qSize;
         this.passwd = passwd;
@@ -165,7 +178,7 @@
     			LOG.debug("Opening bookieHandle: " + a);
             
     			//BookieHandle bh = new BookieHandle(this, a);
-    			this.bookies.add(bk.getBookieHandle(a));
+    			this.bookies.add(bk.getBookieHandle(this, a));
     		}
     	} catch(ConnectException e){
     		LOG.error(e);
@@ -198,15 +211,37 @@
      * 
      * @param addr	socket address
      */
-    int addBookie(InetSocketAddress addr)
+    int addBookieForWriting(InetSocketAddress addr)
     throws IOException {
         LOG.debug("Bookie address: " + addr);
+        lMode = BKDefs.WRITE;
         //BookieHandle bh = new BookieHandle(this, addr);
-        this.bookies.add(bk.getBookieHandle(addr));
+        this.bookies.add(bk.getBookieHandle(this, addr));
         if(bookies.size() > qSize) setThreshold();
         return (this.bookies.size() - 1);
     }
     
+    /**
+     * Create bookie handle and add it to the list
+     * 
+     * @param addr  socket address
+     */
+    int addBookieForReading(InetSocketAddress addr)
+    throws IOException {
+        LOG.debug("Bookie address: " + addr);
+        lMode = BKDefs.READ;
+        //BookieHandle bh = new BookieHandle(this, addr);
+        try{
+            this.bookies.add(bk.getBookieHandle(this, addr));
+        } catch (IOException e){
+            LOG.info("Inserting a decoy bookie handle");
+            this.bookies.add(new BookieHandle(addr, false));
+        }
+        if(bookies.size() > qSize) setThreshold();
+        return (this.bookies.size() - 1);
+    }
+
+    
     private void setThreshold() {
         switch(qMode){
         case GENERIC:
@@ -225,6 +260,47 @@
         return threshold;
     }
     
+    
+    /**
+     * Writes to BookKeeper changes to the ensemble.
+     *         
+     * @param entry Last entry written before change of ensemble; its
+     *              zero-padded value names the znode recording the change.
+     */
+    
+    void changeEnsemble(long entry){
+        String path = BKDefs.prefix + 
+        bk.getZKStringId(getId()) +  
+        BKDefs.quorumEvolution + "/" + 
+        String.format("%010d", entry);
+        
+        LOG.info("Report failure: " + String.format("%010d", entry));
+        try{
+            if(bk.getZooKeeper().exists(BKDefs.prefix + 
+                    bk.getZKStringId(getId()) +  
+                    BKDefs.quorumEvolution, false) == null)
+                bk.getZooKeeper().create(BKDefs.prefix + bk.getZKStringId(getId()) + 
+                        BKDefs.quorumEvolution, new byte[0], Ids.OPEN_ACL_UNSAFE, 
+                        CreateMode.PERSISTENT);
+        
+            boolean first = true;
+            String addresses = "";
+            for(BookieHandle bh : bookies){
+                if(first){ 
+                    addresses = bh.addr.toString();
+                    first = false;
+                }
+                else 
+                    addresses = addresses + " " + bh.addr.toString();
+            }
+            
+            bk.getZooKeeper() .create(path, addresses.getBytes(),
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch(Exception e){
+            LOG.error("Could not write to ZooKeeper: " + path + ", " + e);
+        }
+    }
+    
     /**
      * Replace bookie in the case of a failure 
      */
@@ -250,7 +326,7 @@
                 /*
                  * If successful in writing to new bookie, add it to the set
                  */
-                this.bookies.set(index, bk.getBookieHandle(addr));
+                this.bookies.set(index, bk.getBookieHandle(this, addr));
             } catch(ConnectException e){
                 bk.blackListBookie(addr);
                 LOG.error(e);
@@ -266,16 +342,23 @@
      * to replace the current faulty one. In such cases,
      * we simply remove the bookie.
      * 
-     * @param index
+     * 
+     * @param bh  handle of the bookie to remove
      */
-    void removeBookie(int index){
-        bookies.remove(index);
-    }
-    
-    void closeUp(){
-        ledger = -1;
-        last = -1;
-        bk.haltBookieHandles(bookies);
+    synchronized void removeBookie(BookieHandle bh){
+       if(lMode == BKDefs.WRITE){
+           LOG.info("Removing bookie: " + bh.addr);
+           int index = bookies.indexOf(bh);
+           if(index >= 0){
+               Long tmpLastRecv = lastRecvCorrectly.get(index);
+               bookies.remove(index);
+        
+               if(tmpLastRecv == null)
+                   changeEnsemble(0);
+               else
+                   changeEnsemble(tmpLastRecv);
+           }
+       }
     }
     
     
@@ -328,6 +411,11 @@
         return lastAddConfirmed;
     }
     
+    void setLastRecvCorrectly(int sId, long entry){
+        //LOG.info("Setting last received correctly: " + entry);
+        lastRecvCorrectly.put(sId, entry);
+    }
+    
     /**
      * Returns the list of bookies
      * @return ArrayList<BookieHandle>
@@ -337,6 +425,73 @@
     }
     
     /**
+     * For reads, there might be multiple operations.
+     * 
+     * @param entry
+     * @return ArrayList<BookieHandle>  returns list of bookies
+     */
+    ArrayList<BookieHandle> getBookies(long entry){
+        return getConfig(entry);
+    }
+    
+    /**
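+     * Returns the bookie handle corresponding to the address in the input.
+     * 
+     * @param addr  socket address of the bookie to look up
+     * @return BookieHandle  handle whose address equals addr, or null if none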
+     */
+    BookieHandle getBookieHandleDup(InetSocketAddress addr){
+        for(BookieHandle bh : bookies){
+            if(bh.addr.equals(addr))
+                return bh;
+        }
+        
+        return null;
+    }
+    
+    /**
+     * Sets a new bookie configuration corresponding to a failure during
+     * writes to the ledger. We have one configuration for every failure.
+     * 
+     * @param entry
+     * @param list
+     */
+    
+    void setNewBookieConfig(long entry, ArrayList<BookieHandle> list){
+        if(bookieConfigMap == null)
+            bookieConfigMap = new TreeMap<Long, ArrayList<BookieHandle> >();
+        
+        /*
+         * If initial config is not in the list, we include it.
+         */
+        if(!bookieConfigMap.containsKey(new Long(0))){
+            bookieConfigMap.put(new Long(0), bookies);
+        }
+        
+        LOG.info("Adding new entry: " + entry + ", " + bookies.size() + ", " + list.size());
+        bookieConfigMap.put(entry, list);
+    }
+    
+    /**
+     * Once we read all changes to the bookie configuration, we
+     * have to call this method to generate an array that we use
+     * to determine the bookie configuration for an entry.
+     * 
+     * Note that this array is a performance optimization and 
+     * it is not necessary for correctness. We could just use 
+     * bookieConfigMap but it would be slower.
+     */
+    
+    void prepareEntryChange(){
+        entryChange = new long[bookieConfigMap.size()];
+    
+        int counter = 0;
+        for(Long l : bookieConfigMap.keySet()){
+            entryChange[counter++] = l;
+        }
+    }
+    
+    /**
      * Return the quorum size. By default, the size of a quorum is (n+1)/2, 
      * where n is the size of the set of bookies.
      * @return int
@@ -345,6 +500,36 @@
         return qSize;   
     }
     
+    
+    /**
+     *  Returns the config corresponding to the entry
+     *  
+     * @param entry  entry identifier
+     * @return ArrayList<BookieHandle>  bookies of the configuration covering entry
+     */
+    private ArrayList<BookieHandle> getConfig(long entry){
+        if(bookieConfigMap == null)
+            return bookies;
+        
+        int index = Arrays.binarySearch(entryChange, entry);
+        
+        /*
+         * If not on the map, binarySearch returns a negative value
+         */
+        int before = index;
+        index = index >= 0? index : ((-1) - index);
+
+        if(index == 0){
+            if((entry % 10) == 0){
+                LOG.info("Index: " + index + ", " + before + ", " + entry + ", " + bookieConfigMap.get(entryChange[index]).size());
+            }
+            return bookieConfigMap.get(entryChange[index]); 
+        } else{
+            //LOG.warn("IndexDiff " + entry);
+            return bookieConfigMap.get(entryChange[index - 1]);
+        }
+    }
+    
     /**
      * Returns the quorum mode for this ledger: Verifiable or Generic
      */
@@ -440,12 +625,18 @@
        return ledgerKey; 
     }
     
+    void closeUp(){
+        ledger = -1;
+        last = -1;
+        bk.haltBookieHandles(this, bookies);
+    }
+    
     /**
      * Close ledger.
      * 
      */
     public void close() 
-    throws KeeperException, InterruptedException {
+    throws KeeperException, InterruptedException, BKException {
         //Set data on zookeeper
         ByteBuffer last = ByteBuffer.allocate(8);
         last.putLong(lastAddConfirmed);
@@ -456,13 +647,12 @@
                    last.array(), 
                    Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT); 
-        } else {
-            bk.getZooKeeper().setData(closePath, 
-                last.array(), -1);
-        }
+        } 
+        
         closeUp();
         StopOp sOp = new StopOp();
         qe.sendOp(sOp);
+        LOG.info("##### CB worker queue size: " + qe.cbWorker.pendingOps.size());
     }
     
     /**
@@ -515,7 +705,7 @@
         
         RetCounter counter = new RetCounter();
         counter.inc();
-        
+     
         Operation r = new ReadOp(this, firstEntry, lastEntry, this, counter);
         qe.sendOp(r);
         
@@ -523,7 +713,10 @@
         counter.block(0);
         LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);
         
-        if(counter.getSequence() == null) throw BKException.create(Code.ReadException);
+        if(counter.getSequence() == null){
+            LOG.error("Failed to read entries: " + firstEntry + ", " + lastEntry);
+            throw BKException.create(Code.ReadException);
+        }
         return counter.getSequence();
     }
    
@@ -535,7 +728,7 @@
      * @param ctx   some control object
      */
     public void asyncAddEntry(byte[] data, AddCallback cb, Object ctx)
-    throws InterruptedException {
+    throws InterruptedException, BKException {
         AddOp r = new AddOp(this, data, cb, ctx);
         qe.sendOp(r);
     }
@@ -548,7 +741,7 @@
      */
     
     public long addEntry(byte[] data)
-    throws InterruptedException{
+    throws InterruptedException, BKException{
         LOG.debug("Adding entry " + data);
         RetCounter counter = new RetCounter();
         counter.inc();

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java Wed Jun 24 05:07:23 2009
@@ -95,7 +95,7 @@
         /**
          * Set value of action
          * 
-         * @return
+         * @return int  return action identifier
          */
         int setAction(int action){
             return this.action = action;
@@ -104,7 +104,7 @@
         /**
          * Return value of action
          * 
-         * @return
+         * @return  int  return action identifier
          */
         int getAction(){
             return action;
@@ -122,7 +122,7 @@
         /**
          * Return return code
          * 
-         * @return
+         * @return int return code
          */
         int getRC(){
             return rc;
@@ -365,7 +365,11 @@
         private int qSize;
         private long last;
         private QMode qMode;
-        private List<String> bookieIds;
+        private List<String> children;
+        
+        private String dataString;
+        private String item;
+        private AtomicInteger counter;
 
         /**
          * Constructor of request to open a ledger.
@@ -468,8 +472,8 @@
          * 
          * @param list  list of bbokie identifiers
          */
-        void addBookieIds(List<String> list){
-            this.bookieIds = list;
+        void addChildren(List<String> list){
+            this.children = list;
         }
         
         /**
@@ -477,8 +481,55 @@
          * 
          * @return List<String> list of bookie identifiers
          */
-        List<String> getBookieIds(){
-            return bookieIds;
+        List<String> getChildren(){
+            return children;
+        }
+        
+        /**
+         * Returns the size of the children list. Used in processOpen.
+         * 
+         * @return int
+         */
+        int getListSize(){
+            return children.size();
+        }
+        
+        /**
+         * Sets the value of item. This is used in processOpen to
+         * keep the item value of the list of ensemble changes.
+         * 
+         * @param item
+         */
+        void setItem(String item){
+            this.item = item;
+        }
+        
+        /**
+         * Returns the value of item
+         * 
+         * @return String
+         */
+        
+        String getItem(){
+            return item;
+        }
+        
+        /**
+         * Sets the value of dataString
+         * 
+         * @param data  value to set
+         */
+        void setStringData(String data){
+            this.dataString = data;
+        }
+        
+        /**
+         * Returns the value of dataString
+         * 
+         * @return String
+         */
+        String getStringData(){
+            return dataString;
         }
     }
     
@@ -731,7 +782,7 @@
                     String bookie = children.remove(index);
                     LOG.info("Bookie: " + bookie);
                     InetSocketAddress tAddr = bk.parseAddr(bookie);
-                    int bindex = cop.getLh().addBookie(tAddr); 
+                    int bindex = cop.getLh().addBookieForWriting(tAddr); 
                     ByteBuffer bindexBuf = ByteBuffer.allocate(4);
                     bindexBuf.putInt(bindex);
                 
@@ -773,6 +824,9 @@
         if(oop.getRC() != BKDefs.EOK)
             oop.getCb().openComplete(oop.getRC(), null, oop.getCtx());
         
+        String path;
+        LedgerHandle lh;
+        
         switch(oop.getAction()){
         case 0:                    
             /*
@@ -833,7 +887,7 @@
             /*
              *  Create ledger handle
              */
-            LedgerHandle lh = new LedgerHandle(bk, oop.getLid(), oop.getLast(), oop.getQSize(), oop.getQMode(), oop.getPasswd());
+            lh = new LedgerHandle(bk, oop.getLid(), oop.getLast(), oop.getQSize(), oop.getQMode(), oop.getPasswd());
                 
             /*
              * Get children of "/ledgers/id/ensemble" 
@@ -846,7 +900,7 @@
             break;
 
         case 7:
-            List<String> list = oop.getBookieIds();
+            List<String> list = oop.getChildren();
             LOG.info("Length of list of bookies: " + list.size());
             try{
                 for(int i = 0 ; i < list.size() ; i++){
@@ -855,19 +909,81 @@
                                 false, new Stat());
                         ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
                         if(bindexBuf.getInt() == i){                      
-                            oop.getLh().addBookie(bk.parseAddr(s));
+                            oop.getLh().addBookieForReading(bk.parseAddr(s));
                         }
                     }
                 }
+
+                /*
+                 * Check if there has been any change to the ensemble of bookies
+                 * due to failures.
+                 */
+                bk.getZooKeeper().exists(BKDefs.prefix + 
+                        bk.getZKStringId(oop.getLid()) +  
+                        BKDefs.quorumEvolution, 
+                                false,
+                                this,
+                                oop);
+                        
             } catch(KeeperException e){
                 LOG.error("Exception while adding bookies", e);
                 oop.setRC(BKDefs.EZK);
+                oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
             } catch(IOException e){
                 LOG.error("Exception while trying to connect to bookie");
                 oop.setRC(BKDefs.EIO);
-            } finally {
+                oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
+            } 
+            
+             break;
+        
+        case 8:
+            path = BKDefs.prefix + 
+            bk.getZKStringId(oop.getLid()) +  
+            BKDefs.quorumEvolution;
+                
+            bk.getZooKeeper().getChildren(path, 
+                    false,
+                    this,
+                    oop);
+        case 9: 
+            oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
+            break;
+        case 10:        
+            path = BKDefs.prefix + 
+            bk.getZKStringId(oop.getLid()) +  
+            BKDefs.quorumEvolution;
+            
+            for(String s : oop.getChildren()){
+                oop.setItem(s);
+                bk.getZooKeeper().getData(path + "/" + s, 
+                        false, 
+                        this,
+                        oop);
+            }
+            
+            break;
+        case 11:
+            lh = oop.getLh();
+            
+            String parts[] = oop.getStringData().split(" ");
+
+            ArrayList<BookieHandle> newBookieSet = new ArrayList<BookieHandle>();
+            for(int i = 0 ; i < parts.length ; i++){
+                LOG.info("Address: " + parts[i]);
+                InetSocketAddress faultyBookie =  
+                    bk.parseAddr(parts[i].substring(1));                           
+        
+                newBookieSet.add(lh.getBookieHandleDup(faultyBookie));
+            }
+            lh.setNewBookieConfig(Long.parseLong(oop.getItem()), newBookieSet);
+        
+            if(oop.counter.incrementAndGet() == oop.getListSize()){
+                lh.prepareEntryChange();
                 oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
             }
+            
+            break;
         }
     }    
     
@@ -956,6 +1072,12 @@
                 else
                     op.setAction(4);
                 break;
+            case 8:
+                if(stat == null)
+                    op.setAction(9);
+                else
+                    op.setAction(10);
+                break;
             }
         case CLOSE:
             CloseLedgerOp clop = (CloseLedgerOp) op;
@@ -1064,7 +1186,7 @@
                   break;
               case OPEN:
                   OpenLedgerOp oop = (OpenLedgerOp) op;
-                  oop.addBookieIds(children);
+                  oop.addChildren(children);
                   break;
        }
        
@@ -1119,6 +1241,12 @@
                            oop.setQMode(QMode.VERIFIABLE);
                        LOG.info("Verifiable ledger");
                        }
+                       break;
+                   case 10:
+                       String addr = new String(data);
+                       oop.setStringData(addr);
+                       oop.setAction(11);
+                       break;
                    }
                    break;
                default:

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java Wed Jun 24 05:07:23 2009
@@ -31,6 +31,7 @@
 import java.util.HashMap;
 import java.util.TreeMap;
 
+//import org.apache.bookkeeper.client.AsyncCallback.FailCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerSequence;
@@ -48,7 +49,7 @@
  * 
  */
 
-class LedgerRecoveryMonitor implements ReadEntryCallback{
+class LedgerRecoveryMonitor implements ReadEntryCallback {
     Logger LOG = Logger.getLogger(LedgerRecoveryMonitor.class);
     
     BookKeeper self;
@@ -132,11 +133,10 @@
         
         /*
          * Obtain largest hint 
-         */
-        
+         */ 
         LedgerHandle lh = new LedgerHandle(self, lId, 0, qSize, qMode, passwd);
         for(InetSocketAddress addr : bookies){
-            lh.addBookie(addr);
+            lh.addBookieForReading(addr);
         }
         
         boolean notLegitimate = true;
@@ -241,4 +241,5 @@
         
         return hint;
     }
+    
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java Wed Jun 24 05:07:23 2009
@@ -22,9 +22,12 @@
 
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.ClientCBWorker;
 import org.apache.bookkeeper.client.QuorumOpMonitor;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -43,7 +46,7 @@
  */
 
 public class QuorumEngine {
-    Logger LOG = Logger.getLogger(QuorumEngine.class);
+    static Logger LOG = Logger.getLogger(QuorumEngine.class);
 
     QuorumOpMonitor opMonitor;
     ClientCBWorker cbWorker;
@@ -56,6 +59,11 @@
      * ADD, STOP.
      */
     
+    static long idCounter; 
+    static synchronized long getOpId(){
+        return idCounter++;
+    }
+    
     public static class Operation {
         public static final int READ = 0;
         public static final int ADD = 1;
@@ -64,9 +72,19 @@
         
         int type;
         LedgerHandle ledger;
+        long id;
         int rc = 0;
         boolean ready = false;
         
+         public Operation(){
+             this.id = getOpId();
+         }
+            
+         long getId(){
+             return id;
+         }
+            
+
         public static class AddOp extends Operation {
             AddCallback cb;
             Object ctx;
@@ -178,11 +196,18 @@
              this.rcb = rcb;
          }
      }
+     
+     public static class SubStopOp extends SubOp{
+         SubStopOp(Operation op){
+             this.op = op;
+         }
+     }
     }
     
     public QuorumEngine(LedgerHandle lh){ 
         this.lh = lh;
-        this.opMonitor = QuorumOpMonitor.getInstance(lh);
+        this.opMonitor = new QuorumOpMonitor(lh);
+        QuorumEngine.idCounter = 0;
         LOG.debug("Creating cbWorker");
         this.cbWorker = ClientCBWorker.getInstance();
         LOG.debug("Created cbWorker");
@@ -195,11 +220,12 @@
      * @param r Operation descriptor
      */
     void sendOp(Operation r)
-    throws InterruptedException {
+    throws InterruptedException, BKException {
+        int n;    
         
-        int n = lh.getBookies().size();
         switch(r.type){
         case Operation.READ:
+            
             Operation.ReadOp rOp = (Operation.ReadOp) r;
             
             LOG.debug("Adding read operation to opMonitor: " + rOp.firstEntry + ", " + rOp.lastEntry);
@@ -211,6 +237,10 @@
                 long counter = 0;
                 PendingReadOp pROp = new PendingReadOp(lh);
                 
+                n = lh.getBookies(entry).size();
+                if(n < lh.getQuorumSize())
+                    throw BKException.create(Code.NotEnoughBookiesException);
+                
                 //Send requests to bookies
                 while(counter < lh.getQuorumSize()){
                     int index = (int)((entry + counter++) % n);
@@ -219,7 +249,9 @@
                                 pROp, 
                                 index,
                                 opMonitor);
-                        lh.getBookies().get((index) % n).sendRead(lh, sRead, entry);            
+   
+                        BookieHandle bh = lh.getBookies(entry).get((index) % n); 
+                        if(bh.isEnabled()) bh.sendRead(lh, sRead, entry);            
                     } catch(IOException e){
                         LOG.error(e);
                     }
@@ -228,11 +260,18 @@
   
             break;
         case Operation.ADD:
+            n = lh.getBookies().size();
+
+            if(n < lh.getQuorumSize())
+                throw BKException.create(Code.NotEnoughBookiesException);
+            
             long counter = 0;
             
             cbWorker.addOperation(r);
             Operation.AddOp aOp = (Operation.AddOp) r;
             PendingOp pOp = new PendingOp();
+            ArrayList<BookieHandle> bookies;
+            
             while(counter < lh.getQuorumSize()  ){
                 int index = (int)((aOp.entry + counter++) % n);
                 
@@ -242,20 +281,14 @@
                             pOp, 
                             index,
                             opMonitor);
+                   
                     lh.getBookies().get((index) % n).sendAdd(lh, sAdd, aOp.entry);
-                } catch (IOException io) {
-                    LOG.error(io);
-                    try{
-                        /*
-                         * Before getting a new bookie, try to reconnect
-                         */
-                        lh.getBookies().get((index) % n).restart();
-                    } catch (IOException nio){
-                        lh.removeBookie(index);
-                    }
+                } catch (Exception io) {
+                    LOG.error("Error when sending entry: " + aOp.entry + ", " + index + ", " + io);
+                    counter--;
+                    n = lh.getBookies().size();
                 }
             }
-            //qRef = (qRef + 1) % n;
             break;
                 case Operation.STOP:
                     cbWorker.shutdown();

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java Wed Jun 24 05:07:23 2009
@@ -55,21 +55,12 @@
  * 
  */
 public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
-    Logger LOG = Logger.getLogger(QuorumOpMonitor.class);
+    static Logger LOG = Logger.getLogger(QuorumOpMonitor.class);
     
     LedgerHandle lh;
     
     static final int MAXRETRIES = 2;
-    static HashMap<Long, QuorumOpMonitor> instances = 
-        new HashMap<Long, QuorumOpMonitor>();
     
-    public static QuorumOpMonitor getInstance(LedgerHandle lh){
-        if(instances.get(lh.getId()) == null) {
-            instances.put(lh.getId(), new QuorumOpMonitor(lh));
-        }
-        
-        return instances.get(lh.getId());
-    }
     
     /**
      * Message disgest instance
@@ -160,17 +151,22 @@
         if(rc == 0){
             // Everything went ok with this op
             synchronized(pOp){ 
-                //pOp.bookieIdSent.add(sId);
                 pOp.bookieIdRecv.add(sId);
-                if(pOp.bookieIdRecv.size() == lh.getQuorumSize()){
-                    //pendingAdds.remove(entryId);
-                    //sAdd.op.cb.addComplete(sAdd.op.getErrorCode(),
-                    //        ledgerId, entryId, sAdd.op.ctx);
+                lh.setLastRecvCorrectly(sId, entryId);
+                if(pOp.bookieIdRecv.size() >= lh.getQuorumSize()){
                     sAdd.op.setReady();     
                 }
             }
         } else {
-            LOG.error("Error sending write request: " + rc + " : " + ledgerId);
+            //LOG.warn("Error sending write request: " + rc + " : " + ledgerId + ": " + lh.getBookies().size());
+            /*
+             * If ledger is closed already, then simply return
+             */
+            if(lh.getId() == -1){
+                LOG.warn("Ledger identifier is not valid");
+                return;
+            }
+            
             HashSet<Integer> ids;
               
             synchronized(pOp){
@@ -180,8 +176,7 @@
                 if(ids.size() == lh.getBookies().size()){
                     if(pOp.retries++ >= MAXRETRIES){
                         //Call back with error code
-                        //sAdd.op.cb.addComplete(ErrorCodes.ENUMRETRIES,
-                        //        ledgerId, entryId, sAdd.op.ctx);
+  
                         sAdd.op.setErrorCode(BKDefs.ENR);
                         sAdd.op.setReady();
                         return;
@@ -190,25 +185,38 @@
                     ids.clear();
                 }
                 // Select another bookie that we haven't contacted yet
-                for(int i = 0; i < lh.getBookies().size(); i++){
-                    if(!ids.contains(Integer.valueOf(i))){
-                        // and send it to new bookie
-                        try{
-                            list.get(i).sendAdd(lh, new SubAddOp(sAdd.op, 
-                                    pOp, 
-                                    i, 
-                                    this), ((AddOp) sAdd.op).entry);
-                            pOp.bookieIdRecv.add(sId.intValue());
-                                
-                            break;
-                        } catch(IOException e){
-                            LOG.error(e);
-                        }
+                try{
+                    //LOG.info("Selecting another bookie " + entryId);
+                    int bCounter;
+                    if(sId >= (entryId % (lh.getBookies().size() + 1))){
+                        bCounter = sId - (((int) entryId) % (lh.getBookies().size() + 1));
+                    } else {
+                        bCounter = (lh.getBookies().size() + 1) - (((int) entryId) % (lh.getBookies().size() + 1)) - sId;
                     }
-                }       
+                    
+                    int tmpId = (((int) entryId) + lh.getQuorumSize()) % (lh.getBookies().size() + 1);
+                    int newId = tmpId % lh.getBookies().size();
+                    //LOG.info("Sending a new add operation to bookie: " + newId + ", " + lh.getBookies().get(newId).addr);
+                    
+                    BookieHandle bh = lh.getBookies().get(newId);
+                    
+                    //LOG.info("Got handle for " + newId);
+                    
+                    bh.sendAdd(lh, new SubAddOp(sAdd.op, 
+                            pOp, 
+                            newId, 
+                            this), entryId);
+               
+                    //LOG.info("Ended " + entryId + ", " + newId);
+                } catch(IOException e){
+                    LOG.error(e);
+                } catch(BKException e){
+                    LOG.error(e);
+                }
             }
-        }
+        }       
     }
+
     
     /**
      * Callback method for read operations. There is one callback for
@@ -256,6 +264,7 @@
                                     byte[] data = new byte[voted.capacity() - dLength - 24];
                                     voted.position(24);                                    
                                     voted.get(data, 0, data.length);
+                                    //LOG.warn("Data length (" + entryId + "): " + data.length);
                                     counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
                                 } 
                             }
@@ -338,6 +347,7 @@
         
         if(rOp.nacks.get(entryId).incrementAndGet() >= lh.getThreshold()){
             int counter = -1;
+            //LOG.warn("Giving up on " + entryId + "(" + lh.getThreshold() + ")");
             counter = addNewEntry(new LedgerEntry(ledgerId, entryId, null), rOp);
             
             if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) && 
@@ -450,6 +460,8 @@
     private int addNewEntry(LedgerEntry le, ReadOp op){
         long index = le.getEntryId() % (op.lastEntry - op.firstEntry + 1);
         if(op.seq[(int) index] == null){
+            if(le.getEntry() == null) LOG.warn("Ledger entry is null (" + le.getEntryId() + ")");
+            //if(le.getEntryId() % 100 == 0) LOG.info("New entry: " + le.getEntryId() + ")");
             op.seq[(int) index] = le;
             
             return op.counter.incrementAndGet();

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java Wed Jun 24 05:07:23 2009
@@ -28,12 +28,16 @@
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.Enumeration;
 import java.security.NoSuchAlgorithmException;
 import java.security.InvalidKeyException;
 import java.security.MessageDigest;
 import javax.crypto.Mac; 
 import javax.crypto.spec.SecretKeySpec;
 
+//import org.apache.bookkeeper.client.AsyncCallback.FailCallback;
+import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.proto.ReadEntryCallback;
 import org.apache.bookkeeper.proto.WriteCallback;
 import org.apache.log4j.Logger;
@@ -50,13 +54,8 @@
     int myCounter = 0;
 
     public BookieClient(InetSocketAddress addr, int recvTimeout)
-    throws IOException, ConnectException {
-        sock = SocketChannel.open(addr);
-        setDaemon(true);
-   
-        sock.socket().setSoTimeout(recvTimeout);
-        sock.socket().setTcpNoDelay(true);
-        start();
+    throws IOException, ConnectException { 
+        startConnection(addr, recvTimeout);
     }
     
     public BookieClient(String host, int port, int recvTimeout)
@@ -64,6 +63,16 @@
         this(new InetSocketAddress(host, port), recvTimeout);
     }
     
+    public void startConnection(InetSocketAddress addr, int recvTimeout)
+    throws IOException, ConnectException {
+        sock = SocketChannel.open(addr);
+        setDaemon(true);
+        //sock.configureBlocking(false);
+        sock.socket().setSoTimeout(recvTimeout);
+        sock.socket().setTcpNoDelay(true);
+        start();        
+    }
+    
     private static class Completion<T> {
         Completion(T cb, Object ctx) {
             this.cb = cb;
@@ -105,13 +114,12 @@
     ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions =
         new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
     
-    
     /*
      * Use this semaphore to control the number of completion key in both addCompletions
      * and readCompletions. This is more of a problem for readCompletions because one
      * readEntries opertion is expanded into individual operations to read entries.
      */
-    Semaphore completionSemaphore = new Semaphore(1000);
+    Semaphore completionSemaphore = new Semaphore(3000);
     
    
     /**
@@ -150,7 +158,10 @@
     }
     
     /**
-     * Send addEntry operation to bookie.
+     * Send addEntry operation to bookie. It throws an IOException
+     * if the socket is not connected, if the write to the socket fails,
+     * or if no permit to send another request is obtained within one
+     * second, which possibly implies that the corresponding bookie is down.
      * 
      * @param ledgerId	ledger identifier
      * @param entryId 	entry identifier
@@ -163,39 +174,37 @@
             ByteBuffer entry, WriteCallback cb, Object ctx) 
     throws IOException, InterruptedException {
         
-        //LOG.info("Data length: " + entry.capacity());
-    	completionSemaphore.acquire();
+        if(cb == null)
+            LOG.error("WriteCallback object is null: " + entryId);
         addCompletions.put(new CompletionKey(ledgerId, entryId),
                 new Completion<WriteCallback>(cb, ctx));
-        //entry = entry.duplicate();
-        //entry.position(0);
-        
+
         ByteBuffer tmpEntry = ByteBuffer.allocate(entry.remaining() + 44);
 
         tmpEntry.position(4);
         tmpEntry.putInt(BookieProtocol.ADDENTRY);
         tmpEntry.put(masterKey);
-        //LOG.debug("Master key: " + new String(masterKey));
         tmpEntry.putLong(ledgerId);
         tmpEntry.putLong(entryId);
         tmpEntry.put(entry);
         tmpEntry.position(0);
         
-        //ByteBuffer len = ByteBuffer.allocate(4);
         // 4 bytes for the message type
         tmpEntry.putInt(tmpEntry.remaining() - 4);
         tmpEntry.position(0);
-        //sock.write(len);
-        //len.clear();
-        //len.putInt(BookieProtocol.ADDENTRY);
-        //len.flip();
-        //sock.write(len);
-        sock.write(tmpEntry);
-        //LOG.debug("addEntry:finished");
+
+        
+        if(!sock.isConnected() || 
+                !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){ 
+            throw new IOException();
+        } else sock.write(tmpEntry);
     }
     
     /**
-     * Send readEntry operation to bookie.
+     * Send readEntry operation to bookie. It throws an IOException
+     * if the socket is not connected, if the write to the socket fails,
+     * or if no permit to send another request is obtained within one
+     * second, which possibly implies that the corresponding bookie is down.
      * 
      * @param ledgerId	ledger identifier
      * @param entryId	entry identifier
@@ -206,28 +215,22 @@
     synchronized public void readEntry(long ledgerId, long entryId,
             ReadEntryCallback cb, Object ctx) 
     throws IOException, InterruptedException {
-    	
-    	completionSemaphore.acquire();
+        //LOG.info("Entry id: " + entryId);
+    	//completionSemaphore.acquire();
         readCompletions.put(new CompletionKey(ledgerId, entryId),
                 new Completion<ReadEntryCallback>(cb, ctx));
+        
         ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8 + 8);
         tmpEntry.putInt(20);
         tmpEntry.putInt(BookieProtocol.READENTRY);
         tmpEntry.putLong(ledgerId);
         tmpEntry.putLong(entryId);
         tmpEntry.position(0);
-        
-        //ByteBuffer len = ByteBuffer.allocate(4);
-        //len.putInt(tmpEntry.remaining() + 4);
-        //len.flip();
-        //LOG.debug("readEntry: Writing to socket");
-        //sock.write(len);
-        //len.clear();
-        //len.putInt(BookieProtocol.READENTRY);
-        //len.flip();
-        //sock.write(len);
-        sock.write(tmpEntry);
-        //LOG.error("Size of readCompletions: " + readCompletions.size());
+
+        if(!sock.isConnected() || 
+                !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){ 
+            throw new IOException();
+        } else sock.write(tmpEntry);
     }
     
     private void readFully(ByteBuffer bb) throws IOException {
@@ -236,6 +239,7 @@
         }
     }
     
+    Semaphore running = new Semaphore(0);
     public void run() {
         int len = -1;
         ByteBuffer lenBuffer = ByteBuffer.allocate(4);
@@ -254,47 +258,44 @@
  
                 switch(type) {
                 case BookieProtocol.ADDENTRY:
-                {
+                {                    
                     long ledgerId = bb.getLong();
                     long entryId = bb.getLong();
-                    Completion<WriteCallback> ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
+
+                    Completion<WriteCallback> ac;
+                    ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
                     completionSemaphore.release();
-                    
                     if (ac != null) {
                         ac.cb.writeComplete(rc, ledgerId, entryId, ac.ctx);
                     } else {
                         LOG.error("Callback object null: " + ledgerId + " : " + entryId);
                     }
+
                     break;
                 }
                 case BookieProtocol.READENTRY:
                 {
-                    //ByteBuffer entryData = bb.slice();
                     long ledgerId = bb.getLong();
                     long entryId = bb.getLong();
                     
                     bb.position(24);
                     byte[] data = new byte[bb.capacity() - 24];
                     bb.get(data);
-                    ByteBuffer entryData = ByteBuffer.wrap(data);
-                    //ByteBuffer entryData = bb;
-                    //LOG.info("Received entry: " + ledgerId + ", " + entryId
-                    // + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());          
+                    ByteBuffer entryData = ByteBuffer.wrap(data);         
                     
                     CompletionKey key = new CompletionKey(ledgerId, entryId);
                     Completion<ReadEntryCallback> c;
                     
                     if(readCompletions.containsKey(key)){
-                        c = readCompletions.remove(key);
-                        //LOG.error("Found key");
+                            c = readCompletions.remove(key);
                     }
                     else{    
-                        /*
-                         * This is a special case. When recovering a ledger, a client submits
-                         * a read request with id -1, and receives a response with a different
-                         * entry id.
-                         */
-                        c = readCompletions.remove(new CompletionKey(ledgerId, -1));
+                            /*
+                             * This is a special case. When recovering a ledger, a client submits
+                             * a read request with id -1, and receives a response with a different
+                             * entry id.
+                             */
+                            c = readCompletions.remove(new CompletionKey(ledgerId, -1));
                     }
                     completionSemaphore.release();
                     
@@ -311,9 +312,72 @@
                     System.err.println("Got error " + rc + " for type " + type);
                 }
             }
+            
         } catch(Exception e) {
-            LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc, e);
+            LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc);
         }
+        running.release();
+        
+    }
+    
+    /**
+     * Errors out pending entries (callbacks get rc -1). We call this from one thread
+     * to avoid concurrent calls into QuorumOpMonitor (which implements the callbacks).
+     * It seems simpler to call it from BookieHandle than directly from here.
+     */
+    
+    public void errorOut(){
+        LOG.info("Erroring out pending entries");
+    
+        for (Enumeration<CompletionKey> e = addCompletions.keys() ; e.hasMoreElements() ;) {
+            CompletionKey key = e.nextElement();
+            Completion<WriteCallback> ac = addCompletions.remove(key);
+            if(ac != null){
+                completionSemaphore.release();
+                ac.cb.writeComplete(-1, key.ledgerId, key.entryId, ac.ctx);
+            }
+        }
+        
+        LOG.info("Finished erroring out pending add entries");
+         
+        for (Enumeration<CompletionKey> e = readCompletions.keys() ; e.hasMoreElements() ;) {
+            CompletionKey key = e.nextElement();
+            Completion<ReadEntryCallback> ac = readCompletions.remove(key);
+                
+            if(ac != null){
+                completionSemaphore.release();
+                ac.cb.readEntryComplete(-1, key.ledgerId, key.entryId, null, ac.ctx);
+            }
+        }
+        
+        LOG.info("Finished erroring out pending read entries");
+    }
+
+    /**
+     * Halts client: closes the socket, then waits until run() releases the running semaphore.
+     */
+    
+    public void halt() {
+        try{
+            sock.close();
+        } catch(IOException e) {
+            LOG.warn("Exception while closing socket");
+        }
+        
+        try{
+            running.acquire();
+        } catch(InterruptedException e){
+            LOG.error("Interrupted while waiting for running semaphore to acquire lock");
+        }
+    }
+    
+    /**
+     * Returns the status of the socket of this bookie client.
+     * 
+     * @return true if the underlying socket channel is connected, false otherwise
+     */
+    public boolean isConnected(){
+        return sock.isConnected();
     }
 
     private static class Counter {

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java Wed Jun 24 05:07:23 2009
@@ -39,6 +39,7 @@
 public class BookieServer implements NIOServerFactory.PacketProcessor, WriteCallback {
     int port;
     NIOServerFactory nioServerFactory;
+    volatile boolean down = false;
     Bookie bookie;
     static Logger LOG = Logger.getLogger(BookieServer.class);
     
@@ -50,9 +51,13 @@
         nioServerFactory = new NIOServerFactory(port, this);
     }
     public void shutdown() throws InterruptedException {
+        down = true;
         nioServerFactory.shutdown();
         bookie.shutdown();
     }
+    public boolean isDown(){
+        return down;
+    }
     public void join() throws InterruptedException {
         nioServerFactory.join();
     }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java Wed Jun 24 05:07:23 2009
@@ -24,6 +24,7 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.log4j.Logger;
 
@@ -87,6 +88,8 @@
             } catch(InterruptedException ie) {
                 LOG.warn("Interrupted while flusing " + ie);
                 Thread.currentThread().interrupt();
+            } catch(BKException bke) {
+                LOG.warn("BookKeeper exception ", bke);
             }
         }
     }
@@ -120,6 +123,8 @@
             } catch(InterruptedException ie) {
                 LOG.warn("Interrupted while writing", ie);
                 Thread.currentThread().interrupt();
+            } catch(BKException bke) {
+                LOG.warn("BookKeeper exception", bke);
             }
         }
     }