You are viewing a plain-text version of this content. Its canonical link (a hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/06/07 12:44:24 UTC

svn commit: r1490578 - in /zookeeper/bookkeeper/branches/branch-4.2: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/test/java/org/apache/bookkeeper/client/

Author: ivank
Date: Fri Jun  7 10:44:24 2013
New Revision: 1490578

URL: http://svn.apache.org/r1490578
Log:
BOOKKEEPER-581: Ledger recovery doesn't work correctly when recovery adds force changing ensembles. (sijie via ivank)

Modified:
    zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java

Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Fri Jun  7 10:44:24 2013
@@ -14,6 +14,8 @@ Release 4.2.2 - Unreleased
 
         BOOKKEEPER-585: Auditor logs noisily when a ledger has been deleted (ivank)
 
+        BOOKKEEPER-581: Ledger recovery doesn't work correctly when recovery adds force changing ensembles. (sijie via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-579: TestSubAfterCloseSub was put in a wrong package (sijie via ivank)

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Fri Jun  7 10:44:24 2013
@@ -18,6 +18,8 @@ package org.apache.bookkeeper.client;
  * limitations under the License.
  */
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.BufferedReader;
 import java.io.StringReader;
 import java.io.IOException;
@@ -25,6 +27,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Arrays;
@@ -96,6 +99,30 @@ public class LedgerMetadata {
         this.hasPassword = true;
     }
 
+    /**
+     * Copy Constructor.
+     */
+    LedgerMetadata(LedgerMetadata other) {
+        this.ensembleSize = other.ensembleSize;
+        this.writeQuorumSize = other.writeQuorumSize;
+        this.ackQuorumSize = other.ackQuorumSize;
+        this.length = other.length;
+        this.lastEntryId = other.lastEntryId;
+        this.metadataFormatVersion = other.metadataFormatVersion;
+        this.state = other.state;
+        this.version = other.version;
+        this.hasPassword = other.hasPassword;
+        this.digestType = other.digestType;
+        this.password = new byte[other.password.length];
+        System.arraycopy(other.password, 0, this.password, 0, other.password.length);
+        // copy the ensembles
+        for (Entry<Long, ArrayList<InetSocketAddress>> entry : other.ensembles.entrySet()) {
+            long startEntryId = entry.getKey();
+            ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>(entry.getValue());
+            this.addEnsemble(startEntryId, newEnsemble);
+        }
+    }
+
     private LedgerMetadata() {
         this(0, 0, 0, BookKeeper.DigestType.MAC, new byte[] {});
         this.hasPassword = false;

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java Fri Jun  7 10:44:24 2013
@@ -19,21 +19,16 @@ package org.apache.bookkeeper.client;
  */
 
 import java.util.Enumeration;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
-import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
-import org.apache.bookkeeper.client.LedgerHandle.NoopCloseCallback;
 import org.apache.bookkeeper.client.DigestManager.RecoveryData;
-import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 /**
  * This class encapsulated the ledger recovery operation. It first does a read
@@ -51,9 +46,25 @@ class LedgerRecoveryOp implements ReadCa
     long maxAddPushed = LedgerHandle.INVALID_ENTRY_ID;
     long maxAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
     long maxLength = 0;
+    // keep a copy of metadata for recovery.
+    LedgerMetadata metadataForRecovery;
 
     GenericCallback<Void> cb;
 
+    class RecoveryReadOp extends PendingReadOp {
+
+        RecoveryReadOp(LedgerHandle lh, ScheduledExecutorService scheduler, long startEntryId,
+                long endEntryId, ReadCallback cb, Object ctx) {
+            super(lh, scheduler, startEntryId, endEntryId, cb, ctx);
+        }
+
+        @Override
+        protected LedgerMetadata getLedgerMetadata() {
+            return metadataForRecovery;
+        }
+
+    }
+
     public LedgerRecoveryOp(LedgerHandle lh, GenericCallback<Void> cb) {
         this.cb = cb;
         this.lh = lh;
@@ -63,17 +74,20 @@ class LedgerRecoveryOp implements ReadCa
     public void initiate() {
         ReadLastConfirmedOp rlcop = new ReadLastConfirmedOp(lh,
                 new ReadLastConfirmedOp.LastConfirmedDataCallback() {
-                public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
-                    if (rc == BKException.Code.OK) {
-                        lh.lastAddPushed = lh.lastAddConfirmed = data.lastAddConfirmed;
-                        lh.length = data.length;
-                        doRecoveryRead();
-                    } else if (rc == BKException.Code.UnauthorizedAccessException) {
-                        cb.operationComplete(rc, null);
-                    } else {
-                        cb.operationComplete(BKException.Code.ReadException, null);
+                    public void readLastConfirmedDataComplete(int rc, RecoveryData data) {
+                        if (rc == BKException.Code.OK) {
+                            lh.lastAddPushed = lh.lastAddConfirmed = data.lastAddConfirmed;
+                            lh.length = data.length;
+                            // keep a copy of ledger metadata before proceeding
+                            // ledger recovery
+                            metadataForRecovery = new LedgerMetadata(lh.getLedgerMetadata());
+                            doRecoveryRead();
+                        } else if (rc == BKException.Code.UnauthorizedAccessException) {
+                            cb.operationComplete(rc, null);
+                        } else {
+                            cb.operationComplete(BKException.Code.ReadException, null);
+                        }
                     }
-                }
                 });
 
         /**
@@ -88,14 +102,16 @@ class LedgerRecoveryOp implements ReadCa
      * Try to read past the last confirmed.
      */
     private void doRecoveryRead() {
-        lh.lastAddConfirmed++;
-        lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null);
+        long nextEntry = lh.lastAddConfirmed + 1;
+        try {
+            new RecoveryReadOp(lh, lh.bk.scheduler, nextEntry, nextEntry, this, null).initiate();
+        } catch (InterruptedException e) {
+            readComplete(BKException.Code.InterruptedException, lh, null, null);
+        }
     }
 
     @Override
     public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
-        // get back to prev value
-        lh.lastAddConfirmed--;
         if (rc == BKException.Code.OK) {
             LedgerEntry entry = seq.nextElement();
             byte[] data = entry.getEntry();
@@ -145,9 +161,7 @@ class LedgerRecoveryOp implements ReadCa
             cb.operationComplete(rc, null);
             return;
         }
-
         doRecoveryRead();
-
     }
 
 }

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Fri Jun  7 10:44:24 2013
@@ -21,27 +21,27 @@ package org.apache.bookkeeper.client;
  *
  */
 import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ScheduledFuture;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Queue;
-import java.util.BitSet;
 import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
-
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Sequence of entries of a ledger that represents a pending read operation.
@@ -128,7 +128,7 @@ class PendingReadOp implements Enumerati
          * @return host we sent to if we sent. null otherwise.
          */
         synchronized InetSocketAddress maybeSendSpeculativeRead(Set<InetSocketAddress> heardFromHosts) {
-            if (nextReplicaIndexToReadFrom >= lh.getLedgerMetadata().getWriteQuorumSize()) {
+            if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
                 return null;
             }
 
@@ -146,7 +146,7 @@ class PendingReadOp implements Enumerati
         }
 
         synchronized InetSocketAddress sendNextRead() {
-            if (nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) {
+            if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
                 // we are done, the read has failed from all replicas, just fail the
                 // read
 
@@ -251,11 +251,16 @@ class PendingReadOp implements Enumerati
         this.endEntryId = endEntryId;
         this.scheduler = scheduler;
         numPendingEntries = endEntryId - startEntryId + 1;
-        maxMissedReadsAllowed = lh.metadata.getWriteQuorumSize() - lh.metadata.getAckQuorumSize();
+        maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
+                - getLedgerMetadata().getAckQuorumSize();
         speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
         heardFromHosts = new HashSet<InetSocketAddress>();
     }
 
+    protected LedgerMetadata getLedgerMetadata() {
+        return lh.metadata;
+    }
+
     public void initiate() throws InterruptedException {
         long nextEnsembleChange = startEntryId, i = startEntryId;
 
@@ -283,11 +288,9 @@ class PendingReadOp implements Enumerati
         }
 
         do {
-            LOG.debug("Acquiring lock: {}", i);
-
             if (i == nextEnsembleChange) {
-                ensemble = lh.metadata.getEnsemble(i);
-                nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
+                ensemble = getLedgerMetadata().getEnsemble(i);
+                nextEnsembleChange = getLedgerMetadata().getNextEnsembleChange(i);
             }
             LedgerEntryRequest entry = new LedgerEntryRequest(ensemble, lh.ledgerId, i);
             seq.add(entry);

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java?rev=1490578&r1=1490577&r2=1490578&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java Fri Jun  7 10:44:24 2013
@@ -24,20 +24,18 @@ package org.apache.bookkeeper.client;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.junit.*;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.test.BaseTestCase;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -356,4 +354,67 @@ public class LedgerRecoveryTest extends 
         assertEquals("Fenced ledger should have correct lastAddConfirmed",
                      lhbefore.getLastAddConfirmed(), lhafter.getLastAddConfirmed());
     }
+
+    /**
+     * Verify that it doesn't break the recovery when changing ensemble in
+     * recovery add.
+     */
+    @Test(timeout = 60000)
+    public void testEnsembleChangeDuringRecovery() throws Exception {
+        LedgerHandle lh = bkc.createLedger(numBookies, 2, 2, digestType, "".getBytes());
+        int numEntries = (numBookies * 3) + 1;
+        final AtomicInteger numPendingAdds = new AtomicInteger(numEntries);
+        final CountDownLatch addDone = new CountDownLatch(1);
+        for (int i = 0; i < numEntries; i++) {
+            lh.asyncAddEntry("data".getBytes(), new AddCallback() {
+
+                @Override
+                public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                    if (BKException.Code.OK != rc) {
+                        addDone.countDown();
+                        return;
+                    }
+                    if (numPendingAdds.decrementAndGet() == 0) {
+                        addDone.countDown();
+                    }
+                }
+
+            }, null);
+        }
+        addDone.await(10, TimeUnit.SECONDS);
+        if (numPendingAdds.get() > 0) {
+            fail("Failed to add " + numEntries + " to ledger handle " + lh.getId());
+        }
+        // kill first 2 bookies to replace bookies
+        InetSocketAddress bookie1 = lh.getLedgerMetadata().currentEnsemble.get(0);
+        ServerConfiguration conf1 = killBookie(bookie1);
+        InetSocketAddress bookie2 = lh.getLedgerMetadata().currentEnsemble.get(1);
+        ServerConfiguration conf2 = killBookie(bookie2);
+
+        // replace these two bookies
+        startDeadBookie(conf1);
+        startDeadBookie(conf2);
+        // kick in two brand new bookies
+        startNewBookie();
+        startNewBookie();
+
+        // two dead bookies are put in the ensemble which would cause ensemble
+        // change
+        LedgerHandle recoveredLh = bkc.openLedger(lh.getId(), digestType, "".getBytes());
+        assertEquals("Fenced ledger should have correct lastAddConfirmed", lh.getLastAddConfirmed(),
+                recoveredLh.getLastAddConfirmed());
+    }
+
+    private void startDeadBookie(ServerConfiguration conf) throws Exception {
+        Bookie rBookie = new Bookie(conf) {
+            @Override
+            public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+                    throws IOException, BookieException {
+                // drop request to simulate a dead bookie
+                throw new IOException("Couldn't write entries for some reason");
+            }
+        };
+        bsConfs.add(conf);
+        bs.add(startBookie(conf, rBookie));
+    }
 }