You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/06/21 17:50:27 UTC
bookkeeper git commit: BOOKKEEPER-1089: Ledger Recovery (part-4) -
allow batch reads in ledger recovery
Repository: bookkeeper
Updated Branches:
refs/heads/master 363c5d00b -> 90a8f2839
BOOKKEEPER-1089: Ledger Recovery (part-4) - allow batch reads in ledger recovery
This change is based on #178 - (you can review git sha 82f73ef)
bookkeeper recovery improvement (part-4): allow batch reading in ledger recovery
- enable batch read in ledger recovery, so we could parallel reading to improve recovery time.
RB_ID=266145
Author: Sijie Guo <si...@twitter.com>
Author: Sijie Guo <si...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Matteo Merli <mm...@apache.org>
This closes #181 from sijie/recovery_improvements_part4
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/90a8f283
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/90a8f283
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/90a8f283
Branch: refs/heads/master
Commit: 90a8f283999ddbfee818629e1be17102e63be22c
Parents: 363c5d0
Author: Sijie Guo <si...@twitter.com>
Authored: Wed Jun 21 10:50:23 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Wed Jun 21 10:50:23 2017 -0700
----------------------------------------------------------------------
.../apache/bookkeeper/client/BookKeeper.java | 7 +-
.../client/BookKeeperClientStats.java | 8 ++
.../apache/bookkeeper/client/LedgerHandle.java | 36 +++--
.../bookkeeper/client/LedgerRecoveryOp.java | 132 +++++++++++++------
.../apache/bookkeeper/client/PendingReadOp.java | 2 +-
.../bookkeeper/conf/ClientConfiguration.java | 22 ++++
.../bookkeeper/client/LedgerRecoveryTest.java | 91 ++++++++++++-
7 files changed, 234 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 7a76dd4..acc9f0a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -102,7 +102,8 @@ public class BookKeeper implements AutoCloseable {
private OpStatsLogger addOpLogger;
private OpStatsLogger writeLacOpLogger;
private OpStatsLogger readLacOpLogger;
-
+ private OpStatsLogger recoverAddEntriesStats;
+ private OpStatsLogger recoverReadEntriesStats;
// whether the event loop group is one we created, or is owned by whoever
// instantiated us
@@ -1246,6 +1247,8 @@ public class BookKeeper implements AutoCloseable {
addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
+ recoverAddEntriesStats = stats.getOpStatsLogger(BookKeeperClientStats.LEDGER_RECOVER_ADD_ENTRIES);
+ recoverReadEntriesStats = stats.getOpStatsLogger(BookKeeperClientStats.LEDGER_RECOVER_READ_ENTRIES);
}
OpStatsLogger getCreateOpLogger() { return createOpLogger; }
@@ -1255,6 +1258,8 @@ public class BookKeeper implements AutoCloseable {
OpStatsLogger getAddOpLogger() { return addOpLogger; }
OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; }
OpStatsLogger getReadLacOpLogger() { return readLacOpLogger; }
+ OpStatsLogger getRecoverAddCountLogger() { return recoverAddEntriesStats; }
+ OpStatsLogger getRecoverReadCountLogger() { return recoverReadEntriesStats; }
static EventLoopGroup getDefaultEventLoopGroup() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("bookkeeper-io-%s").build();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index dc193a7..6166c95 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -23,9 +23,17 @@ package org.apache.bookkeeper.client;
public interface BookKeeperClientStats {
public final static String CLIENT_SCOPE = "bookkeeper_client";
+
+ // Metadata Operations
+
public final static String CREATE_OP = "LEDGER_CREATE";
public final static String DELETE_OP = "LEDGER_DELETE";
public final static String OPEN_OP = "LEDGER_OPEN";
+ public final static String LEDGER_RECOVER_READ_ENTRIES = "LEDGER_RECOVER_READ_ENTRIES";
+ public final static String LEDGER_RECOVER_ADD_ENTRIES = "LEDGER_RECOVER_ADD_ENTRIES";
+
+ // Data Operations
+
public final static String ADD_OP = "ADD_ENTRY";
public final static String READ_OP = "READ_ENTRY";
public final static String WRITE_LAC_OP = "WRITE_LAC";
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 7fa8c61..7a9d81b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -21,9 +21,11 @@
package org.apache.bookkeeper.client;
import static com.google.common.base.Charsets.UTF_8;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@@ -39,7 +41,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -57,9 +58,6 @@ import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.RateLimiter;
-
/**
* Ledger handle contains ledger metadata and is used to access the read and
* write operations to a ledger.
@@ -594,12 +592,8 @@ public class LedgerHandle implements AutoCloseable {
}
void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) {
- try {
- new PendingReadOp(this, bk.scheduler,
- firstEntry, lastEntry, cb, ctx).initiate();
- } catch (InterruptedException e) {
- cb.readComplete(BKException.Code.InterruptedException, this, null, ctx);
- }
+ new PendingReadOp(this, bk.scheduler,
+ firstEntry, lastEntry, cb, ctx).initiate();
}
/**
@@ -1478,7 +1472,9 @@ public class LedgerHandle implements AutoCloseable {
// if metadata is already in recover, dont try to write again,
// just do the recovery from the starting point
new LedgerRecoveryOp(LedgerHandle.this, cb)
- .parallelRead(bk.getConf().getEnableParallelRecoveryRead()).initiate();
+ .parallelRead(bk.getConf().getEnableParallelRecoveryRead())
+ .readBatchSize(bk.getConf().getRecoveryReadBatchSize())
+ .initiate();
return;
}
@@ -1505,7 +1501,9 @@ public class LedgerHandle implements AutoCloseable {
});
} else if (rc == BKException.Code.OK) {
new LedgerRecoveryOp(LedgerHandle.this, cb)
- .parallelRead(bk.getConf().getEnableParallelRecoveryRead()).initiate();
+ .parallelRead(bk.getConf().getEnableParallelRecoveryRead())
+ .readBatchSize(bk.getConf().getRecoveryReadBatchSize())
+ .initiate();
} else {
LOG.error("Error writing ledger config " + rc + " of ledger " + ledgerId);
cb.operationComplete(rc, null);
@@ -1538,10 +1536,8 @@ public class LedgerHandle implements AutoCloseable {
*
* @param rc
* return code
- * @param leder
+ * @param lh
* ledger identifier
- * @param entry
- * entry identifier
* @param ctx
* control object
*/
@@ -1563,8 +1559,8 @@ public class LedgerHandle implements AutoCloseable {
*
* @param rc
* return code
- * @param leder
- * ledger identifier
+ * @param lh
+ * ledger handle
* @param seq
* sequence of entries
* @param ctx
@@ -1585,8 +1581,8 @@ public class LedgerHandle implements AutoCloseable {
*
* @param rc
* return code
- * @param leder
- * ledger identifier
+ * @param lh
+ * ledger handle
* @param entry
* entry identifier
* @param ctx
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
index 666dbe8..cc19dc9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,18 +15,18 @@ package org.apache.bookkeeper.client;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.bookkeeper.client;
-import java.util.Enumeration;
+import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.DigestManager.RecoveryData;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.zookeeper.KeeperException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,23 +38,31 @@ import org.slf4j.LoggerFactory;
* the ledger at that entry.
*
*/
-class LedgerRecoveryOp implements ReadCallback, AddCallback {
+class LedgerRecoveryOp implements ReadEntryListener, AddCallback {
+
static final Logger LOG = LoggerFactory.getLogger(LedgerRecoveryOp.class);
- LedgerHandle lh;
- AtomicLong readCount, writeCount;
- AtomicBoolean readDone;
- AtomicBoolean callbackDone;
- long entryToRead;
+
+ final LedgerHandle lh;
+ final AtomicLong readCount, writeCount;
+ final AtomicBoolean readDone;
+ final AtomicBoolean callbackDone;
+ volatile long startEntryToRead;
+ volatile long endEntryToRead;
+ final GenericCallback<Void> cb;
// keep a copy of metadata for recovery.
LedgerMetadata metadataForRecovery;
boolean parallelRead = false;
+ int readBatchSize = 1;
- GenericCallback<Void> cb;
+ // EntryListener Hook
+ @VisibleForTesting
+ ReadEntryListener entryListener = null;
- class RecoveryReadOp extends PendingReadOp {
+ class RecoveryReadOp extends ListenerBasedPendingReadOp {
- RecoveryReadOp(LedgerHandle lh, ScheduledExecutorService scheduler, long startEntryId,
- long endEntryId, ReadCallback cb, Object ctx) {
+ RecoveryReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
+ long startEntryId, long endEntryId,
+ ReadEntryListener cb, Object ctx) {
super(lh, scheduler, startEntryId, endEntryId, cb, ctx);
}
@@ -81,6 +87,24 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
return this;
}
+ LedgerRecoveryOp readBatchSize(int batchSize) {
+ this.readBatchSize = batchSize;
+ return this;
+ }
+
+ /**
+ * Set an entry listener to listen on individual recovery reads during recovery procedure.
+ *
+ * @param entryListener
+ * entry listener
+ * @return ledger recovery operation
+ */
+ @VisibleForTesting
+ LedgerRecoveryOp setEntryListener(ReadEntryListener entryListener) {
+ this.entryListener = entryListener;
+ return this;
+ }
+
public void initiate() {
ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh,
new ReadLastConfirmedOp.LastConfirmedDataCallback() {
@@ -88,15 +112,15 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
if (rc == BKException.Code.OK) {
lh.lastAddPushed = lh.lastAddConfirmed = data.lastAddConfirmed;
lh.length = data.length;
- entryToRead = lh.lastAddConfirmed;
+ startEntryToRead = endEntryToRead = lh.lastAddConfirmed;
// keep a copy of ledger metadata before proceeding
// ledger recovery
metadataForRecovery = new LedgerMetadata(lh.getLedgerMetadata());
doRecoveryRead();
} else if (rc == BKException.Code.UnauthorizedAccessException) {
- cb.operationComplete(rc, null);
+ submitCallback(rc);
} else {
- cb.operationComplete(BKException.Code.ReadException, null);
+ submitCallback(BKException.Code.ReadException);
}
}
});
@@ -109,17 +133,26 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
rlcop.initiateWithFencing();
}
+ private void submitCallback(int rc) {
+ if (BKException.Code.OK == rc) {
+ lh.bk.getRecoverAddCountLogger().registerSuccessfulValue(writeCount.get());
+ lh.bk.getRecoverReadCountLogger().registerSuccessfulValue(readCount.get());
+ } else {
+ lh.bk.getRecoverAddCountLogger().registerFailedValue(writeCount.get());
+ lh.bk.getRecoverReadCountLogger().registerFailedValue(readCount.get());
+ }
+ cb.operationComplete(rc, null);
+ }
+
/**
* Try to read past the last confirmed.
*/
private void doRecoveryRead() {
if (!callbackDone.get()) {
- entryToRead++;
- try {
- new RecoveryReadOp(lh, lh.bk.scheduler, entryToRead, entryToRead, this, null).parallelRead(parallelRead).initiate();
- } catch (InterruptedException e) {
- readComplete(BKException.Code.InterruptedException, lh, null, null);
- }
+ startEntryToRead = endEntryToRead + 1;
+ endEntryToRead = endEntryToRead + readBatchSize;
+ new RecoveryReadOp(lh, lh.bk.scheduler, startEntryToRead, endEntryToRead, this, null)
+ .parallelRead(parallelRead).initiate();
}
}
@@ -131,9 +164,9 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
if (rc != BKException.Code.OK) {
LOG.warn("Close ledger {} failed during recovery: ",
LedgerRecoveryOp.this.lh.getId(), BKException.getMessage(rc));
- cb.operationComplete(rc, null);
+ submitCallback(rc);
} else {
- cb.operationComplete(BKException.Code.OK, null);
+ submitCallback(BKException.Code.OK);
if (LOG.isDebugEnabled()) {
LOG.debug("After closing length is: {}", lh.getLength());
}
@@ -144,10 +177,16 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
}
@Override
- public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
- if (rc == BKException.Code.OK) {
+ public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
+ // notify entry listener on individual entries being read during ledger recovery.
+ ReadEntryListener listener = entryListener;
+ if (null != listener) {
+ listener.onEntryComplete(rc, lh, entry, ctx);
+ }
+
+ // we only trigger recovery add an entry when readDone == false && callbackDone == false
+ if (!callbackDone.get() && !readDone.get() && rc == BKException.Code.OK) {
readCount.incrementAndGet();
- LedgerEntry entry = seq.nextElement();
byte[] data = entry.getEntry();
/*
@@ -157,12 +196,24 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
*/
synchronized (lh) {
lh.length = entry.getLength() - (long) data.length;
+ // check whether entry id is expected, so we won't overwritten any entries by mistake
+ if (entry.getEntryId() != lh.lastAddPushed + 1) {
+ LOG.error("Unexpected to recovery add entry {} as entry {} for ledger {}.",
+ new Object[] { entry.getEntryId(), (lh.lastAddPushed + 1), lh.getId() });
+ rc = BKException.Code.UnexpectedConditionException;
+ }
+ }
+ if (BKException.Code.OK == rc) {
+ lh.asyncRecoveryAddEntry(data, 0, data.length, this, null);
+ if (entry.getEntryId() == endEntryToRead) {
+ // trigger next batch read
+ doRecoveryRead();
+ }
+ return;
}
- lh.asyncRecoveryAddEntry(data, 0, data.length, this, null);
- doRecoveryRead();
- return;
}
+ // no entry found. stop recovery procedure but wait until recovery add finished.
if (rc == BKException.Code.NoSuchEntryException || rc == BKException.Code.NoSuchLedgerExistsException) {
readDone.set(true);
if (readCount.get() == writeCount.get()) {
@@ -172,20 +223,27 @@ class LedgerRecoveryOp implements ReadCallback, AddCallback {
}
// otherwise, some other error, we can't handle
- LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + entryToRead
- + " ledger: " + lh.ledgerId + " while recovering ledger");
- cb.operationComplete(rc, null);
+ if (BKException.Code.OK != rc && callbackDone.compareAndSet(false, true)) {
+ LOG.error("Failure {} while reading entries: ({} - {}), ledger: {} while recovering ledger",
+ new Object[] { BKException.getMessage(rc), startEntryToRead, endEntryToRead, lh.getId() });
+ submitCallback(rc);
+ } else if (BKException.Code.OK == rc) {
+ // we are here is because we successfully read an entry but readDone was already set to true.
+ // this would happen on recovery a ledger than has gaps in the tail.
+ LOG.warn("Successfully read entry {} for ledger {}, but readDone is already {}",
+ new Object[] { entry.getEntryId(), lh.getId(), readDone.get() });
+ }
return;
}
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
if (rc != BKException.Code.OK) {
- LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + (lh.lastAddConfirmed + 1)
+ LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + (entryId + 1)
+ " ledger: " + lh.ledgerId + " while recovering ledger");
if (callbackDone.compareAndSet(false, true)) {
// Give up, we can't recover from this error
- cb.operationComplete(rc, null);
+ submitCallback(rc);
}
return;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index a13eb21..0192296 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -428,7 +428,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
return this;
}
- public void initiate() throws InterruptedException {
+ public void initiate() {
long nextEnsembleChange = startEntryId, i = startEntryId;
this.requestTimeNanos = MathUtils.nowInNano();
ArrayList<BookieSocketAddress> ensemble = null;
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 5d4bf03..311fb82 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -63,6 +63,7 @@ public class ClientConfiguration extends AbstractConfiguration {
protected final static String READ_TIMEOUT = "readTimeout";
protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
protected final static String ENABLE_PARALLEL_RECOVERY_READ = "enableParallelRecoveryRead";
+ protected final static String RECOVERY_READ_BATCH_SIZE = "recoveryReadBatchSize";
// Timeout Setting
protected final static String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
protected final static String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec";
@@ -815,6 +816,27 @@ public class ClientConfiguration extends AbstractConfiguration {
}
/**
+ * Get Recovery Read Batch Size.
+ *
+ * @return recovery read batch size.
+ */
+ public int getRecoveryReadBatchSize() {
+ return getInt(RECOVERY_READ_BATCH_SIZE, 1);
+ }
+
+ /**
+ * Set Recovery Read Batch Size.
+ *
+ * @param batchSize
+ * recovery read batch size.
+ * @return client configuration.
+ */
+ public ClientConfiguration setRecoveryReadBatchSize(int batchSize) {
+ setProperty(RECOVERY_READ_BATCH_SIZE, batchSize);
+ return this;
+ }
+
+ /**
* Get Ensemble Placement Policy Class.
*
* @return ensemble placement policy class.
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/90a8f283/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index 5f7c56f..2b39eaf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,21 +18,26 @@ package org.apache.bookkeeper.client;
* under the License.
*
*/
+package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.test.BaseTestCase;
import org.junit.Test;
@@ -45,9 +48,7 @@ import static org.junit.Assert.*;
/**
* This unit test tests ledger recovery.
- *
*/
-
public class LedgerRecoveryTest extends BaseTestCase {
private final static Logger LOG = LoggerFactory.getLogger(LedgerRecoveryTest.class);
@@ -421,4 +422,84 @@ public class LedgerRecoveryTest extends BaseTestCase {
bsConfs.add(conf);
bs.add(startBookie(conf, rBookie));
}
+
+ @Test(timeout = 60000)
+ public void testBatchRecoverySize3() throws Exception {
+ batchRecovery(3);
+ }
+
+ @Test(timeout = 60000)
+ public void testBatchRecoverySize13() throws Exception {
+ batchRecovery(13);
+ }
+
+ private void batchRecovery(int batchSize) throws Exception {
+ ClientConfiguration newConf = new ClientConfiguration()
+ .setReadEntryTimeout(60000)
+ .setAddEntryTimeout(60000)
+ .setRecoveryReadBatchSize(batchSize);
+
+ newConf.setZkServers(zkUtil.getZooKeeperConnectString());
+ BookKeeper newBk = new BookKeeper(newConf);
+
+ LedgerHandle lh = newBk.createLedger(numBookies, 2, 2, digestType, "".getBytes());
+
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(0), latch1);
+ sleepBookie(lh.getLedgerMetadata().currentEnsemble.get(1), latch2);
+
+ int numEntries = (numBookies * 3) + 1;
+ final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
+ final CountDownLatch addDone = new CountDownLatch(1);
+ for (int i = 0; i < numEntries; i++) {
+ lh.asyncAddEntry(("" + i).getBytes(), new AddCallback() {
+ @Override
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ if (BKException.Code.OK != rc) {
+ addDone.countDown();
+ return;
+ }
+ if (numPendingAdds.decrementAndGet() == 0) {
+ addDone.countDown();
+ }
+ }
+ }, null);
+ }
+ latch1.countDown();
+ latch2.countDown();
+ addDone.await(10, TimeUnit.SECONDS);
+ assertEquals(0, numPendingAdds.get());
+
+ LedgerHandle recoverLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
+ assertEquals(BookieProtocol.INVALID_ENTRY_ID, recoverLh.getLastAddConfirmed());
+
+ final CountDownLatch recoverLatch = new CountDownLatch(1);
+ final AtomicBoolean success = new AtomicBoolean(false);
+ LedgerRecoveryOp recoveryOp = new LedgerRecoveryOp(recoverLh, new BookkeeperInternalCallbacks.GenericCallback<Void>() {
+ @Override
+ public void operationComplete(int rc, Void result) {
+ success.set(BKException.Code.OK == rc);
+ recoverLatch.countDown();
+ }
+ }).parallelRead(true).readBatchSize(newConf.getRecoveryReadBatchSize());
+ recoveryOp.initiate();
+ recoverLatch.await(10, TimeUnit.SECONDS);
+ assertTrue(success.get());
+ assertEquals(numEntries, recoveryOp.readCount.get());
+ assertEquals(numEntries, recoveryOp.writeCount.get());
+
+ Enumeration<LedgerEntry> enumeration = recoverLh.readEntries(0, numEntries - 1);
+
+ int numReads = 0;
+ while (enumeration.hasMoreElements()) {
+ LedgerEntry entry = enumeration.nextElement();
+ assertEquals((long) numReads, entry.getEntryId());
+ assertEquals(numReads, Integer.parseInt(new String(entry.getEntry())));
+ ++numReads;
+ }
+ assertEquals(numEntries, numReads);
+
+ newBk.close();
+ }
}