You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/06/07 21:42:51 UTC

svn commit: r1490797 - in /zookeeper/bookkeeper/branches/branch-4.2: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/test/java/org/apache/bookkeeper/client/

Author: ivank
Date: Fri Jun  7 19:42:49 2013
New Revision: 1490797

URL: http://svn.apache.org/r1490797
Log:
BOOKKEEPER-584: Data loss when ledger metadata is overwritten (sijie via ivank)

Modified:
    zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
    zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java

Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1490797&r1=1490796&r2=1490797&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Fri Jun  7 19:42:49 2013
@@ -20,6 +20,8 @@ Release 4.2.2 - Unreleased
 
         BOOKKEEPER-596: Ledgers are gc'ed by mistake in MSLedgerManagerFactory. (sijie via ivank)
 
+        BOOKKEEPER-584: Data loss when ledger metadata is overwritten (sijie via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-579: TestSubAfterCloseSub was put in a wrong package (sijie via ivank)

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1490797&r1=1490796&r2=1490797&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Jun  7 19:42:49 2013
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +18,7 @@ package org.apache.bookkeeper.client;
  * under the License.
  *
  */
+package org.apache.bookkeeper.client;
 
 import java.net.InetSocketAddress;
 import java.security.GeneralSecurityException;
@@ -297,7 +296,12 @@ public class LedgerHandle {
                                         }
 
                                         metadata.setLength(prevLength);
-                                        if (metadata.resolveConflict(newMeta)) {
+                                        if (!metadata.isNewerThan(newMeta)
+                                                && !metadata.isConflictWith(newMeta)) {
+                                            // use the new metadata's ensemble, in case re-replication already
+                                            // replaced some bookies in the ensemble.
+                                            metadata.setEnsembles(newMeta.getEnsembles());
+                                            metadata.setVersion(newMeta.version);
                                             metadata.setLength(length);
                                             metadata.close(lastAddConfirmed);
                                             writeLedgerConfig(new CloseCb());
@@ -485,7 +489,7 @@ public class LedgerHandle {
         final long currentLength;
         synchronized(this) {
             // synchronized on this to ensure that
-            // the ledger isn't closed between checking and 
+            // the ledger isn't closed between checking and
             // updating lastAddPushed
             if (metadata.isClosed()) {
                 LOG.warn("Attempt to add to closed ledger: " + ledgerId);
@@ -532,6 +536,7 @@ public class LedgerHandle {
 
     public void asyncReadLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx) {
         ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback() {
+                @Override
                 public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                     if (rc == BKException.Code.OK) {
                         lastAddConfirmed = Math.max(lastAddConfirmed, data.lastAddConfirmed);
@@ -781,6 +786,8 @@ public class LedgerHandle {
         }
 
         /**
+         * Resolves the specific conflicts that happen when multiple bookies fail in the same ensemble.
+         * <p>
          * Resolving the version conflicts between local ledgerMetadata and zk
          * ledgerMetadata. This will do the following:
          * <ul>
@@ -790,23 +797,35 @@ public class LedgerHandle {
          * if the zk ledgerMetadata still contains the failed bookie, then
          * update zookeeper with the newBookie otherwise send write request</li>
          * </ul>
+         * </p>
          */
         private boolean resolveConflict(LedgerMetadata newMeta) {
-            // close have changed, another client has opened
-            // the ledger, can't resolve this conflict.
+            // make sure the ledger hasn't been closed by anyone else.
             if (metadata.getState() != newMeta.getState()) {
                 return false;
             }
-            // update znode version
-            metadata.setVersion(newMeta.getVersion());
-            // Resolve the conflicts if zk metadata still contains failed
-            // bookie.
+
+            // If the failed bookie still exists in the metadata (in zookeeper), it means that
+            // the ensemble change for the failed bookie failed due to metadata conflicts, so try to
+            // update the ensemble change metadata again. Otherwise, it means that the ensemble change
+            // has already succeeded, so unset the success and re-add the entries.
             if (newMeta.currentEnsemble.get(ensembleInfo.bookieIndex).equals(
                     ensembleInfo.addr)) {
-                // Update ledger metadata in zk, if in-memory metadata doesn't
-                // contains the failed bookie.
+                // If the in-memory metadata doesn't contain the failed bookie, it means the ensemble change
+                // didn't finish, so try to resolve conflicts with the metadata read from zookeeper and
+                // update the ensemble change metadata again.
                 if (!metadata.currentEnsemble.get(ensembleInfo.bookieIndex)
                         .equals(ensembleInfo.addr)) {
+                    // if the local metadata is newer than the zookeeper metadata, it means that the metadata was
+                    // updated again while it was being re-read, so kick off the re-read again
+                    if (metadata.isNewerThan(newMeta)) {
+                        rereadMetadata(this);
+                        return true;
+                    }
+                    // make sure the metadata hasn't been changed by anyone else.
+                    if (metadata.isConflictWith(newMeta)) {
+                        return false;
+                    }
                     LOG.info("Resolve ledger metadata conflict "
                             + "while changing ensemble to: "
                             + ensembleInfo.newEnsemble
@@ -814,6 +833,8 @@ public class LedgerHandle {
                             + new String(metadata.serialize())
                             + "\n, new meta data is \n"
                             + new String(newMeta.serialize()));
+                    // update znode version
+                    metadata.setVersion(newMeta.getVersion());
                     writeLedgerConfig(new ChangeEnsembleCb(ensembleInfo));
                 }
             } else {
@@ -906,6 +927,7 @@ public class LedgerHandle {
          * @param ctx
          *          control object
          */
+        @Override
         public void readComplete(int rc, LedgerHandle lh,
                                  Enumeration<LedgerEntry> seq, Object ctx) {
             
@@ -934,6 +956,7 @@ public class LedgerHandle {
          * @param ctx
          *          control object
          */
+        @Override
         public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
             SyncCounter counter = (SyncCounter) ctx;
 
@@ -947,6 +970,7 @@ public class LedgerHandle {
         /**
          * Implementation of  callback interface for synchronous read last confirmed method.
          */
+        @Override
         public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
             LastConfirmedCtx lcCtx = (LastConfirmedCtx) ctx;
             
@@ -966,6 +990,7 @@ public class LedgerHandle {
          * @param lh
          * @param ctx
          */
+        @Override
         public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
             SyncCounter counter = (SyncCounter) ctx;
             counter.setrc(rc);

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1490797&r1=1490796&r2=1490797&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Fri Jun  7 19:42:49 2013
@@ -139,6 +139,10 @@ public class LedgerMetadata {
         return ensembles;
     }
 
+    void setEnsembles(SortedMap<Long, ArrayList<InetSocketAddress>> ensembles) {
+        this.ensembles = ensembles;
+    }
+
     public int getEnsembleSize() {
         return ensembleSize;
     }
@@ -433,13 +437,26 @@ public class LedgerMetadata {
     }
 
     /**
-     * Resolve conflict with new updated metadata.
+     * Is the metadata newer than the given <i>newMeta</i>.
+     *
+     * @param newMeta
+     * @return
+     */
+    boolean isNewerThan(LedgerMetadata newMeta) {
+        if (null == version) {
+            return false;
+        }
+        return Version.Occurred.AFTER == version.compare(newMeta.version);
+    }
+
+    /**
+     * Does the metadata conflict with the newly updated metadata.
      *
      * @param newMeta
      *          Re-read metadata
-     * @return true if the conflict has been resolved, otherwise false.
+     * @return true if the metadata is in conflict.
      */
-    boolean resolveConflict(LedgerMetadata newMeta) {
+    boolean isConflictWith(LedgerMetadata newMeta) {
         /*
          *  if length & close have changed, then another client has
          *  opened the ledger, can't resolve this conflict.
@@ -453,22 +470,17 @@ public class LedgerMetadata {
             state != newMeta.state ||
             !digestType.equals(newMeta.digestType) ||
             !Arrays.equals(password, newMeta.password)) {
-            return false;
+            return true;
         }
         if (state == LedgerMetadataFormat.State.CLOSED
             && lastEntryId != newMeta.lastEntryId) {
-            return false;
-        }
-        // new meta znode version should be larger than old one
-        if (null != version &&
-            Version.Occurred.AFTER == version.compare(newMeta.version)) {
-            return false;
+            return true;
         }
         // if ledger is closed, we can just take the new ensembles
         if (newMeta.state != LedgerMetadataFormat.State.CLOSED) {
             // ensemble size should be same
             if (ensembles.size() != newMeta.ensembles.size()) {
-                return false;
+                return true;
             }
             // ensemble distribution should be same
             // we don't check the detail ensemble, since new bookie will be set
@@ -479,16 +491,10 @@ public class LedgerMetadata {
                 Long curKey = keyIter.next();
                 Long newMetaKey = newMetaKeyIter.next();
                 if (!curKey.equals(newMetaKey)) {
-                    return false;
+                    return true;
                 }
             }
         }
-        /*
-         *  if the conflict has been resolved, then update
-         *  ensemble and znode version
-         */
-        ensembles = newMeta.ensembles;
-        version = newMeta.version;
-        return true;
+        return false;
     }
 }

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=1490797&r1=1490796&r2=1490797&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java Fri Jun  7 19:42:49 2013
@@ -21,11 +21,11 @@ import java.util.HashSet;
 import java.util.Set;
 import java.net.InetSocketAddress;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 /**
  * This represents a pending add operation. When it has got success from all
@@ -132,6 +132,11 @@ class PendingAddOp implements WriteCallb
     public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
         int bookieIndex = (Integer) ctx;
 
+        if (completed) {
+            // I am already finished, ignore incoming responses.
+            // otherwise, we might hit the following error handling logic, which might cause bad things.
+            return;
+        }
 
         switch (rc) {
         case BKException.Code.OK:

Modified: zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java?rev=1490797&r1=1490796&r2=1490797&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java Fri Jun  7 19:42:49 2013
@@ -58,6 +58,7 @@ public class BookieWriteLedgerTest exten
 
     private static class SyncObj {
         volatile int counter;
+        volatile int rc;
 
         public SyncObj() {
             counter = 0;
@@ -182,6 +183,7 @@ public class BookieWriteLedgerTest exten
                 LOG.debug("Entries counter = " + syncObj1.counter);
                 syncObj1.wait();
             }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
         }
         // wait for all entries to be acknowledged for the second ledger
         synchronized (syncObj2) {
@@ -189,6 +191,7 @@ public class BookieWriteLedgerTest exten
                 LOG.debug("Entries counter = " + syncObj2.counter);
                 syncObj2.wait();
             }
+            assertEquals(BKException.Code.OK, syncObj2.rc);
         }
 
         // reading ledger till the last entry
@@ -217,12 +220,9 @@ public class BookieWriteLedgerTest exten
 
     @Override
     public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
-        if (rc != BKException.Code.OK)
-            fail("Return code is not OK: " + rc);
-
         SyncObj x = (SyncObj) ctx;
-
         synchronized (x) {
+            x.rc = rc;
             x.counter++;
             x.notify();
         }