You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/10/14 17:38:44 UTC
svn commit: r1531944 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
Author: ivank
Date: Mon Oct 14 15:38:44 2013
New Revision: 1531944
URL: http://svn.apache.org/r1531944
Log:
BOOKKEEPER-676: Make add asynchrounous in ledger recovery (aniruddha via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1531944&r1=1531943&r2=1531944&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Oct 14 15:38:44 2013
@@ -108,6 +108,8 @@ Trunk (unreleased changes)
BOOKKEEPER-638: Two bookies could start at the same time to access bookie data. (sijie via ivank)
+ BOOKKEEPER-676: Make add asynchrounous in ledger recovery (aniruddha via ivank)
+
hedwig-server:
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=1531944&r1=1531943&r2=1531944&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java Mon Oct 14 15:38:44 2013
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.client;
import java.util.Enumeration;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -41,11 +43,10 @@ import org.slf4j.LoggerFactory;
class LedgerRecoveryOp implements ReadCallback, AddCallback {
static final Logger LOG = LoggerFactory.getLogger(LedgerRecoveryOp.class);
LedgerHandle lh;
- int numResponsesPending;
- boolean proceedingWithRecovery = false;
- long maxAddPushed = LedgerHandle.INVALID_ENTRY_ID;
- long maxAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
- long maxLength = 0;
+ AtomicLong readCount, writeCount;
+ AtomicBoolean readDone;
+ AtomicBoolean callbackDone;
+ long entryToRead;
// keep a copy of metadata for recovery.
LedgerMetadata metadataForRecovery;
@@ -66,9 +67,12 @@ class LedgerRecoveryOp implements ReadCa
}
public LedgerRecoveryOp(LedgerHandle lh, GenericCallback<Void> cb) {
+ readCount = new AtomicLong(0);
+ writeCount = new AtomicLong(0);
+ readDone = new AtomicBoolean(false);
+ callbackDone = new AtomicBoolean(false);
this.cb = cb;
this.lh = lh;
- numResponsesPending = lh.metadata.getEnsembleSize();
}
public void initiate() {
@@ -78,6 +82,7 @@ class LedgerRecoveryOp implements ReadCa
if (rc == BKException.Code.OK) {
lh.lastAddPushed = lh.lastAddConfirmed = data.lastAddConfirmed;
lh.length = data.length;
+ entryToRead = lh.lastAddConfirmed;
// keep a copy of ledger metadata before proceeding
// ledger recovery
metadataForRecovery = new LedgerMetadata(lh.getLedgerMetadata());
@@ -102,17 +107,38 @@ class LedgerRecoveryOp implements ReadCa
* Try to read past the last confirmed.
*/
private void doRecoveryRead() {
- long nextEntry = lh.lastAddConfirmed + 1;
- try {
- new RecoveryReadOp(lh, lh.bk.scheduler, nextEntry, nextEntry, this, null).initiate();
- } catch (InterruptedException e) {
- readComplete(BKException.Code.InterruptedException, lh, null, null);
+ if (!callbackDone.get()) {
+ entryToRead++;
+ try {
+ new RecoveryReadOp(lh, lh.bk.scheduler, entryToRead, entryToRead, this, null).initiate();
+ } catch (InterruptedException e) {
+ readComplete(BKException.Code.InterruptedException, lh, null, null);
+ }
+ }
+ }
+
+ private void closeAndCallback() {
+ if (callbackDone.compareAndSet(false, true)) {
+ lh.asyncCloseInternal(new CloseCallback() {
+ @Override
+ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ LOG.warn("Close ledger {} failed during recovery: ",
+ LedgerRecoveryOp.this.lh.getId(), BKException.getMessage(rc));
+ cb.operationComplete(rc, null);
+ } else {
+ cb.operationComplete(BKException.Code.OK, null);
+ LOG.debug("After closing length is: {}", lh.getLength());
+ }
+ }
+ }, null, BKException.Code.LedgerClosedException);
}
}
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
if (rc == BKException.Code.OK) {
+ readCount.incrementAndGet();
LedgerEntry entry = seq.nextElement();
byte[] data = entry.getEntry();
@@ -125,27 +151,20 @@ class LedgerRecoveryOp implements ReadCa
lh.length = entry.getLength() - (long) data.length;
}
lh.asyncRecoveryAddEntry(data, 0, data.length, this, null);
+ doRecoveryRead();
return;
}
if (rc == BKException.Code.NoSuchEntryException || rc == BKException.Code.NoSuchLedgerExistsException) {
- lh.asyncCloseInternal(new CloseCallback() {
- @Override
- public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
- if (rc != BKException.Code.OK) {
- LOG.warn("Close failed: " + BKException.getMessage(rc));
- cb.operationComplete(rc, null);
- } else {
- cb.operationComplete(BKException.Code.OK, null);
- LOG.debug("After closing length is: {}", lh.getLength());
- }
- }
- }, null, BKException.Code.LedgerClosedException);
+ readDone.set(true);
+ if (readCount.get() == writeCount.get()) {
+ closeAndCallback();
+ }
return;
}
// otherwise, some other error, we can't handle
- LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + (lh.lastAddConfirmed + 1)
+ LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + entryToRead
+ " ledger: " + lh.ledgerId + " while recovering ledger");
cb.operationComplete(rc, null);
return;
@@ -154,14 +173,18 @@ class LedgerRecoveryOp implements ReadCa
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
if (rc != BKException.Code.OK) {
- // Give up, we can't recover from this error
-
LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + (lh.lastAddConfirmed + 1)
+ " ledger: " + lh.ledgerId + " while recovering ledger");
- cb.operationComplete(rc, null);
+ if (callbackDone.compareAndSet(false, true)) {
+ // Give up, we can't recover from this error
+ cb.operationComplete(rc, null);
+ }
return;
}
- doRecoveryRead();
+ long numAdd = writeCount.incrementAndGet();
+ if (readDone.get() && readCount.get() == numAdd) {
+ closeAndCallback();
+ }
}
}