You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/12 11:59:51 UTC

svn commit: r1383872 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ book...

Author: ivank
Date: Wed Sep 12 09:59:50 2012
New Revision: 1383872

URL: http://svn.apache.org/viewvc?rev=1383872&view=rev
Log:
BOOKKEEPER-208: Separate write quorum from ack quorum (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Sep 12 09:59:50 2012
@@ -132,6 +132,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-300: Create Bookie format command (Vinay via sijie)
 
+        BOOKKEEPER-208: Separate write quorum from ack quorum (ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchThroughputLatency.java Wed Sep 12 09:59:50 2012
@@ -78,7 +78,7 @@ public class BenchThroughputLatency impl
         }
     }
 
-    public BenchThroughputLatency(int ensemble, int qSize, byte[] passwd,
+    public BenchThroughputLatency(int ensemble, int writeQuorumSize, int ackQuorumSize, byte[] passwd,
             int numberOfLedgers, int sendLimit, ClientConfiguration conf)
             throws KeeperException, IOException, InterruptedException {
         this.sem = new Semaphore(conf.getThrottleValue());
@@ -91,9 +91,11 @@ public class BenchThroughputLatency impl
             lh = new LedgerHandle[this.numberOfLedgers];
 
             for(int i = 0; i < this.numberOfLedgers; i++) {
-                lh[i] = bk.createLedger(ensemble, qSize, BookKeeper.DigestType.CRC32,
+                lh[i] = bk.createLedger(ensemble, writeQuorumSize,
+                                        ackQuorumSize,
+                                        BookKeeper.DigestType.CRC32,
                                         passwd);
-                LOG.info("Ledger Handle: " + lh[i].getId());
+                LOG.debug("Ledger Handle: " + lh[i].getId());
             }
         } catch (BKException e) {
             e.printStackTrace();
@@ -233,6 +235,7 @@ public class BenchThroughputLatency impl
         options.addOption("entrysize", true, "Entry size (bytes), default 1024");
         options.addOption("ensemble", true, "Ensemble size, default 3");
         options.addOption("quorum", true, "Quorum size, default 2");
+        options.addOption("ackQuorum", true, "Ack quorum size, default is same as quorum");
         options.addOption("throttle", true, "Max outstanding requests, default 10000");
         options.addOption("ledgers", true, "Number of ledgers, default 1");
         options.addOption("zookeeper", true, "Zookeeper ensemble, default \"localhost:2181\"");
@@ -261,6 +264,10 @@ public class BenchThroughputLatency impl
         int ledgers = Integer.valueOf(cmd.getOptionValue("ledgers", "1"));
         int ensemble = Integer.valueOf(cmd.getOptionValue("ensemble", "3"));
         int quorum = Integer.valueOf(cmd.getOptionValue("quorum", "2"));
+        int ackQuorum = quorum;
+        if (cmd.hasOption("ackQuorum")) {
+            ackQuorum = Integer.valueOf(cmd.getOptionValue("ackQuorum"));
+        }
         int throttle = Integer.valueOf(cmd.getOptionValue("throttle", "10000"));
         int sendLimit = Integer.valueOf(cmd.getOptionValue("sendlimit", "20000000"));
 
@@ -313,8 +320,8 @@ public class BenchThroughputLatency impl
 
 
         // Now do the benchmark
-        BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, passwd,
-                                                                  ledgers, sendLimit, conf);
+        BenchThroughputLatency bench = new BenchThroughputLatency(ensemble, quorum, ackQuorum,
+                passwd, ledgers, sendLimit, conf);
         bench.setEntryData(data);
         thread = new Thread(bench);
         ZooKeeper zk = null;
@@ -439,8 +446,8 @@ public class BenchThroughputLatency impl
             }
         }
 
-        BenchThroughputLatency warmup = new BenchThroughputLatency(bookies, bookies, passwd,
-                ledgers, 50000, conf);
+        BenchThroughputLatency warmup = new BenchThroughputLatency(bookies, bookies, bookies, passwd,
+                                                                   ledgers, 10000, conf);
         warmup.setEntryData(data);
         Thread thread = new Thread(warmup);
         thread.start();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Wed Sep 12 09:59:50 2012
@@ -256,9 +256,47 @@ public class BookKeeper {
      * authenticate access to a ledger, but also to verify entries in ledgers.
      *
      * @param ensSize
-     *          ensemble size
-     * @param qSize
-     *          quorum size
+     *          number of bookies over which to stripe entries
+     * @param writeQuorumSize
+     *          number of bookies each entry will be written to. Each of these bookies
+     *          must acknowledge the entry before the call is completed.
+     * @param digestType
+     *          digest type, either MAC or CRC32
+     * @param passwd
+     *          password
+     * @param cb
+     *          createCallback implementation
+     * @param ctx
+     *          optional control object
+     */
+    public void asyncCreateLedger(final int ensSize,
+                                  final int writeQuorumSize,
+                                  final DigestType digestType,
+                                  final byte[] passwd, final CreateCallback cb, final Object ctx)
+    {
+        asyncCreateLedger(ensSize, writeQuorumSize, writeQuorumSize, digestType, passwd, cb, ctx);
+    }
+
+    /**
+     * Creates a new ledger asynchronously. Ledgers created with this call have
+     * a separate write quorum and ack quorum size. The write quorum must be at least as
+     * large as the ack quorum.
+     *
+     * Separating the write and the ack quorum allows the BookKeeper client to continue
+     * writing when a bookie has failed but the failure has not yet been detected. Detecting
+     * that a bookie has failed can take a number of seconds, as configured by the read timeout
+     * {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected,
+     * that bookie will be removed from the ensemble.
+     *
+     * The other parameters match those of {@link #asyncCreateLedger(int, int, DigestType, byte[],
+     *                                      AsyncCallback.CreateCallback, Object)}
+     *
+     * @param ensSize
+     *          number of bookies over which to stripe entries
+     * @param writeQuorumSize
+     *          number of bookies each entry will be written to
+     * @param ackQuorumSize
+     *          number of bookies which must acknowledge an entry before the call is completed
      * @param digestType
      *          digest type, either MAC or CRC32
      * @param passwd
@@ -268,9 +306,17 @@ public class BookKeeper {
      * @param ctx
      *          optional control object
      */
-    public void asyncCreateLedger(final int ensSize, final int qSize, final DigestType digestType,
+
+    public void asyncCreateLedger(final int ensSize,
+                                  final int writeQuorumSize,
+                                  final int ackQuorumSize,
+                                  final DigestType digestType,
                                   final byte[] passwd, final CreateCallback cb, final Object ctx) {
-        new LedgerCreateOp(BookKeeper.this, ensSize, qSize, digestType, passwd, cb, ctx)
+        if (writeQuorumSize < ackQuorumSize) {
+            throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
+        }
+        new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
+                           ackQuorumSize, digestType, passwd, cb, ctx)
             .initiate();
     }
 
@@ -305,14 +351,34 @@ public class BookKeeper {
      * @throws BKException
      */
     public LedgerHandle createLedger(int ensSize, int qSize,
-                                     DigestType digestType, byte passwd[]) 
+                                     DigestType digestType, byte passwd[])
+            throws InterruptedException, BKException {
+        return createLedger(ensSize, qSize, qSize, digestType, passwd);
+    }
+
+    /**
+     * Synchronous call to create ledger. Parameters match those of
+     * {@link #asyncCreateLedger(int, int, int, DigestType, byte[],
+     *                           AsyncCallback.CreateCallback, Object)}
+     *
+     * @param ensSize
+     * @param writeQuorumSize
+     * @param ackQuorumSize
+     * @param digestType
+     * @param passwd
+     * @return a handle to the newly created ledger
+     * @throws InterruptedException
+     * @throws BKException
+     */
+    public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize,
+                                     DigestType digestType, byte passwd[])
             throws InterruptedException, BKException {
         SyncCounter counter = new SyncCounter();
         counter.inc();
         /*
          * Calls asynchronous version
          */
-        asyncCreateLedger(ensSize, qSize, digestType, passwd, 
+        asyncCreateLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
                           new SyncCreateCallback(), counter);
 
         /*

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java Wed Sep 12 09:59:50 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import java.util.List;
 /**
  * This interface determins how entries are distributed among bookies.
  *
@@ -30,21 +31,34 @@ package org.apache.bookkeeper.client;
 interface DistributionSchedule {
 
     /**
-     *
-     * @param entryId
-     * @param replicaIndex
-     * @return index of bookie that should get this replica
+     * return the set of bookie indices to send the message to
      */
-    public int getBookieIndex(long entryId, int replicaIndex);
+    public List<Integer> getWriteSet(long entryId);
 
     /**
-     *
-     * @param entryId
-     * @param bookieIndex
-     * @return -1 if the given bookie index is not a replica for the given
-     *         entryId
+     * An ack set represents the set of bookies from which
+     * a response must be received so that an entry can be
+     * considered to be replicated on a quorum.
      */
-    public int getReplicaIndex(long entryId, int bookieIndex);
+    public interface AckSet {
+        /**
+         * Add a bookie response and check if quorum has been met
+         * @return true if quorum has been met, false otherwise
+         */
+        public boolean addBookieAndCheck(int bookieIndexHeardFrom);
+
+        /**
+         * Invalidate a previous bookie response.
+         * Used for reissuing write requests.
+         */
+        public void removeBookie(int bookie);
+    }
+
+    /**
+     * Returns an ack set object; responses should be checked against it.
+     */
+    public AckSet getAckSet();
+
 
     /**
      * Interface to keep track of which bookies in an ensemble, an action

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java Wed Sep 12 09:59:50 2012
@@ -222,7 +222,7 @@ public class LedgerChecker {
                 final long entryToRead = curEntryId;
 
                 EntryExistsCallback eecb
-                    = new EntryExistsCallback(lh.getLedgerMetadata().getQuorumSize(),
+                    = new EntryExistsCallback(lh.getLedgerMetadata().getWriteQuorumSize(),
                                               new GenericCallback<Boolean>() {
                                                   public void operationComplete(int rc, Boolean result) {
                                                       if (result) {
@@ -232,8 +232,7 @@ public class LedgerChecker {
                                                   }
                                               });
 
-                for (int i = 0; i < lh.getLedgerMetadata().getQuorumSize(); i++) {
-                    int bi = lh.getDistributionSchedule().getBookieIndex(entryToRead, i);
+                for (int bi : lh.getDistributionSchedule().getWriteSet(entryToRead)) {
                     InetSocketAddress addr = curEnsemble.get(bi);
                     bookieClient.readEntry(addr, lh.getId(),
                                            entryToRead, eecb, null);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Wed Sep 12 09:59:50 2012
@@ -68,9 +68,12 @@ class LedgerCreateOp implements GenericC
      *       optional control object
      */
 
-    LedgerCreateOp(BookKeeper bk, int ensembleSize, int quorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx) {
+    LedgerCreateOp(BookKeeper bk, int ensembleSize,
+                   int writeQuorumSize, int ackQuorumSize,
+                   DigestType digestType,
+                   byte[] passwd, CreateCallback cb, Object ctx) {
         this.bk = bk;
-        this.metadata = new LedgerMetadata(ensembleSize, quorumSize, digestType, passwd);
+        this.metadata = new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, passwd);
         this.digestType = digestType;
         this.passwd = passwd;
         this.cb = cb;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Wed Sep 12 09:59:50 2012
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Queue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -74,6 +75,7 @@ public class LedgerHandle {
      */
     final static public long INVALID_ENTRY_ID = BookieProtocol.INVALID_ENTRY_ID;
 
+    final AtomicInteger blockAddCompletions = new AtomicInteger(0);
     final Queue<PendingAddOp> pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
 
     LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
@@ -98,7 +100,7 @@ public class LedgerHandle {
         macManager = DigestManager.instantiate(ledgerId, password, digestType);
         this.ledgerKey = MacDigestManager.genDigest("ledger", password);
         distributionSchedule = new RoundRobinDistributionSchedule(
-                metadata.getQuorumSize(), metadata.getEnsembleSize());
+                metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), metadata.getEnsembleSize());
     }
 
     /**
@@ -641,8 +643,9 @@ public class LedgerHandle {
         // Start from the head of the queue and proceed while there are
         // entries that have had all their responses come back
         PendingAddOp pendingAddOp;
-        while ((pendingAddOp = pendingAddOps.peek()) != null) {
-            if (pendingAddOp.numResponsesPending != 0) {
+        while ((pendingAddOp = pendingAddOps.peek()) != null
+               && blockAddCompletions.get() == 0) {
+            if (!pendingAddOp.completed) {
                 return;
             }
             pendingAddOps.remove();
@@ -660,6 +663,7 @@ public class LedgerHandle {
                       + bookieIndex);
         }
         final ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>();
+        blockAddCompletions.incrementAndGet();
         final long newEnsembleStartEntry = lastAddConfirmed + 1;
 
         // avoid parallel ensemble changes to same ensemble.
@@ -735,6 +739,8 @@ public class LedgerHandle {
                         handleUnrecoverableErrorDuringAdd(rc);
                         return;
                     }
+                    blockAddCompletions.decrementAndGet();
+
                     // the failed bookie has been replaced
                     unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
                 }
@@ -815,6 +821,7 @@ public class LedgerHandle {
                 }
             } else {
                 // the failed bookie has been replaced
+                blockAddCompletions.decrementAndGet();
                 unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
             }
             return true;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Wed Sep 12 09:59:50 2012
@@ -61,7 +61,8 @@ public class LedgerMetadata {
     private int metadataFormatVersion = 0;
 
     private int ensembleSize;
-    private int quorumSize;
+    private int writeQuorumSize;
+    private int ackQuorumSize;
     private long length;
     private long lastEntryId;
 
@@ -74,10 +75,11 @@ public class LedgerMetadata {
     private LedgerMetadataFormat.DigestType digestType;
     private byte[] password;
 
-    public LedgerMetadata(int ensembleSize, int quorumSize,
+    public LedgerMetadata(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
                           BookKeeper.DigestType digestType, byte[] password) {
         this.ensembleSize = ensembleSize;
-        this.quorumSize = quorumSize;
+        this.writeQuorumSize = writeQuorumSize;
+        this.ackQuorumSize = ackQuorumSize;
 
         /*
          * It is set in PendingReadOp.readEntryComplete, and
@@ -95,7 +97,7 @@ public class LedgerMetadata {
     }
 
     private LedgerMetadata() {
-        this(0, 0, BookKeeper.DigestType.MAC, new byte[] {});
+        this(0, 0, 0, BookKeeper.DigestType.MAC, new byte[] {});
         this.hasPassword = false;
     }
 
@@ -114,8 +116,12 @@ public class LedgerMetadata {
         return ensembleSize;
     }
 
-    public int getQuorumSize() {
-        return quorumSize;
+    public int getWriteQuorumSize() {
+        return writeQuorumSize;
+    }
+
+    public int getAckQuorumSize() {
+        return ackQuorumSize;
     }
 
     /**
@@ -217,7 +223,8 @@ public class LedgerMetadata {
             return serializeVersion1();
         }
         LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
-        builder.setQuorumSize(quorumSize).setEnsembleSize(ensembleSize).setLength(length)
+        builder.setQuorumSize(writeQuorumSize).setAckQuorumSize(ackQuorumSize)
+            .setEnsembleSize(ensembleSize).setLength(length)
             .setState(state).setLastEntryId(lastEntryId);
 
         if (hasPassword) {
@@ -245,7 +252,7 @@ public class LedgerMetadata {
     private byte[] serializeVersion1() {
         StringBuilder s = new StringBuilder();
         s.append(VERSION_KEY).append(tSplitter).append(metadataFormatVersion).append(lSplitter);
-        s.append(quorumSize).append(lSplitter).append(ensembleSize).append(lSplitter).append(length);
+        s.append(writeQuorumSize).append(lSplitter).append(ensembleSize).append(lSplitter).append(length);
 
         for (Map.Entry<Long, ArrayList<InetSocketAddress>> entry : ensembles.entrySet()) {
             s.append(lSplitter).append(entry.getKey());
@@ -321,7 +328,13 @@ public class LedgerMetadata {
         LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
         TextFormat.merge(reader, builder);
         LedgerMetadataFormat data = builder.build();
-        lc.quorumSize = data.getQuorumSize();
+        lc.writeQuorumSize = data.getQuorumSize();
+        if (data.hasAckQuorumSize()) {
+            lc.ackQuorumSize = data.getAckQuorumSize();
+        } else {
+            lc.ackQuorumSize = lc.writeQuorumSize;
+        }
+
         lc.ensembleSize = data.getEnsembleSize();
         lc.length = data.getLength();
         lc.state = data.getState();
@@ -346,7 +359,7 @@ public class LedgerMetadata {
     static LedgerMetadata parseVersion1Config(LedgerMetadata lc,
                                               BufferedReader reader) throws IOException {
         try {
-            lc.quorumSize = new Integer(reader.readLine());
+            lc.writeQuorumSize = lc.ackQuorumSize = new Integer(reader.readLine());
             lc.ensembleSize = new Integer(reader.readLine());
             lc.length = new Long(reader.readLine());
 
@@ -413,7 +426,8 @@ public class LedgerMetadata {
 
         if (metadataFormatVersion != newMeta.metadataFormatVersion ||
             ensembleSize != newMeta.ensembleSize ||
-            quorumSize != newMeta.quorumSize ||
+            writeQuorumSize != newMeta.writeQuorumSize ||
+            ackQuorumSize != newMeta.ackQuorumSize ||
             length != newMeta.length ||
             state != newMeta.state ||
             !digestType.equals(newMeta.digestType) ||

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java Wed Sep 12 09:59:50 2012
@@ -42,8 +42,10 @@ class PendingAddOp implements WriteCallb
     AddCallback cb;
     Object ctx;
     long entryId;
-    boolean[] successesSoFar;
-    int numResponsesPending;
+
+    DistributionSchedule.AckSet ackSet;
+    boolean completed = false;
+
     LedgerHandle lh;
     boolean isRecoveryAdd = false;
 
@@ -53,11 +55,10 @@ class PendingAddOp implements WriteCallb
         this.ctx = ctx;
         this.entryId = LedgerHandle.INVALID_ENTRY_ID;
         
-        successesSoFar = new boolean[lh.metadata.getQuorumSize()];
-        numResponsesPending = successesSoFar.length;
+        ackSet = lh.distributionSchedule.getAckSet();
     }
 
-    /** 
+    /**
      * Enable the recovery add flag for this operation.
      * @see LedgerHandle#asyncRecoveryAddEntry
      */
@@ -70,11 +71,11 @@ class PendingAddOp implements WriteCallb
         this.entryId = entryId;
     }
 
-    void sendWriteRequest(int bookieIndex, int arrayIndex) {
+    void sendWriteRequest(int bookieIndex) {
         int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : BookieProtocol.FLAG_NONE;
 
         lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey, entryId, toSend,
-                this, arrayIndex, flags);
+                this, bookieIndex, flags);
     }
 
     void unsetSuccessAndSendWriteRequest(int bookieIndex) {
@@ -85,15 +86,6 @@ class PendingAddOp implements WriteCallb
             return;
         }
 
-        int replicaIndex = lh.distributionSchedule.getReplicaIndex(entryId, bookieIndex);
-        if (replicaIndex < 0) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Leaving unchanged, ledger: " + lh.ledgerId + " entry: " + entryId + " bookie index: "
-                          + bookieIndex);
-            }
-            return;
-        }
-
         if (LOG.isDebugEnabled()) {
             LOG.debug("Unsetting success for ledger: " + lh.ledgerId + " entry: " + entryId + " bookie index: "
                       + bookieIndex);
@@ -102,27 +94,22 @@ class PendingAddOp implements WriteCallb
         // if we had already heard a success from this array index, need to
         // increment our number of responses that are pending, since we are
         // going to unset this success
-        if (successesSoFar[replicaIndex]) {
-            successesSoFar[replicaIndex] = false;
-            numResponsesPending++;
-        }
+        ackSet.removeBookie(bookieIndex);
+        completed = false;
 
-        sendWriteRequest(bookieIndex, replicaIndex);
+        sendWriteRequest(bookieIndex);
     }
 
     void initiate(ChannelBuffer toSend) {
         this.toSend = toSend;
-        for (int i = 0; i < successesSoFar.length; i++) {
-            int bookieIndex = lh.distributionSchedule.getBookieIndex(entryId, i);
-            sendWriteRequest(bookieIndex, i);
+        for (int bookieIndex : lh.distributionSchedule.getWriteSet(entryId)) {
+            sendWriteRequest(bookieIndex);
         }
     }
 
     @Override
     public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
-
-        Integer replicaIndex = (Integer) ctx;
-        int bookieIndex = lh.distributionSchedule.getBookieIndex(entryId, replicaIndex);
+        int bookieIndex = (Integer) ctx;
 
         if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
             // ensemble has already changed, failure of this addr is immaterial
@@ -148,14 +135,12 @@ class PendingAddOp implements WriteCallb
             return;
         }
 
-
-        if (!successesSoFar[replicaIndex]) {
-            successesSoFar[replicaIndex] = true;
-            numResponsesPending--;
+        if (ackSet.addBookieAndCheck(bookieIndex) && !completed) {
+            completed = true;
 
             // do some quick checks to see if some adds may have finished. All
             // this will be checked under locks again
-            if (numResponsesPending == 0 && lh.pendingAddOps.peek() == this) {
+            if (lh.pendingAddOps.peek() == this) {
                 lh.sendAddSuccessCallbacks();
             }
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Wed Sep 12 09:59:50 2012
@@ -94,7 +94,7 @@ class PendingReadOp implements Enumerati
     }
 
     void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, int lastErrorCode) {
-        if (entry.nextReplicaIndexToReadFrom >= lh.metadata.getQuorumSize()) {
+        if (entry.nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) {
             // we are done, the read has failed from all replicas, just fail the
             // read
             lh.opCounterSem.release();
@@ -102,7 +102,7 @@ class PendingReadOp implements Enumerati
             return;
         }
 
-        int bookieIndex = lh.distributionSchedule.getBookieIndex(entry.entryId, entry.nextReplicaIndexToReadFrom);
+        int bookieIndex = lh.distributionSchedule.getWriteSet(entry.entryId).get(entry.nextReplicaIndexToReadFrom);
         entry.nextReplicaIndexToReadFrom++;
         lh.bk.bookieClient.readEntry(ensemble.get(bookieIndex), lh.ledgerId, entry.entryId, 
                                      this, entry);
@@ -110,7 +110,7 @@ class PendingReadOp implements Enumerati
 
     void logErrorAndReattemptRead(LedgerEntry entry, String errMsg, int rc) {
         ArrayList<InetSocketAddress> ensemble = lh.metadata.getEnsemble(entry.entryId);
-        int bookeIndex = lh.distributionSchedule.getBookieIndex(entry.entryId, entry.nextReplicaIndexToReadFrom - 1);
+        int bookeIndex = lh.distributionSchedule.getWriteSet(entry.entryId).get(entry.nextReplicaIndexToReadFrom - 1);
         LOG.error(errMsg + " while reading entry: " + entry.entryId + " ledgerId: " + lh.ledgerId + " from bookie: "
                   + ensemble.get(bookeIndex));
         sendRead(ensemble, entry, rc);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java Wed Sep 12 09:59:50 2012
@@ -19,6 +19,10 @@ package org.apache.bookkeeper.client;
 
 import org.apache.bookkeeper.util.MathUtils;
 
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashSet;
+
 /**
  * A specific {@link DistributionSchedule} that places entries in round-robin
  * fashion. For ensemble size 3, and quorum size 2, Entry 0 goes to bookie 0 and
@@ -27,29 +31,39 @@ import org.apache.bookkeeper.util.MathUt
  *
  */
 class RoundRobinDistributionSchedule implements DistributionSchedule {
-    int quorumSize;
-    int ensembleSize;
+    private int writeQuorumSize;
+    private int ackQuorumSize;
+    private int ensembleSize;
 
 
-    public RoundRobinDistributionSchedule(int quorumSize, int ensembleSize) {
-        this.quorumSize = quorumSize;
+    public RoundRobinDistributionSchedule(int writeQuorumSize, int ackQuorumSize, int ensembleSize) {
+        this.writeQuorumSize = writeQuorumSize;
+        this.ackQuorumSize = ackQuorumSize;
         this.ensembleSize = ensembleSize;
     }
 
     @Override
-    public int getBookieIndex(long entryId, int replicaIndex) {
-        return (int) ((entryId + replicaIndex) % ensembleSize);
+    public List<Integer> getWriteSet(long entryId) {
+        List<Integer> set = new ArrayList<Integer>();
+        for (int i = 0; i < this.writeQuorumSize; i++) {
+            set.add((int)((entryId + i) % ensembleSize));
+        }
+        return set;
     }
 
     @Override
-    public int getReplicaIndex(long entryId, int bookieIndex) {
-        // NOTE: Java's % operator returns the sign of the dividend and is hence
-        // not always positive
-
-        int replicaIndex = MathUtils.signSafeMod(bookieIndex - entryId, ensembleSize);
-
-        return replicaIndex < quorumSize ? replicaIndex : -1;
+    public AckSet getAckSet() {
+        final HashSet<Integer> ackSet = new HashSet<Integer>();
+        return new AckSet() {
+            public boolean addBookieAndCheck(int bookieIndexHeardFrom) {
+                ackSet.add(bookieIndexHeardFrom);
+                return ackSet.size() >= ackQuorumSize;
+            }
 
+            public void removeBookie(int bookie) {
+                ackSet.remove(bookie);
+            }
+        };
     }
 
     private class RRQuorumCoverageSet implements QuorumCoverageSet {
@@ -68,7 +82,7 @@ class RoundRobinDistributionSchedule imp
                 return true;
             }
 
-            for (int i = 0; i < quorumSize; i++) {
+            for (int i = 0; i < ackQuorumSize; i++) {
                 int quorumStartIndex = MathUtils.signSafeMod(bookieIndexHeardFrom - i, ensembleSize);
                 if (!covered[quorumStartIndex]) {
                     covered[quorumStartIndex] = true;
@@ -83,12 +97,13 @@ class RoundRobinDistributionSchedule imp
         }
     }
 
+    @Override
     public QuorumCoverageSet getCoverageSet() {
         return new RRQuorumCoverageSet();
     }
     
     @Override
     public boolean hasEntry(long entryId, int bookieIndex) {
-        return getReplicaIndex(entryId, bookieIndex) != -1;
+        return getWriteSet(entryId).contains(bookieIndex);
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Wed Sep 12 09:59:50 2012
@@ -230,6 +230,8 @@ public class ClientConfiguration extends
      * seconds we wait without hearing a response from a bookie
      * before we consider it failed.
      *
+     * The default is 5 seconds.
+     *
      * @return the current read timeout in seconds
      */
     public int getReadTimeout() {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DataFormats.java Wed Sep 12 09:59:50 2012
@@ -48,6 +48,10 @@ public final class DataFormats {
     // optional bytes password = 8;
     boolean hasPassword();
     com.google.protobuf.ByteString getPassword();
+    
+    // optional int32 ackQuorumSize = 9;
+    boolean hasAckQuorumSize();
+    int getAckQuorumSize();
   }
   public static final class LedgerMetadataFormat extends
       com.google.protobuf.GeneratedMessage
@@ -767,6 +771,16 @@ public final class DataFormats {
       return password_;
     }
     
+    // optional int32 ackQuorumSize = 9;
+    public static final int ACKQUORUMSIZE_FIELD_NUMBER = 9;
+    private int ackQuorumSize_;
+    public boolean hasAckQuorumSize() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    public int getAckQuorumSize() {
+      return ackQuorumSize_;
+    }
+    
     private void initFields() {
       quorumSize_ = 0;
       ensembleSize_ = 0;
@@ -776,6 +790,7 @@ public final class DataFormats {
       segment_ = java.util.Collections.emptyList();
       digestType_ = org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType.CRC32;
       password_ = com.google.protobuf.ByteString.EMPTY;
+      ackQuorumSize_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -835,6 +850,9 @@ public final class DataFormats {
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeBytes(8, password_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeInt32(9, ackQuorumSize_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -876,6 +894,10 @@ public final class DataFormats {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(8, password_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(9, ackQuorumSize_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1021,6 +1043,8 @@ public final class DataFormats {
         bitField0_ = (bitField0_ & ~0x00000040);
         password_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000080);
+        ackQuorumSize_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000100);
         return this;
       }
       
@@ -1096,6 +1120,10 @@ public final class DataFormats {
           to_bitField0_ |= 0x00000040;
         }
         result.password_ = password_;
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.ackQuorumSize_ = ackQuorumSize_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1159,6 +1187,9 @@ public final class DataFormats {
         if (other.hasPassword()) {
           setPassword(other.getPassword());
         }
+        if (other.hasAckQuorumSize()) {
+          setAckQuorumSize(other.getAckQuorumSize());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1265,6 +1296,11 @@ public final class DataFormats {
               password_ = input.readBytes();
               break;
             }
+            case 72: {
+              bitField0_ |= 0x00000100;
+              ackQuorumSize_ = input.readInt32();
+              break;
+            }
           }
         }
       }
@@ -1613,6 +1649,27 @@ public final class DataFormats {
         return this;
       }
       
+      // optional int32 ackQuorumSize = 9;
+      private int ackQuorumSize_ ;
+      public boolean hasAckQuorumSize() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      public int getAckQuorumSize() {
+        return ackQuorumSize_;
+      }
+      public Builder setAckQuorumSize(int value) {
+        bitField0_ |= 0x00000100;
+        ackQuorumSize_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearAckQuorumSize() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        ackQuorumSize_ = 0;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:LedgerMetadataFormat)
     }
     
@@ -3176,24 +3233,24 @@ public final class DataFormats {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n src/main/proto/DataFormats.proto\"\233\003\n\024L" +
+      "\n src/main/proto/DataFormats.proto\"\262\003\n\024L" +
       "edgerMetadataFormat\022\022\n\nquorumSize\030\001 \002(\005\022" +
       "\024\n\014ensembleSize\030\002 \002(\005\022\016\n\006length\030\003 \002(\003\022\023\n" +
       "\013lastEntryId\030\004 \001(\003\0220\n\005state\030\005 \002(\0162\033.Ledg" +
       "erMetadataFormat.State:\004OPEN\022.\n\007segment\030" +
       "\006 \003(\0132\035.LedgerMetadataFormat.Segment\0224\n\n" +
       "digestType\030\007 \001(\0162 .LedgerMetadataFormat." +
-      "DigestType\022\020\n\010password\030\010 \001(\014\0327\n\007Segment\022" +
-      "\026\n\016ensembleMember\030\001 \003(\t\022\024\n\014firstEntryId\030" +
-      "\002 \002(\003\".\n\005State\022\010\n\004OPEN\020\001\022\017\n\013IN_RECOVERY\020",
-      "\002\022\n\n\006CLOSED\020\003\"!\n\nDigestType\022\t\n\005CRC32\020\001\022\010" +
-      "\n\004HMAC\020\002\"@\n\037LedgerRereplicationLayoutFor" +
-      "mat\022\014\n\004type\030\001 \002(\t\022\017\n\007version\030\002 \002(\005\".\n\033Un" +
-      "derreplicatedLedgerFormat\022\017\n\007replica\030\001 \003" +
-      "(\t\"^\n\014CookieFormat\022\022\n\nbookieHost\030\001 \002(\t\022\022" +
-      "\n\njournalDir\030\002 \002(\t\022\022\n\nledgerDirs\030\003 \002(\t\022\022" +
-      "\n\ninstanceId\030\004 \001(\tB\037\n\033org.apache.bookkee" +
-      "per.protoH\001"
+      "DigestType\022\020\n\010password\030\010 \001(\014\022\025\n\rackQuoru" +
+      "mSize\030\t \001(\005\0327\n\007Segment\022\026\n\016ensembleMember" +
+      "\030\001 \003(\t\022\024\n\014firstEntryId\030\002 \002(\003\".\n\005State\022\010\n",
+      "\004OPEN\020\001\022\017\n\013IN_RECOVERY\020\002\022\n\n\006CLOSED\020\003\"!\n\n" +
+      "DigestType\022\t\n\005CRC32\020\001\022\010\n\004HMAC\020\002\"@\n\037Ledge" +
+      "rRereplicationLayoutFormat\022\014\n\004type\030\001 \002(\t" +
+      "\022\017\n\007version\030\002 \002(\005\".\n\033UnderreplicatedLedg" +
+      "erFormat\022\017\n\007replica\030\001 \003(\t\"^\n\014CookieForma" +
+      "t\022\022\n\nbookieHost\030\001 \002(\t\022\022\n\njournalDir\030\002 \002(" +
+      "\t\022\022\n\nledgerDirs\030\003 \002(\t\022\022\n\ninstanceId\030\004 \001(" +
+      "\tB\037\n\033org.apache.bookkeeper.protoH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3205,7 +3262,7 @@ public final class DataFormats {
           internal_static_LedgerMetadataFormat_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_LedgerMetadataFormat_descriptor,
-              new java.lang.String[] { "QuorumSize", "EnsembleSize", "Length", "LastEntryId", "State", "Segment", "DigestType", "Password", },
+              new java.lang.String[] { "QuorumSize", "EnsembleSize", "Length", "LastEntryId", "State", "Segment", "DigestType", "Password", "AckQuorumSize", },
               org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.class,
               org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.Builder.class);
           internal_static_LedgerMetadataFormat_Segment_descriptor =

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/DataFormats.proto Wed Sep 12 09:59:50 2012
@@ -46,6 +46,8 @@ message LedgerMetadataFormat {
     }
     optional DigestType digestType = 7;
     optional bytes password = 8;
+
+    optional int32 ackQuorumSize = 9;
 }
 
 message LedgerRereplicationLayoutFormat {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Wed Sep 12 09:59:50 2012
@@ -464,7 +464,7 @@ public class BookieRecoveryTest extends 
         ranges.put(keyList.get(keyList.size()-1), untilEntry);
 
         for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : ensembles.entrySet()) {
-            int quorum = md.getQuorumSize();
+            int quorum = md.getAckQuorumSize();
             long startEntryId = e.getKey();
             long endEntryId = ranges.get(startEntryId);
             long expectedSuccess = quorum*(endEntryId-startEntryId);

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java?rev=1383872&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java Wed Sep 12 09:59:50 2012
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.util.List;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import junit.framework.TestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RoundRobinDistributionScheduleTest {
+    static Logger LOG = LoggerFactory.getLogger(RoundRobinDistributionScheduleTest.class);
+
+    @Test
+    public void testDistributionSchedule() throws Exception {
+        RoundRobinDistributionSchedule schedule = new RoundRobinDistributionSchedule(3, 2, 5);
+
+        List<Integer> wSet = schedule.getWriteSet(1);
+        assertEquals("Write set is wrong size", wSet.size(), 3);
+
+        DistributionSchedule.AckSet ackSet = schedule.getAckSet();
+        assertFalse("Shouldn't ack yet", ackSet.addBookieAndCheck(wSet.get(0)));
+        assertFalse("Shouldn't ack yet", ackSet.addBookieAndCheck(wSet.get(0)));
+        assertTrue("Should ack after 2 unique", ackSet.addBookieAndCheck(wSet.get(2)));
+        assertTrue("Should still be acking", ackSet.addBookieAndCheck(wSet.get(1)));
+
+        DistributionSchedule.QuorumCoverageSet covSet = schedule.getCoverageSet();
+        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
+        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(2));
+        assertTrue("Should cover now", covSet.addBookieAndCheckCovered(3));
+
+        covSet = schedule.getCoverageSet();
+        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
+        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(1));
+        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(2));
+        assertTrue("Should cover now", covSet.addBookieAndCheckCovered(3));
+
+        covSet = schedule.getCoverageSet();
+        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(4));
+        assertFalse("Shouldn't cover yet", covSet.addBookieAndCheckCovered(0));
+        assertTrue("Should cover now", covSet.addBookieAndCheckCovered(2));
+    }
+}
\ No newline at end of file

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java?rev=1383872&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/SlowBookieTest.java Wed Sep 12 09:59:50 2012
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.util.Set;
+import java.util.List;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.net.InetSocketAddress;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import junit.framework.TestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+
+public class SlowBookieTest extends BookKeeperClusterTestCase {
+    static Logger LOG = LoggerFactory.getLogger(SlowBookieTest.class);
+
+    public SlowBookieTest() {
+        super(4);
+    }
+
+    @Test
+    public void testSlowBookie() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setZkServers(zkUtil.getZooKeeperConnectString()).setReadTimeout(360);
+
+        BookKeeper bkc = new BookKeeper(conf);
+
+        LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, new byte[] {});
+
+        byte[] entry = "Test Entry".getBytes();
+        for (int i = 0; i < 10; i++) {
+            lh.addEntry(entry);
+        }
+        final CountDownLatch b0latch = new CountDownLatch(1);
+        final CountDownLatch b1latch = new CountDownLatch(1);
+        List<InetSocketAddress> curEns = lh.getLedgerMetadata().currentEnsemble;
+        try {
+            sleepBookie(curEns.get(0), b0latch);
+            for (int i = 0; i < 10; i++) {
+                lh.addEntry(entry);
+            }
+            sleepBookie(curEns.get(2), b1latch); // should cover all quorums
+
+            final AtomicInteger i = new AtomicInteger(0xdeadbeef);
+            AsyncCallback.AddCallback cb = new AsyncCallback.AddCallback() {
+                    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                        i.set(rc);
+                    }
+                };
+            lh.asyncAddEntry(entry, cb, null);
+
+            Thread.sleep(1000); // wait a second; the add must still be pending while the bookies sleep
+            assertEquals(i.get(), 0xdeadbeef);
+            b0latch.countDown();
+            b1latch.countDown();
+            Thread.sleep(2000);
+            assertEquals(i.get(), BKException.Code.OK);
+        } finally {
+            b0latch.countDown();
+            b1latch.countDown();
+        }
+    }
+
+    @Test
+    public void testBookieFailureWithSlowBookie() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setZkServers(zkUtil.getZooKeeperConnectString()).setReadTimeout(5);
+
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] pwd = new byte[] {};
+        final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, pwd);
+        long lid = lh.getId();
+        final AtomicBoolean finished = new AtomicBoolean(false);
+        final AtomicBoolean failTest = new AtomicBoolean(false);
+        final byte[] entry = "Test Entry".getBytes();
+        Thread t = new Thread() {
+                public void run() {
+                    try {
+                        while (!finished.get()) {
+                            lh.addEntry(entry);
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Exception in add entry thread", e);
+                        failTest.set(true);
+                    }
+                }
+            };
+        t.start();
+        final CountDownLatch b0latch = new CountDownLatch(1);
+        startNewBookie();
+        sleepBookie(getBookie(0), b0latch);
+        Thread.sleep(10000);
+        b0latch.countDown();
+        finished.set(true);
+        t.join();
+
+        assertFalse(failTest.get());
+
+        lh.close();
+
+        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
+        LedgerChecker lc = new LedgerChecker(bkc);
+        final CountDownLatch checklatch = new CountDownLatch(1);
+        final AtomicInteger numFragments = new AtomicInteger(-1);
+        lc.checkLedger(lh2, new GenericCallback<Set<LedgerFragment>>() {
+                public void operationComplete(int rc, Set<LedgerFragment> fragments) {
+                    LOG.debug("Checked ledgers returned {} {}", rc, fragments);
+                    if (rc == BKException.Code.OK) {
+                        numFragments.set(fragments.size());
+                    }
+                    checklatch.countDown();
+                }
+            });
+        checklatch.await();
+        assertEquals("There should be no missing fragments", 0, numFragments.get());
+    }
+
+    @Test
+    public void testManyBookieFailureWithSlowBookies() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setZkServers(zkUtil.getZooKeeperConnectString()).setReadTimeout(5);
+
+        BookKeeper bkc = new BookKeeper(conf);
+
+        byte[] pwd = new byte[] {};
+        final LedgerHandle lh = bkc.createLedger(4, 3, 1, BookKeeper.DigestType.CRC32, pwd);
+        long lid = lh.getId();
+        final AtomicBoolean finished = new AtomicBoolean(false);
+        final AtomicBoolean failTest = new AtomicBoolean(false);
+        final byte[] entry = "Test Entry".getBytes();
+        Thread t = new Thread() {
+                public void run() {
+                    try {
+                        while (!finished.get()) {
+                            lh.addEntry(entry);
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Exception in add entry thread", e);
+                        failTest.set(true);
+                    }
+                }
+            };
+        t.start();
+        final CountDownLatch b0latch = new CountDownLatch(1);
+        final CountDownLatch b1latch = new CountDownLatch(1);
+
+        startNewBookie();
+        startNewBookie();
+
+        sleepBookie(getBookie(0), b0latch);
+        sleepBookie(getBookie(1), b1latch);
+
+        Thread.sleep(10000);
+        b0latch.countDown();
+        b1latch.countDown();
+        finished.set(true);
+        t.join();
+
+        assertFalse(failTest.get());
+
+        lh.close();
+
+        LedgerHandle lh2 = bkc.openLedger(lh.getId(), BookKeeper.DigestType.CRC32, pwd);
+        LedgerChecker lc = new LedgerChecker(bkc);
+        final CountDownLatch checklatch = new CountDownLatch(1);
+        final AtomicInteger numFragments = new AtomicInteger(-1);
+        lc.checkLedger(lh2, new GenericCallback<Set<LedgerFragment>>() {
+                public void operationComplete(int rc, Set<LedgerFragment> fragments) {
+                    LOG.debug("Checked ledgers returned {} {}", rc, fragments);
+                    if (rc == BKException.Code.OK) {
+                        numFragments.set(fragments.size());
+                    }
+                    checklatch.countDown();
+                }
+            });
+        checklatch.await();
+        assertEquals("There should be no missing fragments", 0, numFragments.get());
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java Wed Sep 12 09:59:50 2012
@@ -329,7 +329,7 @@ public class TestLedgerChecker extends B
         ArrayList<InetSocketAddress> firstEnsemble = lh.getLedgerMetadata()
                 .getEnsembles().get(0L);
         InetSocketAddress lastBookieFromEnsemble = firstEnsemble.get(
-                lh.getDistributionSchedule().getBookieIndex(lh.getLastAddPushed(), 0));
+                lh.getDistributionSchedule().getWriteSet(lh.getLastAddPushed()).get(0));
         LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
                 + firstEnsemble);
         killBookie(lastBookieFromEnsemble);
@@ -338,7 +338,7 @@ public class TestLedgerChecker extends B
         lh.addEntry(TEST_LEDGER_ENTRY_DATA);
 
         lastBookieFromEnsemble = firstEnsemble.get(
-                lh.getDistributionSchedule().getBookieIndex(lh.getLastAddPushed(), 1));
+                lh.getDistributionSchedule().getWriteSet(lh.getLastAddPushed()).get(1));
         LOG.info("Killing " + lastBookieFromEnsemble + " from ensemble="
                 + firstEnsemble);
         killBookie(lastBookieFromEnsemble);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java Wed Sep 12 09:59:50 2012
@@ -77,8 +77,7 @@ public class TestReadTimeout extends Boo
         final InetSocketAddress bookieToSleep 
             = writelh.getLedgerMetadata().getEnsemble(numEntries).get(0);
         int sleeptime = baseClientConf.getReadTimeout()*3;
-        CountDownLatch latch = new CountDownLatch(1);
-        sleepBookie(bookieToSleep, sleeptime, latch);
+        CountDownLatch latch = sleepBookie(bookieToSleep, sleeptime);
         latch.await();
 
         writelh.asyncAddEntry(tmp.getBytes(), 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java Wed Sep 12 09:59:50 2012
@@ -56,7 +56,7 @@ public class GcLedgersTest extends Ledge
     private void createLedgers(int numLedgers, final Set<Long> createdLedgers) {
         final AtomicInteger expected = new AtomicInteger(numLedgers);
         for (int i=0; i<numLedgers; i++) {
-            getLedgerManager().createLedger(new LedgerMetadata(1, 1, DigestType.MAC, "".getBytes()),
+            getLedgerManager().createLedger(new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()),
                 new GenericCallback<Long>() {
                 @Override
                 public void operationComplete(int rc, Long ledgerId) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1383872&r1=1383871&r2=1383872&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Wed Sep 12 09:59:50 2012
@@ -165,6 +165,17 @@ public abstract class BookKeeperClusterT
     }
 
     /**
+     * Get the address of the bookie at the given index
+     */
+    public InetSocketAddress getBookie(int index) throws IllegalArgumentException {
+        if (bs.size() <= index || index < 0) {
+            throw new IllegalArgumentException("Invalid index, there are only " + bs.size()
+                                               + " bookies. Asked for " + index);
+        }
+        return bs.get(index).getLocalAddress();
+    }
+
+    /**
      * Kill a bookie by its socket address
      *
      * @param addr
@@ -216,14 +227,13 @@ public abstract class BookKeeperClusterT
      *          Socket Address
      * @param seconds
      *          Sleep seconds
-     * @param l
-     *          Count Down Latch
+     * @return Count Down latch which will be counted down when sleep finishes
      * @throws InterruptedException
      * @throws IOException
      */
-    public void sleepBookie(InetSocketAddress addr, final int seconds,
-                            final CountDownLatch l)
+    public CountDownLatch sleepBookie(InetSocketAddress addr, final int seconds)
             throws InterruptedException, IOException {
+        final CountDownLatch l = new CountDownLatch(1);
         final String name = "BookieJournal-" + addr.getPort();
         Thread[] allthreads = new Thread[Thread.activeCount()];
         Thread.enumerate(allthreads);
@@ -243,6 +253,41 @@ public abstract class BookKeeperClusterT
                     }
                 };
                 sleeper.start();
+                return l;
+            }
+        }
+        throw new IOException("Bookie thread not found");
+    }
+
+    /**
+     * Sleep a bookie until the given latch is counted down
+     *
+     * @param addr
+     *          Socket Address
+     * @param l
+     *          Latch to wait on
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    public void sleepBookie(InetSocketAddress addr, final CountDownLatch l)
+            throws InterruptedException, IOException {
+        final String name = "BookieJournal-" + addr.getPort();
+        Thread[] allthreads = new Thread[Thread.activeCount()];
+        Thread.enumerate(allthreads);
+        for (final Thread t : allthreads) {
+            if (t.getName().equals(name)) {
+                Thread sleeper = new Thread() {
+                    public void run() {
+                        try {
+                            t.suspend();
+                            l.await();
+                            t.resume();
+                        } catch (Exception e) {
+                            LOG.error("Error suspending thread", e);
+                        }
+                    }
+                };
+                sleeper.start();
                 return;
             }
         }