You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/06/21 17:50:27 UTC

bookkeeper git commit: BOOKKEEPER-1089: Ledger Recovery (part-4) - allow batch reads in ledger recovery

Repository: bookkeeper
Updated Branches:
  refs/heads/master 363c5d00b -> 90a8f2839


BOOKKEEPER-1089: Ledger Recovery (part-4) - allow batch reads in ledger recovery

This change is based on #178 (to review only this change, see git sha 82f73ef).

BookKeeper recovery improvement (part 4): allow batch reads in ledger recovery

    - enable batch reads in ledger recovery, so that entries can be read in parallel to improve recovery time.

    RB_ID=266145

Author: Sijie Guo <si...@twitter.com>
Author: Sijie Guo <si...@apache.org>

Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Matteo Merli <mm...@apache.org>

This closes #181 from sijie/recovery_improvements_part4


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/90a8f283
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/90a8f283
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/90a8f283

Branch: refs/heads/master
Commit: 90a8f283999ddbfee818629e1be17102e63be22c
Parents: 363c5d0
Author: Sijie Guo <si...@twitter.com>
Authored: Wed Jun 21 10:50:23 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Wed Jun 21 10:50:23 2017 -0700

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BookKeeper.java    |   7 +-
 .../client/BookKeeperClientStats.java           |   8 ++
 .../apache/bookkeeper/client/LedgerHandle.java  |  36 +++--
 .../bookkeeper/client/LedgerRecoveryOp.java     | 132 +++++++++++++------
 .../apache/bookkeeper/client/PendingReadOp.java |   2 +-
 .../bookkeeper/conf/ClientConfiguration.java    |  22 ++++
 .../bookkeeper/client/LedgerRecoveryTest.java   |  91 ++++++++++++-
 7 files changed, 234 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 7a76dd4..acc9f0a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -102,7 +102,8 @@ public class BookKeeper implements AutoCloseable {
     private OpStatsLogger addOpLogger;
     private OpStatsLogger writeLacOpLogger;
     private OpStatsLogger readLacOpLogger;
-
+    private OpStatsLogger recoverAddEntriesStats;
+    private OpStatsLogger recoverReadEntriesStats;
 
     // whether the event loop group is one we created, or is owned by whoever
     // instantiated us
@@ -1246,6 +1247,8 @@ public class BookKeeper implements AutoCloseable {
         addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
         writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
         readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
+        recoverAddEntriesStats = stats.getOpStatsLogger(BookKeeperClientStats.LEDGER_RECOVER_ADD_ENTRIES);
+        recoverReadEntriesStats = stats.getOpStatsLogger(BookKeeperClientStats.LEDGER_RECOVER_READ_ENTRIES);
     }
 
     OpStatsLogger getCreateOpLogger() { return createOpLogger; }
@@ -1255,6 +1258,8 @@ public class BookKeeper implements AutoCloseable {
     OpStatsLogger getAddOpLogger() { return addOpLogger; }
     OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; }
     OpStatsLogger getReadLacOpLogger() { return readLacOpLogger; }
+    OpStatsLogger getRecoverAddCountLogger() { return recoverAddEntriesStats; }
+    OpStatsLogger getRecoverReadCountLogger() { return recoverReadEntriesStats; }
 
     static EventLoopGroup getDefaultEventLoopGroup() {
         ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("bookkeeper-io-%s").build();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index dc193a7..6166c95 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -23,9 +23,17 @@ package org.apache.bookkeeper.client;
 
 public interface BookKeeperClientStats {
     public final static String CLIENT_SCOPE = "bookkeeper_client";
+
+    // Metadata Operations
+
     public final static String CREATE_OP = "LEDGER_CREATE";
     public final static String DELETE_OP = "LEDGER_DELETE";
     public final static String OPEN_OP = "LEDGER_OPEN";
+    public final static String LEDGER_RECOVER_READ_ENTRIES = "LEDGER_RECOVER_READ_ENTRIES";
+    public final static String LEDGER_RECOVER_ADD_ENTRIES = "LEDGER_RECOVER_ADD_ENTRIES";
+
+    // Data Operations
+
     public final static String ADD_OP = "ADD_ENTRY";
     public final static String READ_OP = "READ_ENTRY";
     public final static String WRITE_LAC_OP = "WRITE_LAC";

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 7fa8c61..7a9d81b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -21,9 +21,11 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Charsets.UTF_8;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.security.GeneralSecurityException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -39,7 +41,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -57,9 +58,6 @@ import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.RateLimiter;
-
 /**
  * Ledger handle contains ledger metadata and is used to access the read and
  * write operations to a ledger.
@@ -594,12 +592,8 @@ public class LedgerHandle implements AutoCloseable {
     }
 
     void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) {
-        try {
-            new PendingReadOp(this, bk.scheduler,
-                              firstEntry, lastEntry, cb, ctx).initiate();
-        } catch (InterruptedException e) {
-            cb.readComplete(BKException.Code.InterruptedException, this, null, ctx);
-        }
+        new PendingReadOp(this, bk.scheduler,
+                          firstEntry, lastEntry, cb, ctx).initiate();
     }
 
     /**
@@ -1478,7 +1472,9 @@ public class LedgerHandle implements AutoCloseable {
             // if metadata is already in recover, dont try to write again,
             // just do the recovery from the starting point
             new LedgerRecoveryOp(LedgerHandle.this, cb)
-                    .parallelRead(bk.getConf().getEnableParallelRecoveryRead()).initiate();
+                    .parallelRead(bk.getConf().getEnableParallelRecoveryRead())
+                    .readBatchSize(bk.getConf().getRecoveryReadBatchSize())
+                    .initiate();
             return;
         }
 
@@ -1505,7 +1501,9 @@ public class LedgerHandle implements AutoCloseable {
                     });
                 } else if (rc == BKException.Code.OK) {
                     new LedgerRecoveryOp(LedgerHandle.this, cb)
-                            .parallelRead(bk.getConf().getEnableParallelRecoveryRead()).initiate();
+                            .parallelRead(bk.getConf().getEnableParallelRecoveryRead())
+                            .readBatchSize(bk.getConf().getRecoveryReadBatchSize())
+                            .initiate();
                 } else {
                     LOG.error("Error writing ledger config " + rc + " of ledger " + ledgerId);
                     cb.operationComplete(rc, null);
@@ -1538,10 +1536,8 @@ public class LedgerHandle implements AutoCloseable {
          *
          * @param rc
          *          return code
-         * @param leder
+         * @param lh
          *          ledger identifier
-         * @param entry
-         *          entry identifier
          * @param ctx
          *          control object
          */
@@ -1563,8 +1559,8 @@ public class LedgerHandle implements AutoCloseable {
          *
          * @param rc
          *          return code
-         * @param leder
-         *          ledger identifier
+         * @param lh
+         *          ledger handle
          * @param seq
          *          sequence of entries
          * @param ctx
@@ -1585,8 +1581,8 @@ public class LedgerHandle implements AutoCloseable {
          *
          * @param rc
          *          return code
-         * @param leder
-         *          ledger identifier
+         * @param lh
+         *          ledger handle
          * @param entry
          *          entry identifier
          * @param ctx

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 666dbe8..cc19dc9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,18 +15,18 @@ package org.apache.bookkeeper.client;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.client;
 
-import java.util.Enumeration;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.DigestManager.RecoveryData;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.zookeeper.KeeperException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,23 +38,31 @@ import org.slf4j.LoggerFactory;
  * the ledger at that entry.
  *
  */
-class LedgerRecoveryOp implements ReadCallback, AddCallback {
+class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
+
     static final Logger LOG = LoggerFactory.getLogger(LedgerRecoveryOp.class);
-    LedgerHandle lh;
-    AtomicLong readCount, writeCount;
-    AtomicBoolean readDone;
-    AtomicBoolean callbackDone;
-    long entryToRead;
+
+    final LedgerHandle lh;
+    final AtomicLong readCount, writeCount;
+    final AtomicBoolean readDone;
+    final AtomicBoolean callbackDone;
+    volatile long startEntryToRead;
+    volatile long endEntryToRead;
+    final GenericCallback<Void> cb;
     // keep a copy of metadata for recovery.
     LedgerMetadata metadataForRecovery;
     boolean parallelRead = false;
+    int readBatchSize = 1;
 
-    GenericCallback<Void> cb;
+    // EntryListener Hook
+    @VisibleForTesting
+    ReadEntryListener entryListener = null;
 
-    class RecoveryReadOp extends PendingReadOp {
+    class RecoveryReadOp extends ListenerBasedPendingReadOp {
 
-        RecoveryReadOp(LedgerHandle lh, ScheduledExecutorService scheduler, long startEntryId,
-                long endEntryId, ReadCallback cb, Object ctx) {
+        RecoveryReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
+                       long startEntryId, long endEntryId,
+                       ReadEntryListener cb, Object ctx) {
             super(lh, scheduler, startEntryId, endEntryId, cb, ctx);
         }
 
@@ -81,6 +87,24 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
         return this;
     }
 
+    LedgerRecoveryOp readBatchSize(int batchSize) {
+        this.readBatchSize = batchSize;
+        return this;
+    }
+
+    /**
+     * Set an entry listener to listen on individual recovery reads during recovery procedure.
+     *
+     * @param entryListener
+     *          entry listener
+     * @return ledger recovery operation
+     */
+    @VisibleForTesting
+    LedgerRecoveryOp setEntryListener(ReadEntryListener entryListener) {
+        this.entryListener = entryListener;
+        return this;
+    }
+
     public void initiate() {
         ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh,
                 new ReadLastConfirmedOp.LastConfirmedDataCallback() {
@@ -88,15 +112,15 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
                         if (rc == BKException.Code.OK) {
                             lh.lastAddPushed = lh.lastAddConfirmed = data.lastAddConfirmed;
                             lh.length = data.length;
-                            entryToRead = lh.lastAddConfirmed;
+                            startEntryToRead = endEntryToRead = lh.lastAddConfirmed;
                             // keep a copy of ledger metadata before proceeding
                             // ledger recovery
                             metadataForRecovery = new LedgerMetadata(lh.getLedgerMetadata());
                             doRecoveryRead();
                         } else if (rc == BKException.Code.UnauthorizedAccessException) {
-                            cb.operationComplete(rc, null);
+                            submitCallback(rc);
                         } else {
-                            cb.operationComplete(BKException.Code.ReadException, null);
+                            submitCallback(BKException.Code.ReadException);
                         }
                     }
                 });
@@ -109,17 +133,26 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
         rlcop.initiateWithFencing();
     }
 
+    private void submitCallback(int rc) {
+        if (BKException.Code.OK == rc) {
+            lh.bk.getRecoverAddCountLogger().registerSuccessfulValue(writeCount.get());
+            lh.bk.getRecoverReadCountLogger().registerSuccessfulValue(readCount.get());
+        } else {
+            lh.bk.getRecoverAddCountLogger().registerFailedValue(writeCount.get());
+            lh.bk.getRecoverReadCountLogger().registerFailedValue(readCount.get());
+        }
+        cb.operationComplete(rc, null);
+    }
+
     /**
      * Try to read past the last confirmed.
      */
     private void doRecoveryRead() {
         if (!callbackDone.get()) {
-            entryToRead++;
-            try {
-                new RecoveryReadOp(lh, lh.bk.scheduler, entryToRead, entryToRead, this, null).parallelRead(parallelRead).initiate();
-            } catch (InterruptedException e) {
-                readComplete(BKException.Code.InterruptedException, lh, null, null);
-            }
+            startEntryToRead = endEntryToRead + 1;
+            endEntryToRead = endEntryToRead + readBatchSize;
+            new RecoveryReadOp(lh, lh.bk.scheduler, startEntryToRead, endEntryToRead, this, null)
+                    .parallelRead(parallelRead).initiate();
         }
     }
 
@@ -131,9 +164,9 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
                     if (rc != BKException.Code.OK) {
                         LOG.warn("Close ledger {} failed during recovery: ",
                             LedgerRecoveryOp.this.lh.getId(), BKException.getMessage(rc));
-                        cb.operationComplete(rc, null);
+                        submitCallback(rc);
                     } else {
-                        cb.operationComplete(BKException.Code.OK, null);
+                        submitCallback(BKException.Code.OK);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("After closing length is: {}", lh.getLength());
                         }
@@ -144,10 +177,16 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
     }
 
     @Override
-    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
-        if (rc == BKException.Code.OK) {
+    public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
+        // notify entry listener on individual entries being read during ledger recovery.
+        ReadEntryListener listener = entryListener;
+        if (null != listener) {
+            listener.onEntryComplete(rc, lh, entry, ctx);
+        }
+
+        // we only trigger recovery add an entry when readDone == false && callbackDone == false
+        if (!callbackDone.get() && !readDone.get() && rc == BKException.Code.OK) {
             readCount.incrementAndGet();
-            LedgerEntry entry = seq.nextElement();
             byte[] data = entry.getEntry();
 
             /*
@@ -157,12 +196,24 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
              */
             synchronized (lh) {
                 lh.length = entry.getLength() - (long) data.length;
+                // check whether entry id is expected, so we won't overwritten any entries by mistake
+                if (entry.getEntryId() != lh.lastAddPushed + 1) {
+                    LOG.error("Unexpected to recovery add entry {} as entry {} for ledger {}.",
+                              new Object[] { entry.getEntryId(), (lh.lastAddPushed + 1), lh.getId() });
+                    rc = BKException.Code.UnexpectedConditionException;
+                }
+            }
+            if (BKException.Code.OK == rc) {
+                lh.asyncRecoveryAddEntry(data, 0, data.length, this, null);
+                if (entry.getEntryId() == endEntryToRead) {
+                    // trigger next batch read
+                    doRecoveryRead();
+                }
+                return;
             }
-            lh.asyncRecoveryAddEntry(data, 0, data.length, this, null);
-            doRecoveryRead();
-            return;
         }
 
+        // no entry found. stop recovery procedure but wait until recovery add finished.
         if (rc == BKException.Code.NoSuchEntryException || rc == BKException.Code.NoSuchLedgerExistsException) {
             readDone.set(true);
             if (readCount.get() == writeCount.get()) {
@@ -172,20 +223,27 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
         }
 
         // otherwise, some other error, we can't handle
-        LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + entryToRead
-                  + " ledger: " + lh.ledgerId + " while recovering ledger");
-        cb.operationComplete(rc, null);
+        if (BKException.Code.OK != rc && callbackDone.compareAndSet(false, true)) {
+            LOG.error("Failure {} while reading entries: ({} - {}), ledger: {} while recovering ledger",
+                      new Object[] { BKException.getMessage(rc), startEntryToRead, endEntryToRead, lh.getId() });
+            submitCallback(rc);
+        } else if (BKException.Code.OK == rc) {
+            // we are here is because we successfully read an entry but readDone was already set to true.
+            // this would happen on recovery a ledger than has gaps in the tail.
+            LOG.warn("Successfully read entry {} for ledger {}, but readDone is already {}",
+                     new Object[] { entry.getEntryId(), lh.getId(), readDone.get() });
+        }
         return;
     }
 
     @Override
     public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
         if (rc != BKException.Code.OK) {
-            LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + (lh.lastAddConfirmed + 1)
+            LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + (entryId + 1)
                       + " ledger: " + lh.ledgerId + " while recovering ledger");
             if (callbackDone.compareAndSet(false, true)) {
                 // Give up, we can't recover from this error
-                cb.operationComplete(rc, null);
+                submitCallback(rc);
             }
             return;
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index a13eb21..0192296 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -428,7 +428,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         return this;
     }
 
-    public void initiate() throws InterruptedException {
+    public void initiate() {
         long nextEnsembleChange = startEntryId, i = startEntryId;
         this.requestTimeNanos = MathUtils.nowInNano();
         ArrayList<BookieSocketAddress> ensemble = null;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 5d4bf03..311fb82 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -63,6 +63,7 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String READ_TIMEOUT = "readTimeout";
     protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
     protected final static String ENABLE_PARALLEL_RECOVERY_READ = "enableParallelRecoveryRead";
+    protected final static String RECOVERY_READ_BATCH_SIZE = "recoveryReadBatchSize";
     // Timeout Setting
     protected final static String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
     protected final static String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec";
@@ -815,6 +816,27 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get Recovery Read Batch Size.
+     *
+     * @return recovery read batch size.
+     */
+    public int getRecoveryReadBatchSize() {
+        return getInt(RECOVERY_READ_BATCH_SIZE, 1);
+    }
+
+    /**
+     * Set Recovery Read Batch Size.
+     *
+     * @param batchSize
+     *          recovery read batch size.
+     * @return client configuration.
+     */
+    public ClientConfiguration setRecoveryReadBatchSize(int batchSize) {
+        setProperty(RECOVERY_READ_BATCH_SIZE, batchSize);
+        return this;
+    }
+
+    /**
      * Get Ensemble Placement Policy Class.
      *
      * @return ensemble placement policy class.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index 5f7c56f..2b39eaf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,21 +18,26 @@ package org.apache.bookkeeper.client;
  * under the License.
  *
  */
+package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.Enumeration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.test.BaseTestCase;
 import org.junit.Test;
@@ -45,9 +48,7 @@ import static org.junit.Assert.*;
 
 /**
  * This unit test tests ledger recovery.
- *
  */
-
 public class LedgerRecoveryTest extends BaseTestCase {
     private final static Logger LOG = LoggerFactory.getLogger(LedgerRecoveryTest.class);
 
@@ -421,4 +422,84 @@ public class LedgerRecoveryTest extends BaseTestCase {
         bsConfs.add(conf);
         bs.add(startBookie(conf, rBookie));
     }
+
+    @Test(timeout = 60000)
+    public void testBatchRecoverySize3() throws Exception {
+        batchRecovery(3);
+    }
+
+    @Test(timeout = 60000)
+    public void testBatchRecoverySize13() throws Exception {
+        batchRecovery(13);
+    }
+
+    private void batchRecovery(int batchSize) throws Exception {
+        ClientConfiguration newConf = new ClientConfiguration()
+            .setReadEntryTimeout(60000)
+            .setAddEntryTimeout(60000)
+            .setRecoveryReadBatchSize(batchSize);
+
+        newConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper newBk = new BookKeeper(newConf);
+
+        LedgerHandle lh = newBk.createLedger(numBookies, 2, 2, digestType, "".getBytes());
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+        sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(0), latch1);
+        sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(1), latch2);
+
+        int numEntries = (numBookies * 3) + 1;
+        final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
+        final CountDownLatch addDone = new CountDownLatch(1);
+        for (int i = 0; i < numEntries; i++) {
+            lh.asyncAddEntry(("" + i).getBytes(), new AddCallback() {
+                @Override
+                public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                    if (BKException.Code.OK != rc) {
+                        addDone.countDown();
+                        return;
+                    }
+                    if (numPendingAdds.decrementAndGet() == 0) {
+                        addDone.countDown();
+                    }
+                }
+            }, null);
+        }
+        latch1.countDown();
+        latch2.countDown();
+        addDone.await(10, TimeUnit.SECONDS);
+        assertEquals(0, numPendingAdds.get());
+
+        LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
+        assertEquals(BookieProtocol.INVALID_ENTRY_ID, recoverLh.getLastAddConfirmed());
+
+        final CountDownLatch recoverLatch = new CountDownLatch(1);
+        final AtomicBoolean success = new AtomicBoolean(false);
+        LedgerRecoveryOp recoveryOp = new LedgerRecoveryOp(recoverLh, new BookkeeperInternalCallbacks.GenericCallback<Void>() {
+            @Override
+            public void operationComplete(int rc, Void result) {
+                success.set(BKException.Code.OK == rc);
+                recoverLatch.countDown();
+            }
+        }).parallelRead(true).readBatchSize(newConf.getRecoveryReadBatchSize());
+        recoveryOp.initiate();
+        recoverLatch.await(10, TimeUnit.SECONDS);
+        assertTrue(success.get());
+        assertEquals(numEntries, recoveryOp.readCount.get());
+        assertEquals(numEntries, recoveryOp.writeCount.get());
+
+        Enumeration<LedgerEntry> enumeration = recoverLh.readEntries(0, numEntries - 1);
+
+        int numReads = 0;
+        while (enumeration.hasMoreElements()) {
+            LedgerEntry entry = enumeration.nextElement();
+            assertEquals((long) numReads, entry.getEntryId());
+            assertEquals(numReads, Integer.parseInt(new String(entry.getEntry())));
+            ++numReads;
+        }
+        assertEquals(numEntries, numReads);
+
+        newBk.close();
+    }
 }