You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/06/07 12:44:24 UTC
svn commit: r1490578 - in /zookeeper/bookkeeper/branches/branch-4.2: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/test/java/org/apache/bookkeeper/client/
Author: ivank
Date: Fri Jun 7 10:44:24 2013
New Revision: 1490578
URL: http://svn.apache.org/r1490578
Log:
BOOKKEEPER-581: Ledger recovery doesn't work correctly when recovery adds force changing ensembles. (sijie via ivank)
Modified:
zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Fri Jun 7 10:44:24 2013
@@ -14,6 +14,8 @@ Release 4.2.2 - Unreleased
BOOKKEEPER-585: Auditor logs noisily when a ledger has been deleted (ivank)
+ BOOKKEEPER-581: Ledger recovery doesn't work correctly when recovery adds force changing ensembles. (sijie via ivank)
+
hedwig-server:
BOOKKEEPER-579: TestSubAfterCloseSub was put in a wrong package (sijie via ivank)
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Fri Jun 7 10:44:24 2013
@@ -18,6 +18,8 @@ package org.apache.bookkeeper.client;
* limitations under the License.
*/
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.BufferedReader;
import java.io.StringReader;
import java.io.IOException;
@@ -25,6 +27,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Arrays;
@@ -96,6 +99,30 @@ public class LedgerMetadata {
this.hasPassword = true;
}
+ /**
+ * Copy Constructor.
+ */
+ LedgerMetadata(LedgerMetadata other) {
+ this.ensembleSize = other.ensembleSize;
+ this.writeQuorumSize = other.writeQuorumSize;
+ this.ackQuorumSize = other.ackQuorumSize;
+ this.length = other.length;
+ this.lastEntryId = other.lastEntryId;
+ this.metadataFormatVersion = other.metadataFormatVersion;
+ this.state = other.state;
+ this.version = other.version;
+ this.hasPassword = other.hasPassword;
+ this.digestType = other.digestType;
+ this.password = new byte[other.password.length];
+ System.arraycopy(other.password, 0, this.password, 0, other.password.length);
+ // copy the ensembles
+ for (Entry<Long, ArrayList<InetSocketAddress>> entry : other.ensembles.entrySet()) {
+ long startEntryId = entry.getKey();
+ ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>(entry.getValue());
+ this.addEnsemble(startEntryId, newEnsemble);
+ }
+ }
+
private LedgerMetadata() {
this(0, 0, 0, BookKeeper.DigestType.MAC, new byte[] {});
this.hasPassword = false;
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java Fri Jun 7 10:44:24 2013
@@ -19,21 +19,16 @@ package org.apache.bookkeeper.client;
*/
import java.util.Enumeration;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
-import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
-import org.apache.bookkeeper.client.LedgerHandle.NoopCloseCallback;
import org.apache.bookkeeper.client.DigestManager.RecoveryData;
-import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
/**
* This class encapsulated the ledger recovery operation. It first does a read
@@ -51,9 +46,25 @@ class LedgerRecoveryOp implements ReadCa
long maxAddPushed = LedgerHandle.INVALID_ENTRY_ID;
long maxAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
long maxLength = 0;
+ // keep a copy of metadata for recovery.
+ LedgerMetadata metadataForRecovery;
GenericCallback<Void> cb;
+ class RecoveryReadOp extends PendingReadOp {
+
+ RecoveryReadOp(LedgerHandle lh, ScheduledExecutorService scheduler, long startEntryId,
+ long endEntryId, ReadCallback cb, Object ctx) {
+ super(lh, scheduler, startEntryId, endEntryId, cb, ctx);
+ }
+
+ @Override
+ protected LedgerMetadata getLedgerMetadata() {
+ return metadataForRecovery;
+ }
+
+ }
+
public LedgerRecoveryOp(LedgerHandle lh, GenericCallback<Void> cb) {
this.cb = cb;
this.lh = lh;
@@ -63,17 +74,20 @@ class LedgerRecoveryOp implements ReadCa
public void initiate() {
ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh,
new ReadLastConfirmedOp.LastConfirmedDataCallback() {
- public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
- if (rc == BKException.Code.OK) {
- lh.lastAddPushed = lh.lastAddConfirmed = data.lastAddConfirmed;
- lh.length = data.length;
- doRecoveryRead();
- } else if (rc == BKException.Code.UnauthorizedAccessException) {
- cb.operationComplete(rc, null);
- } else {
- cb.operationComplete(BKException.Code.ReadException, null);
+ public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
+ if (rc == BKException.Code.OK) {
+ lh.lastAddPushed = lh.lastAddConfirmed = data.lastAddConfirmed;
+ lh.length = data.length;
+ // keep a copy of ledger metadata before proceeding
+ // ledger recovery
+ metadataForRecovery = new LedgerMetadata(lh.getLedgerMetadata());
+ doRecoveryRead();
+ } else if (rc == BKException.Code.UnauthorizedAccessException) {
+ cb.operationComplete(rc, null);
+ } else {
+ cb.operationComplete(BKException.Code.ReadException, null);
+ }
}
- }
});
/**
@@ -88,14 +102,16 @@ class LedgerRecoveryOp implements ReadCa
* Try to read past the last confirmed.
*/
private void doRecoveryRead() {
- lh.lastAddConfirmed++;
- lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null);
+ long nextEntry = lh.lastAddConfirmed + 1;
+ try {
+ new RecoveryReadOp(lh, lh.bk.scheduler, nextEntry, nextEntry, this, null).initiate();
+ } catch (InterruptedException e) {
+ readComplete(BKException.Code.InterruptedException, lh, null, null);
+ }
}
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
- // get back to prev value
- lh.lastAddConfirmed--;
if (rc == BKException.Code.OK) {
LedgerEntry entry = seq.nextElement();
byte[] data = entry.getEntry();
@@ -145,9 +161,7 @@ class LedgerRecoveryOp implements ReadCa
cb.operationComplete(rc, null);
return;
}
-
doRecoveryRead();
-
}
}
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Fri Jun 7 10:44:24 2013
@@ -21,27 +21,27 @@ package org.apache.bookkeeper.client;
*
*/
import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledFuture;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
-import java.util.BitSet;
import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
-
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Sequence of entries of a ledger that represents a pending read operation.
@@ -128,7 +128,7 @@ class PendingReadOp implements Enumerati
* @return host we sent to if we sent. null otherwise.
*/
synchronized InetSocketAddress maybeSendSpeculativeRead(Set<InetSocketAddress> heardFromHosts) {
- if (nextReplicaIndexToReadFrom >= lh.getLedgerMetadata().getWriteQuorumSize()) {
+ if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
return null;
}
@@ -146,7 +146,7 @@ class PendingReadOp implements Enumerati
}
synchronized InetSocketAddress sendNextRead() {
- if (nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) {
+ if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
// we are done, the read has failed from all replicas, just fail the
// read
@@ -251,11 +251,16 @@ class PendingReadOp implements Enumerati
this.endEntryId = endEntryId;
this.scheduler = scheduler;
numPendingEntries = endEntryId - startEntryId + 1;
- maxMissedReadsAllowed = lh.metadata.getWriteQuorumSize() - lh.metadata.getAckQuorumSize();
+ maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
+ - getLedgerMetadata().getAckQuorumSize();
speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
heardFromHosts = new HashSet<InetSocketAddress>();
}
+ protected LedgerMetadata getLedgerMetadata() {
+ return lh.metadata;
+ }
+
public void initiate() throws InterruptedException {
long nextEnsembleChange = startEntryId, i = startEntryId;
@@ -283,11 +288,9 @@ class PendingReadOp implements Enumerati
}
do {
- LOG.debug("Acquiring lock: {}", i);
-
if (i == nextEnsembleChange) {
- ensemble = lh.metadata.getEnsemble(i);
- nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
+ ensemble = getLedgerMetadata().getEnsemble(i);
+ nextEnsembleChange = getLedgerMetadata().getNextEnsembleChange(i);
}
LedgerEntryRequest entry = new LedgerEntryRequest(ensemble, lh.ledgerId, i);
seq.add(entry);
Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java Fri Jun 7 10:44:24 2013
@@ -24,20 +24,18 @@ package org.apache.bookkeeper.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.*;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.test.BaseTestCase;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -356,4 +354,67 @@ public class LedgerRecoveryTest extends
assertEquals("Fenced ledger should have correct lastAddConfirmed",
lhbefore.getLastAddConfirmed(), lhafter.getLastAddConfirmed());
}
+
+ /**
+ * Verify that it doesn't break the recovery when changing ensemble in
+ * recovery add.
+ */
+ @Test(timeout = 60000)
+ public void testEnsembleChangeDuringRecovery() throws Exception {
+ LedgerHandle lh = bkc.createLedger(numBookies, 2, 2, digestType, "".getBytes());
+ int numEntries = (numBookies * 3) + 1;
+ final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
+ final CountDownLatch addDone = new CountDownLatch(1);
+ for (int i = 0; i < numEntries; i++) {
+ lh.asyncAddEntry("data".getBytes(), new AddCallback() {
+
+ @Override
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ if (BKException.Code.OK != rc) {
+ addDone.countDown();
+ return;
+ }
+ if (numPendingAdds.decrementAndGet() == 0) {
+ addDone.countDown();
+ }
+ }
+
+ }, null);
+ }
+ addDone.await(10, TimeUnit.SECONDS);
+ if (numPendingAdds.get() > 0) {
+ fail("Failed to add " + numEntries + " to ledger handle " + lh.getId());
+ }
+ // kill first 2 bookies to replace bookies
+ InetSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0);
+ ServerConfiguration conf1 = killBookie(bookie1);
+ InetSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(1);
+ ServerConfiguration conf2 = killBookie(bookie2);
+
+ // replace these two bookies
+ startDeadBookie(conf1);
+ startDeadBookie(conf2);
+ // kick in two brand new bookies
+ startNewBookie();
+ startNewBookie();
+
+ // two dead bookies are put in the ensemble which would cause ensemble
+ // change
+ LedgerHandle recoveredLh = bkc.openLedger(lh.getId(), digestType, "".getBytes());
+ assertEquals("Fenced ledger should have correct lastAddConfirmed", lh.getLastAddConfirmed(),
+ recoveredLh.getLastAddConfirmed());
+ }
+
+ private void startDeadBookie(ServerConfiguration conf) throws Exception {
+ Bookie rBookie = new Bookie(conf) {
+ @Override
+ public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ throws IOException, BookieException {
+ // drop request to simulate a dead bookie
+ throw new IOException("Couldn't write entries for some reason");
+ }
+ };
+ bsConfs.add(conf);
+ bs.add(startBookie(conf, rBookie));
+ }
}