You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/03/31 10:58:52 UTC
svn commit: r1307743 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Author: sijie
Date: Sat Mar 31 08:58:51 2012
New Revision: 1307743
URL: http://svn.apache.org/viewvc?rev=1307743&view=rev
Log:
BOOKKEEPER-112: Bookie Recovery on an open ledger will cause LedgerHandle#close on that ledger to fail (sijie)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Mar 31 08:58:51 2012
@@ -74,6 +74,8 @@ Trunk (unreleased changes)
BOOKKEEPER-198: replaying entries of deleted ledgers would exhaust ledger cache. (sijie)
+ BOOKKEEPER-112: Bookie Recovery on an open ledger will cause LedgerHandle#close on that ledger to fail (sijie)
+
hedwig-server/
BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Sat Mar 31 08:58:51 2012
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.client.Asyn
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -83,7 +84,6 @@ public class BookKeeperAdmin {
private DigestType DIGEST_TYPE;
private byte[] PASSWD;
-
/**
* Constructor that takes in a ZooKeeper servers connect string so we know
* how to connect to ZooKeeper to retrieve information about the BookKeeper
@@ -426,8 +426,8 @@ public class BookKeeperAdmin {
* ledger fragments are stored on. Check if any of the ledger fragments
* for the current ledger are stored on the input dead bookie.
*/
- DigestType digestType = getLedgerDigestType(lId);
- byte[] passwd = getLedgerPasswd(lId);
+ final DigestType digestType = getLedgerDigestType(lId);
+ final byte[] passwd = getLedgerPasswd(lId);
bkc.asyncOpenLedgerNoRecovery(lId, digestType, passwd, new OpenCallback() {
@Override
public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
@@ -436,6 +436,39 @@ public class BookKeeperAdmin {
ledgerIterCb.processResult(rc, null, null);
return;
}
+
+ LedgerMetadata lm = lh.getLedgerMetadata();
+ if (!lm.isClosed() &&
+ lm.getEnsembles().size() > 0) {
+ Long lastKey = lm.getEnsembles().lastKey();
+ ArrayList<InetSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
+ // the original writer has not yet removed the faulty bookie from
+ // the current ledger ensemble. To avoid data loss in
+ // the case of concurrent updates to the ensemble composition,
+ // the recovery tool should first close the ledger.
+ if (lastEnsemble.contains(bookieSrc)) {
+ // close opened non recovery ledger handle
+ try {
+ lh.close();
+ } catch (Exception ie) {
+ LOG.warn("Error closing non recovery ledger handle for ledger " + lId, ie);
+ }
+ bkc.asyncOpenLedger(lId, digestType, passwd, new OpenCallback() {
+ @Override
+ public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
+ if (newrc != Code.OK.intValue()) {
+ LOG.error("BK error close ledger: " + lId, BKException.create(newrc));
+ ledgerIterCb.processResult(newrc, null, null);
+ return;
+ }
+ // do recovery
+ recoverLedger(bookieSrc, lId, ledgerIterCb, availableBookies);
+ }
+ }, null);
+ return;
+ }
+ }
+
/*
* This List stores the ledger fragments to recover indexed by
* the start entry ID for the range. The ensembles TreeMap is
@@ -465,6 +498,12 @@ public class BookKeeperAdmin {
ledgerFragmentsToRecover.add(entry.getKey());
}
}
+ // add the last ensemble; otherwise, if the failed bookie was in
+ // the last ensemble of a closed ledger, the entries belonging to the
+ // last ensemble would not be replicated.
+ if (curEntryId != null) {
+ ledgerFragmentsRange.put(curEntryId, lh.getLastAddConfirmed());
+ }
/*
* See if this current ledger contains any ledger fragment that
* needs to be re-replicated. If not, then just invoke the
@@ -503,7 +542,6 @@ public class BookKeeperAdmin {
+ "," + endEntryId + "] of ledger " + lh.getId()
+ " to " + newBookie);
}
-
try {
SingleFragmentCallback cb = new SingleFragmentCallback(
ledgerFragmentsMcb, lh, startEntryId, bookieSrc, newBookie);
@@ -551,6 +589,12 @@ public class BookKeeperAdmin {
cb.processResult(BKException.Code.OK, null, null);
return;
}
+ if (startEntryId > endEntryId) {
+ // for an open ledger with no entries, the start entry id is 0 and the end entry id is -1,
+ // so we can return immediately to trigger the forward read.
+ cb.processResult(BKException.Code.OK, null, null);
+ return;
+ }
ArrayList<InetSocketAddress> curEnsemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
int bookieIndex = 0;
@@ -603,7 +647,7 @@ public class BookKeeperAdmin {
* entries that were stored on the failed bookie.
*/
private void recoverLedgerFragmentEntry(final Long entryId, final LedgerHandle lh,
- final MultiCallback ledgerFragmentEntryMcb,
+ final AsyncCallback.VoidCallback ledgerFragmentEntryMcb,
final InetSocketAddress newBookie) throws InterruptedException {
/*
* Read the ledger entry using the LedgerHandle. This will allow us to
@@ -659,13 +703,14 @@ public class BookKeeperAdmin {
* be a multicallback responsible for all fragments in a single ledger
*/
class SingleFragmentCallback implements AsyncCallback.VoidCallback {
- final MultiCallback ledgerFragmentsMcb;
+ final AsyncCallback.VoidCallback ledgerFragmentsMcb;
final LedgerHandle lh;
final long fragmentStartId;
final InetSocketAddress oldBookie;
final InetSocketAddress newBookie;
- SingleFragmentCallback(MultiCallback ledgerFragmentsMcb, LedgerHandle lh,
+ SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
+ LedgerHandle lh,
long fragmentStartId,
InetSocketAddress oldBookie,
InetSocketAddress newBookie) {
@@ -684,6 +729,10 @@ public class BookKeeperAdmin {
ledgerFragmentsMcb.processResult(rc, null, null);
return;
}
+ writeLedgerConfig();
+ }
+
+ protected void writeLedgerConfig() {
/*
* Update the ledger metadata's ensemble info to point
* to the new bookie.
@@ -693,20 +742,32 @@ public class BookKeeperAdmin {
int deadBookieIndex = ensemble.indexOf(oldBookie);
ensemble.remove(deadBookieIndex);
ensemble.add(deadBookieIndex, newBookie);
-
-
+
lh.writeLedgerConfig(new WriteCb(), null);
}
private class WriteCb implements AsyncCallback.StatCallback {
@Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
+ public void processResult(int rc, final String path, Object ctx, Stat stat) {
if (rc == Code.BADVERSION.intValue()) {
LOG.warn("Two fragments attempted update at once; ledger id: " + lh.getId()
+ " startid: " + fragmentStartId);
// try again, the previous success (with which this has conflicted)
// will have updated the stat
- lh.writeLedgerConfig(new WriteCb(), null);
+ // other operations (such as addEnsemble) may have updated it too.
+ lh.rereadMetadata(new GenericCallback<LedgerMetadata>() {
+ @Override
+ public void operationComplete(int rc, LedgerMetadata newMeta) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("Error reading updated ledger metadata for ledger " + lh.getId(),
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ ledgerFragmentsMcb.processResult(rc, null, null);
+ } else {
+ lh.metadata = newMeta;
+ writeLedgerConfig();
+ }
+ }
+ });
return;
} else if (rc != Code.OK.intValue()) {
LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
@@ -725,4 +786,5 @@ public class BookKeeperAdmin {
}
};
}
+
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Sat Mar 31 08:58:51 2012
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.client;
* under the License.
*
*/
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
@@ -253,7 +254,13 @@ public class LedgerHandle {
bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
+ final long prevClose;
+ final long prevLength;
+
synchronized(LedgerHandle.this) {
+ prevClose = metadata.close;
+ prevLength = metadata.length;
+
// synchronized on LedgerHandle.this to ensure that
// lastAddPushed can not be updated after the metadata
// is closed.
@@ -261,7 +268,6 @@ public class LedgerHandle {
// Close operation is idempotent, so no need to check if we are
// already closed
-
metadata.close(lastAddConfirmed);
errorOutPendingAdds(rc);
lastAddPushed = lastAddConfirmed;
@@ -272,11 +278,37 @@ public class LedgerHandle {
+ metadata.close + " with this many bytes: " + metadata.length);
}
- writeLedgerConfig(new StatCallback() {
+ final class CloseCb implements StatCallback {
@Override
- public void processResult(int rc, String path, Object subctx,
- Stat stat) {
- if (rc != KeeperException.Code.OK.intValue()) {
+ public void processResult(final int rc, String path, Object subctx,
+ final Stat stat) {
+ if (rc == KeeperException.Code.BadVersion) {
+ rereadMetadata(new GenericCallback<LedgerMetadata>() {
+ @Override
+ public void operationComplete(int newrc, LedgerMetadata newMeta) {
+ if (newrc != BKException.Code.OK) {
+ LOG.error("Error reading new metadata from ledger " + ledgerId
+ + " when closing, code=" + newrc);
+ cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this, ctx);
+ } else {
+ metadata.close(prevClose);
+ metadata.length = prevLength;
+ if (metadata.resolveConflict(newMeta)) {
+ metadata.length = length;
+ metadata.close(lastAddConfirmed);
+ writeLedgerConfig(new CloseCb(), null);
+ return;
+ } else {
+ metadata.length = length;
+ metadata.close(lastAddConfirmed);
+ LOG.warn("Conditional write failed: "
+ + KeeperException.Code.get(KeeperException.Code.BadVersion));
+ cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this, ctx);
+ }
+ }
+ }
+ });
+ } else if (rc != KeeperException.Code.OK.intValue()) {
LOG.warn("Conditional write failed: " + KeeperException.Code.get(rc));
cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this,
ctx);
@@ -285,7 +317,9 @@ public class LedgerHandle {
cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
}
}
- }, null);
+ };
+
+ writeLedgerConfig(new CloseCb(), null);
}
});
@@ -639,16 +673,43 @@ public class LedgerHandle {
+ (lastAddConfirmed + 1));
}
- metadata.addEnsemble(lastAddConfirmed + 1, newEnsemble);
+ final long newEnsembleStartEntry = lastAddConfirmed + 1;
+ metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
- writeLedgerConfig(new StatCallback() {
+ final class ChangeEnsembleCb implements StatCallback {
@Override
public void processResult(final int rc, String path, Object ctx, final Stat stat) {
bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
- if (rc != KeeperException.Code.OK.intValue()) {
+ if (rc == KeeperException.Code.BadVersion) {
+ rereadMetadata(new GenericCallback<LedgerMetadata>() {
+ @Override
+ public void operationComplete(int newrc, LedgerMetadata newMeta) {
+ if (newrc != BKException.Code.OK) {
+ LOG.error("Error reading new metadata from ledger after changing ensemble, code=" + newrc);
+ handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+ } else {
+ // a new ensemble is added only when the start entry is larger than zero
+ if (newEnsembleStartEntry > 0) {
+ metadata.getEnsembles().remove(newEnsembleStartEntry);
+ }
+ if (metadata.resolveConflict(newMeta)) {
+ metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
+ writeLedgerConfig(new ChangeEnsembleCb(), null);
+ return;
+ } else {
+ LOG.error("Could not resolve ledger metadata confliction while changing ensemble to: "
+ + newEnsemble + ", old meta data is \n" + new String(metadata.serialize())
+ + "\n, new meta data is \n" + new String(newMeta.serialize()) + "\n ,closing ledger");
+ handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+ }
+ }
+ }
+ });
+ return;
+ } else if (rc != KeeperException.Code.OK.intValue()) {
LOG
.error("Could not persist ledger metadata while changing ensemble to: "
+ newEnsemble + " , closing ledger");
@@ -664,30 +725,33 @@ public class LedgerHandle {
});
}
- }, null);
+ };
+
+ writeLedgerConfig(new ChangeEnsembleCb(), null);
}
- void rereadMetadata(final GenericCallback<Void> cb) {
+ void rereadMetadata(final GenericCallback<LedgerMetadata> cb) {
bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false,
- new DataCallback() {
- public void processResult(int rc, String path,
- Object ctx, byte[] data, Stat stat) {
- if (rc != KeeperException.Code.OK.intValue()) {
- LOG.error("Error reading metadata from ledger, code =" + rc);
- cb.operationComplete(BKException.Code.ZKException, null);
- return;
- }
-
- try {
- metadata = LedgerMetadata.parseConfig(data, stat.getVersion());
- } catch (IOException e) {
- LOG.error("Error parsing ledger metadata for ledger", e);
- cb.operationComplete(BKException.Code.ZKException, null);
- }
- cb.operationComplete(BKException.Code.OK, null);
+ new DataCallback() {
+ public void processResult(int rc, String path,
+ Object ctx, byte[] data, Stat stat) {
+ if (rc != KeeperException.Code.OK.intValue()) {
+ LOG.error("Error reading metadata from ledger, code =" + rc);
+ cb.operationComplete(BKException.Code.ZKException, null);
+ return;
}
- }, null);
+
+ try {
+ LedgerMetadata newMeta = LedgerMetadata.parseConfig(data, stat.getVersion());
+ cb.operationComplete(BKException.Code.OK, newMeta);
+ } catch (IOException e) {
+ LOG.error("Error parsing ledger metadata for ledger", e);
+ cb.operationComplete(BKException.Code.ZKException, null);
+ return;
+ }
+ }
+ }, null);
}
void recover(final GenericCallback<Void> cb) {
@@ -713,12 +777,13 @@ public class LedgerHandle {
@Override
public void processResult(final int rc, String path, Object ctx, Stat stat) {
if (rc == KeeperException.Code.BadVersion) {
- rereadMetadata(new GenericCallback<Void>() {
+ rereadMetadata(new GenericCallback<LedgerMetadata>() {
@Override
- public void operationComplete(int rc, Void result) {
+ public void operationComplete(int rc, LedgerMetadata newMeta) {
if (rc != BKException.Code.OK) {
cb.operationComplete(rc, null);
} else {
+ metadata = newMeta;
recover(cb);
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Sat Mar 31 08:58:51 2012
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -259,4 +260,47 @@ public class LedgerMetadata {
public int getZnodeVersion() {
return this.znodeVersion;
}
+
+ /**
+ * Resolve conflict with new updated metadata.
+ *
+ * @param newMeta
+ * Re-read metadata
+ * @return true if the conflict is resolved, otherwise false.
+ */
+ boolean resolveConflict(LedgerMetadata newMeta) {
+ // a changed length or close means someone else has opened the ledger;
+ // this conflict cannot be resolved.
+ if (metadataFormatVersion != newMeta.metadataFormatVersion ||
+ ensembleSize != newMeta.ensembleSize ||
+ quorumSize != newMeta.quorumSize ||
+ length != newMeta.length ||
+ close != newMeta.close) {
+ return false;
+ }
+ // the new metadata's znode version must not be smaller than the old one
+ if (znodeVersion > newMeta.znodeVersion) {
+ return false;
+ }
+ // the number of ensembles must be the same
+ if (ensembles.size() != newMeta.ensembles.size()) {
+ return false;
+ }
+ // the ensemble start entry ids must be the same;
+ // we don't compare the bookies within each ensemble, since the
+ // recovery tool may have replaced a failed bookie with a new one.
+ Iterator<Long> keyIter = ensembles.keySet().iterator();
+ Iterator<Long> newMetaKeyIter = newMeta.ensembles.keySet().iterator();
+ for (int i=0; i<ensembles.size(); i++) {
+ Long curKey = keyIter.next();
+ Long newMetaKey = newMetaKeyIter.next();
+ if (curKey != newMetaKey) {
+ return false;
+ }
+ }
+ // if the conflict could be resolved, adopt the new ensembles and znode version
+ ensembles = newMeta.ensembles;
+ znodeVersion = newMeta.znodeVersion;
+ return true;
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java Sat Mar 31 08:58:51 2012
@@ -126,7 +126,7 @@ class LedgerRecoveryOp implements ReadCa
}
// otherwise, some other error, we can't handle
- LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + lh.lastAddConfirmed + 1
+ LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + (lh.lastAddConfirmed + 1)
+ " ledger: " + lh.ledgerId + " while recovering ledger");
cb.operationComplete(rc, null);
return;
@@ -137,7 +137,7 @@ class LedgerRecoveryOp implements ReadCa
if (rc != BKException.Code.OK) {
// Give up, we can't recover from this error
- LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + lh.lastAddConfirmed + 1
+ LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + (lh.lastAddConfirmed + 1)
+ " ledger: " + lh.ledgerId + " while recovering ledger");
cb.operationComplete(rc, null);
return;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Sat Mar 31 08:58:51 2012
@@ -121,6 +121,21 @@ class PendingReadOp implements Enumerati
public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) {
final LedgerEntry entry = (LedgerEntry) ctx;
+ // if we are reading only one entry and it does not exist (the recovery-read case),
+ // we don't need to ReattemptRead; otherwise we could not handle the following case:
+ //
+ // an empty ledger with quorum (bk1, bk2) where bk2 has failed permanently.
+ // bk1 returns NoLedgerException, the client does ReattemptRead to bk2, but bk2 isn't connected,
+ // so the read of entry 0 would fail and this ledger could never be closed.
+ if (startEntryId == endEntryId) {
+ if (BKException.Code.NoSuchLedgerExistsException == rc ||
+ BKException.Code.NoSuchEntryException == rc) {
+ lh.opCounterSem.release();
+ submitCallback(rc);
+ return;
+ }
+ }
+
if (rc != BKException.Code.OK) {
logErrorAndReattemptRead(entry, "Error: " + BKException.getMessage(rc), rc);
return;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Sat Mar 31 08:58:51 2012
@@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.HashMap;
import java.util.Collections;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.buffer.ChannelBuffer;
import java.util.concurrent.atomic.AtomicLong;
@@ -41,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.BaseTestCase;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -159,6 +161,15 @@ public class BookieRecoveryTest extends
return lhs;
}
+ private List<LedgerHandle> openLedgers(List<LedgerHandle> oldLhs)
+ throws Exception {
+ List<LedgerHandle> newLhs = new ArrayList<LedgerHandle>();
+ for (LedgerHandle oldLh : oldLhs) {
+ newLhs.add(bkc.openLedger(oldLh.getId(), digestType, baseClientConf.getBookieRecoveryPasswd()));
+ }
+ return newLhs;
+ }
+
/**
* Helper method to write dummy ledger entries to all of the ledgers passed.
*
@@ -171,8 +182,9 @@ public class BookieRecoveryTest extends
* @throws BKException
* @throws InterruptedException
*/
- private void writeEntriestoLedgers(int numEntries, long startEntryId, List<LedgerHandle> lhs) throws BKException,
- InterruptedException {
+ private void writeEntriestoLedgers(int numEntries, long startEntryId,
+ List<LedgerHandle> lhs)
+ throws BKException, InterruptedException {
for (LedgerHandle lh : lhs) {
for (int i = 0; i < numEntries; i++) {
lh.addEntry(("LedgerId: " + lh.getId() + ", EntryId: " + (startEntryId + i)).getBytes());
@@ -180,6 +192,12 @@ public class BookieRecoveryTest extends
}
}
+ private void closeLedgers(List<LedgerHandle> lhs) throws BKException, InterruptedException {
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+ }
+
/**
* Helper method to verify that we can read the recovered ledger entries.
*
@@ -507,6 +525,140 @@ public class BookieRecoveryTest extends
return numDupes > 0;
}
+ /**
+ * Test recovering closed ledgers when the failed bookie server is in the last ensemble.
+ */
+ @Test
+ public void testBookieRecoveryOnClosedLedgers() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers, numBookies, 2);
+
+ // Write the entries for the ledgers with dummy values
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ closeLedgers(lhs);
+
+ // Shutdown last bookie server in last ensemble
+ ArrayList<InetSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+ .entrySet().iterator().next().getValue();
+ InetSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
+ killBookie(bookieToKill);
+
+ // start a new bookie
+ startNewBookie();
+
+ InetSocketAddress bookieDest = null;
+ LOG.info("Now recover the data on the killed bookie (" + bookieToKill
+ + ") and replicate it to a random available one");
+
+ bkAdmin.recoverBookieData(bookieToKill, bookieDest);
+ for (LedgerHandle lh : lhs) {
+ assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs));
+ lh.close();
+ }
+ }
+
+ @Test
+ public void testBookieRecoveryOnOpenedLedgers() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers, numBookies, 2);
+
+ // Write the entries for the ledgers with dummy values
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shut down the last bookie server in the first ensemble
+ ArrayList<InetSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+ .entrySet().iterator().next().getValue();
+ InetSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
+ killBookie(bookieToKill);
+
+ // start a new bookie
+ startNewBookie();
+
+ InetSocketAddress bookieDest = null;
+ LOG.info("Now recover the data on the killed bookie (" + bookieToKill
+ + ") and replicate it to a random available one");
+
+ bkAdmin.recoverBookieData(bookieToKill, bookieDest);
+
+ for (LedgerHandle lh : lhs) {
+ assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs));
+ }
+
+ try {
+ // we can't write entries
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+ fail("should not reach here");
+ } catch (Exception e) {
+ }
+ }
+
+ @Test
+ public void testBookieRecoveryOnInRecoveryLedger() throws Exception {
+ int numMsgs = 10;
+ // Create the ledgers
+ int numLedgers = 1;
+ List<LedgerHandle> lhs = createLedgers(numLedgers, 2, 2);
+
+ // Write the entries for the ledgers with dummy values
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ ArrayList<InetSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+ .entrySet().iterator().next().getValue();
+ // removed bookie
+ InetSocketAddress bookieToKill = lastEnsemble.get(0);
+ killBookie(bookieToKill);
+ // temp failure
+ InetSocketAddress bookieToKill2 = lastEnsemble.get(1);
+ ServerConfiguration conf2 = killBookie(bookieToKill2);
+
+ // start a new bookie
+ startNewBookie();
+
+ // open these ledgers
+ for (LedgerHandle oldLh : lhs) {
+ try {
+ bkc.openLedger(oldLh.getId(), digestType, baseClientConf.getBookieRecoveryPasswd());
+ fail("Should have thrown exception");
+ } catch (Exception e) {
+ }
+ }
+
+ try {
+ bkAdmin.recoverBookieData(bookieToKill, null);
+ fail("Should have thrown exception");
+ } catch (BKException.BKLedgerRecoveryException bke) {
+ // correct behaviour
+ }
+
+ // restart failed bookie
+ bs.add(startBookie(conf2));
+ bsConfs.add(conf2);
+
+ // recover them
+ bkAdmin.recoverBookieData(bookieToKill, null);
+
+ for (LedgerHandle lh : lhs) {
+ assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs));
+ }
+
+ // open ledgers to read metadata
+ List<LedgerHandle> newLhs = openLedgers(lhs);
+ for (LedgerHandle newLh : newLhs) {
+ // the first ensemble should contain bookieToKill2 and not contain bookieToKill
+ Map.Entry<Long, ArrayList<InetSocketAddress>> entry =
+ newLh.getLedgerMetadata().getEnsembles().entrySet().iterator().next();
+ assertFalse(entry.getValue().contains(bookieToKill));
+ assertTrue(entry.getValue().contains(bookieToKill2));
+ }
+
+ }
+
@Test
public void testAsyncBookieRecoveryToRandomBookiesNotEnoughBookies() throws Exception {
// Create the ledgers
@@ -575,9 +727,7 @@ public class BookieRecoveryTest extends
writeEntriestoLedgers(numMsgs, numMsgs*2, lhs);
for (LedgerHandle lh : lhs) {
assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs*3));
- // TODO (BOOKKEEPER-112) this throws an exception at the moment
- // because recovering a ledger updates the ledger znode
- //lh.close();
+ lh.close();
}
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java Sat Mar 31 08:58:51 2012
@@ -269,16 +269,18 @@ public class TestFencing extends BaseTes
InetSocketAddress bookieToKill
= writelh.getLedgerMetadata().getEnsemble(numEntries).get(0);
killBookie(bookieToKill);
+
+ // write entries to change ensemble
+ for (int i = 0; i < numEntries; i++) {
+ writelh.addEntry(tmp.getBytes());
+ }
+
admin.recoverBookieData(bookieToKill, null);
- /* TODO: uncomment this when BOOKKEEPER-112 is
- fixed
-
for (int i = 0; i < numEntries; i++) {
writelh.addEntry(tmp.getBytes());
}
- */
-
+
LedgerHandle readlh = bkc.openLedger(writelh.getId(),
digestType, "testPasswd".getBytes());
try {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Sat Mar 31 08:58:51 2012
@@ -169,9 +169,10 @@ public abstract class BookKeeperClusterT
*
* @param addr
* Socket Address
+ * @return the configuration of killed bookie
* @throws InterruptedException
*/
- public void killBookie(InetSocketAddress addr) throws InterruptedException {
+ public ServerConfiguration killBookie(InetSocketAddress addr) throws InterruptedException {
BookieServer toRemove = null;
int toRemoveIndex = 0;
for (BookieServer server : bs) {
@@ -184,8 +185,9 @@ public abstract class BookKeeperClusterT
}
if (toRemove != null) {
bs.remove(toRemove);
- bsConfs.remove(toRemoveIndex);
+ return bsConfs.remove(toRemoveIndex);
}
+ return null;
}
/**
@@ -193,17 +195,18 @@ public abstract class BookKeeperClusterT
*
* @param index
* Bookie Index
+ * @return the configuration of killed bookie
* @throws InterruptedException
* @throws IOException
*/
- public void killBookie(int index) throws InterruptedException, IOException {
+ public ServerConfiguration killBookie(int index) throws InterruptedException, IOException {
if (index >= bs.size()) {
throw new IOException("Bookie does not exist");
}
BookieServer server = bs.get(index);
server.shutdown();
bs.remove(server);
- bsConfs.remove(index);
+ return bsConfs.remove(index);
}
/**
@@ -318,7 +321,7 @@ public abstract class BookKeeperClusterT
* Server Configuration Object
*
*/
- private BookieServer startBookie(ServerConfiguration conf)
+ protected BookieServer startBookie(ServerConfiguration conf)
throws IOException, InterruptedException, KeeperException, BookieException {
BookieServer server = new BookieServer(conf);
server.start();
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java Sat Mar 31 08:58:51 2012
@@ -72,4 +72,29 @@ public class CloseTest extends BaseTestC
lh[i].close();
}
}
+
+ @Test
+ public void testCloseByOthers() throws Exception {
+
+ int numLedgers = 1;
+ int numMsgs = 10;
+
+ LedgerHandle lh = bkc.createLedger(digestType, "".getBytes());
+
+ String tmp = "BookKeeper is cool!";
+
+ /*
+ * Write 10 entries to lh.
+ */
+ for (int i = 0; i < numMsgs; i++) {
+ lh.addEntry(tmp.getBytes());
+ }
+
+ // another client opens the ledger, which closes it
+ LedgerHandle lh2 = bkc.openLedger(lh.getId(), digestType, "".getBytes());
+
+ // so the ledger would be closed, the metadata is changed
+ // the original ledger handle should be able to close it successfully
+ lh2.close();
+ }
}