You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/03/31 10:58:52 UTC

svn commit: r1307743 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/

Author: sijie
Date: Sat Mar 31 08:58:51 2012
New Revision: 1307743

URL: http://svn.apache.org/viewvc?rev=1307743&view=rev
Log:
BOOKKEEPER-112: Bookie Recovery on an open ledger will cause LedgerHandle#close on that ledger to fail (sijie)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Mar 31 08:58:51 2012
@@ -74,6 +74,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-198: replaying entries of deleted ledgers would exhaust ledger cache. (sijie)
 
+        BOOKKEEPER-112: Bookie Recovery on an open ledger will cause LedgerHandle#close on that ledger to fail (sijie)
+
       hedwig-server/
       
         BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Sat Mar 31 08:58:51 2012
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.client.Asyn
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -83,7 +84,6 @@ public class BookKeeperAdmin {
     private DigestType DIGEST_TYPE;
     private byte[] PASSWD;
 
-    
     /**
      * Constructor that takes in a ZooKeeper servers connect string so we know
      * how to connect to ZooKeeper to retrieve information about the BookKeeper
@@ -426,8 +426,8 @@ public class BookKeeperAdmin {
          * ledger fragments are stored on. Check if any of the ledger fragments
          * for the current ledger are stored on the input dead bookie.
          */
-        DigestType digestType = getLedgerDigestType(lId);
-        byte[] passwd = getLedgerPasswd(lId);
+        final DigestType digestType = getLedgerDigestType(lId);
+        final byte[] passwd = getLedgerPasswd(lId);
         bkc.asyncOpenLedgerNoRecovery(lId, digestType, passwd, new OpenCallback() {
             @Override
             public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
@@ -436,6 +436,39 @@ public class BookKeeperAdmin {
                     ledgerIterCb.processResult(rc, null, null);
                     return;
                 }
+
+                LedgerMetadata lm = lh.getLedgerMetadata();
+                if (!lm.isClosed() &&
+                    lm.getEnsembles().size() > 0) {
+                    Long lastKey = lm.getEnsembles().lastKey();
+                    ArrayList<InetSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
+                    // the original write has not removed faulty bookie from
+                    // current ledger ensemble. to avoid data loss issue in
+                    // the case of concurrent updates to the ensemble composition,
+                    // the recovery tool should first close the ledger
+                    if (lastEnsemble.contains(bookieSrc)) {
+                        // close opened non recovery ledger handle
+                        try {
+                            lh.close();
+                        } catch (Exception ie) {
+                            LOG.warn("Error closing non recovery ledger handle for ledger " + lId, ie);
+                        }
+                        bkc.asyncOpenLedger(lId, digestType, passwd, new OpenCallback() {
+                            @Override
+                            public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
+                                if (newrc != Code.OK.intValue()) {
+                                    LOG.error("BK error close ledger: " + lId, BKException.create(newrc));
+                                    ledgerIterCb.processResult(newrc, null, null);
+                                    return;
+                                }
+                                // do recovery
+                                recoverLedger(bookieSrc, lId, ledgerIterCb, availableBookies);
+                            }
+                        }, null);
+                        return;
+                    }
+                }
+
                 /*
                  * This List stores the ledger fragments to recover indexed by
                  * the start entry ID for the range. The ensembles TreeMap is
@@ -465,6 +498,12 @@ public class BookKeeperAdmin {
                         ledgerFragmentsToRecover.add(entry.getKey());
                     }
                 }
+                // also record the last ensemble's range: otherwise, when the failed
+                // bookie is in the last ensemble of a closed ledger, the entries
+                // belonging to that ensemble would not be replicated.
+                if (curEntryId != null) {
+                    ledgerFragmentsRange.put(curEntryId, lh.getLastAddConfirmed());
+                }
                 /*
                  * See if this current ledger contains any ledger fragment that
                  * needs to be re-replicated. If not, then just invoke the
@@ -503,7 +542,6 @@ public class BookKeeperAdmin {
                                   + "," + endEntryId + "] of ledger " + lh.getId()
                                   + " to " + newBookie);
                     }
-
                     try {
                         SingleFragmentCallback cb = new SingleFragmentCallback(
                                                                                ledgerFragmentsMcb, lh, startEntryId, bookieSrc, newBookie);
@@ -551,6 +589,12 @@ public class BookKeeperAdmin {
             cb.processResult(BKException.Code.OK, null, null);
             return;
         }
+        if (startEntryId > endEntryId) {
+            // for an open ledger with no entries, the start entry id is 0 and the end entry id is -1.
+            // we can return immediately to trigger forward read
+            cb.processResult(BKException.Code.OK, null, null);
+            return;
+        }
 
         ArrayList<InetSocketAddress> curEnsemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
         int bookieIndex = 0;
@@ -603,7 +647,7 @@ public class BookKeeperAdmin {
      *            entries that were stored on the failed bookie.
      */
     private void recoverLedgerFragmentEntry(final Long entryId, final LedgerHandle lh,
-                                            final MultiCallback ledgerFragmentEntryMcb, 
+                                            final AsyncCallback.VoidCallback ledgerFragmentEntryMcb,
                                             final InetSocketAddress newBookie) throws InterruptedException {
         /*
          * Read the ledger entry using the LedgerHandle. This will allow us to
@@ -659,13 +703,14 @@ public class BookKeeperAdmin {
      * be a multicallback responsible for all fragments in a single ledger
      */
     class SingleFragmentCallback implements AsyncCallback.VoidCallback {
-        final MultiCallback ledgerFragmentsMcb;
+        final AsyncCallback.VoidCallback ledgerFragmentsMcb;
         final LedgerHandle lh;
         final long fragmentStartId;
         final InetSocketAddress oldBookie;
         final InetSocketAddress newBookie;
 
-        SingleFragmentCallback(MultiCallback ledgerFragmentsMcb, LedgerHandle lh, 
+        SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
+                               LedgerHandle lh,
                                long fragmentStartId,
                                InetSocketAddress oldBookie,
                                InetSocketAddress newBookie) {
@@ -684,6 +729,10 @@ public class BookKeeperAdmin {
                 ledgerFragmentsMcb.processResult(rc, null, null);
                 return;
             }
+            writeLedgerConfig();
+        }
+
+        protected void writeLedgerConfig() {
             /*
              * Update the ledger metadata's ensemble info to point
              * to the new bookie.
@@ -693,20 +742,32 @@ public class BookKeeperAdmin {
             int deadBookieIndex = ensemble.indexOf(oldBookie);
             ensemble.remove(deadBookieIndex);
             ensemble.add(deadBookieIndex, newBookie);
-            
-            
+
             lh.writeLedgerConfig(new WriteCb(), null);
         }
         
         private class WriteCb implements AsyncCallback.StatCallback {
             @Override
-            public void processResult(int rc, String path, Object ctx, Stat stat) {
+            public void processResult(int rc, final String path, Object ctx, Stat stat) {
                 if (rc == Code.BADVERSION.intValue()) {
                     LOG.warn("Two fragments attempted update at once; ledger id: " + lh.getId() 
                              + " startid: " + fragmentStartId);
                     // try again, the previous success (with which this has conflicted)
                     // will have updated the stat
-                    lh.writeLedgerConfig(new WriteCb(), null);
+                    // other operations such as (addEnsemble) would update it too.
+                    lh.rereadMetadata(new GenericCallback<LedgerMetadata>() {
+                        @Override
+                        public void operationComplete(int rc, LedgerMetadata newMeta) {
+                            if (rc != BKException.Code.OK) {
+                                LOG.error("Error reading updated ledger metadata for ledger " + lh.getId(),
+                                          KeeperException.create(KeeperException.Code.get(rc), path));
+                                ledgerFragmentsMcb.processResult(rc, null, null);
+                            } else {
+                                lh.metadata = newMeta;
+                                writeLedgerConfig();
+                            }
+                        }
+                    });
                     return;
                 } else if (rc != Code.OK.intValue()) {
                     LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
@@ -725,4 +786,5 @@ public class BookKeeperAdmin {
             }
         };
     }
+
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Sat Mar 31 08:58:51 2012
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.client;
  * under the License.
  *
  */
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.GeneralSecurityException;
@@ -253,7 +254,13 @@ public class LedgerHandle {
         bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
             @Override
             public void safeRun() {
+                final long prevClose;
+                final long prevLength;
+
                 synchronized(LedgerHandle.this) {
+                    prevClose = metadata.close;
+                    prevLength = metadata.length;
+
                     // synchronized on LedgerHandle.this to ensure that 
                     // lastAddPushed can not be updated after the metadata 
                     // is closed. 
@@ -261,7 +268,6 @@ public class LedgerHandle {
 
                     // Close operation is idempotent, so no need to check if we are
                     // already closed
-
                     metadata.close(lastAddConfirmed);
                     errorOutPendingAdds(rc);
                     lastAddPushed = lastAddConfirmed;
@@ -272,11 +278,37 @@ public class LedgerHandle {
                               + metadata.close + " with this many bytes: " + metadata.length);
                 }
 
-                writeLedgerConfig(new StatCallback() {
+                final class CloseCb implements StatCallback {
                     @Override
-                    public void processResult(int rc, String path, Object subctx,
-                                              Stat stat) {
-                        if (rc != KeeperException.Code.OK.intValue()) {
+                    public void processResult(final int rc, String path, Object subctx,
+                                              final Stat stat) {
+                        if (rc == KeeperException.Code.BadVersion) {
+                            rereadMetadata(new GenericCallback<LedgerMetadata>() {
+                                @Override
+                                public void operationComplete(int newrc, LedgerMetadata newMeta) {
+                                    if (newrc != BKException.Code.OK) {
+                                        LOG.error("Error reading new metadata from ledger " + ledgerId
+                                                  + " when closing, code=" + newrc);
+                                        cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this, ctx);
+                                    } else {
+                                        metadata.close(prevClose);
+                                        metadata.length = prevLength;
+                                        if (metadata.resolveConflict(newMeta)) {
+                                            metadata.length = length;
+                                            metadata.close(lastAddConfirmed);
+                                            writeLedgerConfig(new CloseCb(), null);
+                                            return;
+                                        } else {
+                                            metadata.length = length;
+                                            metadata.close(lastAddConfirmed);
+                                            LOG.warn("Conditional write failed: "
+                                                     + KeeperException.Code.get(KeeperException.Code.BadVersion));
+                                            cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this, ctx);
+                                        }
+                                    }
+                                }
+                            });
+                        } else if (rc != KeeperException.Code.OK.intValue()) {
                             LOG.warn("Conditional write failed: " + KeeperException.Code.get(rc));
                             cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this,
                                              ctx);
@@ -285,7 +317,9 @@ public class LedgerHandle {
                             cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
                         }
                     }
-                }, null);
+                };
+
+                writeLedgerConfig(new CloseCb(), null);
 
             }
         });
@@ -639,16 +673,43 @@ public class LedgerHandle {
                       + (lastAddConfirmed + 1));
         }
 
-        metadata.addEnsemble(lastAddConfirmed + 1, newEnsemble);
+        final long newEnsembleStartEntry = lastAddConfirmed + 1;
+        metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
 
-        writeLedgerConfig(new StatCallback() {
+        final class ChangeEnsembleCb implements StatCallback {
             @Override
             public void processResult(final int rc, String path, Object ctx, final Stat stat) {
 
                 bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
                     @Override
                     public void safeRun() {
-                        if (rc != KeeperException.Code.OK.intValue()) {
+                        if (rc == KeeperException.Code.BadVersion) {
+                            rereadMetadata(new GenericCallback<LedgerMetadata>() {
+                                @Override
+                                public void operationComplete(int newrc, LedgerMetadata newMeta) {
+                                    if (newrc != BKException.Code.OK) {
+                                        LOG.error("Error reading new metadata from ledger after changing ensemble, code=" + newrc);
+                                        handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+                                    } else {
+                                        // a new ensemble is added only when the start entry is larger than zero
+                                        if (newEnsembleStartEntry > 0) {
+                                            metadata.getEnsembles().remove(newEnsembleStartEntry);
+                                        }
+                                        if (metadata.resolveConflict(newMeta)) {
+                                            metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
+                                            writeLedgerConfig(new ChangeEnsembleCb(), null);
+                                            return;
+                                        } else {
+                                            LOG.error("Could not resolve ledger metadata confliction while changing ensemble to: "
+                                                      + newEnsemble + ", old meta data is \n" + new String(metadata.serialize())
+                                                      + "\n, new meta data is \n" + new String(newMeta.serialize()) + "\n ,closing ledger");
+                                            handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+                                        }
+                                    }
+                                }
+                            });
+                            return;
+                        } else if (rc != KeeperException.Code.OK.intValue()) {
                             LOG
                             .error("Could not persist ledger metadata while changing ensemble to: "
                                    + newEnsemble + " , closing ledger");
@@ -664,30 +725,33 @@ public class LedgerHandle {
                 });
 
             }
-        }, null);
+        };
+
+        writeLedgerConfig(new ChangeEnsembleCb(), null);
 
     }
 
-    void rereadMetadata(final GenericCallback<Void> cb) {
+    void rereadMetadata(final GenericCallback<LedgerMetadata> cb) {
         bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false,
-                new DataCallback() {
-                    public void processResult(int rc, String path,
-                                              Object ctx, byte[] data, Stat stat) {
-                        if (rc != KeeperException.Code.OK.intValue()) {
-                            LOG.error("Error reading metadata from ledger, code =" + rc);
-                            cb.operationComplete(BKException.Code.ZKException, null);
-                            return;
-                        }
-                        
-                        try {
-                            metadata = LedgerMetadata.parseConfig(data, stat.getVersion());
-                        } catch (IOException e) {
-                            LOG.error("Error parsing ledger metadata for ledger", e);
-                            cb.operationComplete(BKException.Code.ZKException, null);
-                        }
-                        cb.operationComplete(BKException.Code.OK, null);
+            new DataCallback() {
+                public void processResult(int rc, String path,
+                                          Object ctx, byte[] data, Stat stat) {
+                    if (rc != KeeperException.Code.OK.intValue()) {
+                        LOG.error("Error reading metadata from ledger, code =" + rc);
+                        cb.operationComplete(BKException.Code.ZKException, null);
+                        return;
                     }
-                }, null);
+
+                    try {
+                        LedgerMetadata newMeta = LedgerMetadata.parseConfig(data, stat.getVersion());
+                        cb.operationComplete(BKException.Code.OK, newMeta);
+                    } catch (IOException e) {
+                        LOG.error("Error parsing ledger metadata for ledger", e);
+                        cb.operationComplete(BKException.Code.ZKException, null);
+                        return;
+                    }
+                }
+        }, null);
     }
 
     void recover(final GenericCallback<Void> cb) {
@@ -713,12 +777,13 @@ public class LedgerHandle {
             @Override
             public void processResult(final int rc, String path, Object ctx, Stat stat) {
                 if (rc == KeeperException.Code.BadVersion) {
-                    rereadMetadata(new GenericCallback<Void>() {
+                    rereadMetadata(new GenericCallback<LedgerMetadata>() {
                             @Override
-                            public void operationComplete(int rc, Void result) {
+                            public void operationComplete(int rc, LedgerMetadata newMeta) {
                                 if (rc != BKException.Code.OK) {
                                     cb.operationComplete(rc, null);
                                 } else {
+                                    metadata = newMeta;
                                     recover(cb);
                                 }
                             }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Sat Mar 31 08:58:51 2012
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.client;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -259,4 +260,47 @@ public class LedgerMetadata {
     public int getZnodeVersion() {
         return this.znodeVersion;
     }
+
+    /**
+     * Try to reconcile this metadata with a freshly re-read copy after a
+     * conditional metadata write was rejected because of a version conflict.
+     *
+     * @param newMeta
+     *          Re-read metadata
+     * @return true if the conflict is resolved, otherwise false.
+     */
+    boolean resolveConflict(LedgerMetadata newMeta) {
+        // a change in length or close means another client opened the ledger;
+        // that kind of conflict cannot be resolved here
+        if (metadataFormatVersion != newMeta.metadataFormatVersion ||
+            ensembleSize != newMeta.ensembleSize ||
+            quorumSize != newMeta.quorumSize ||
+            length != newMeta.length ||
+            close != newMeta.close) {
+            return false;
+        }
+        // the re-read znode version must not be older than the local one
+        if (znodeVersion > newMeta.znodeVersion) {
+            return false;
+        }
+        // both sides must have the same number of ensembles
+        if (ensembles.size() != newMeta.ensembles.size()) {
+            return false;
+        }
+        // ensembles must start at the same entry ids; the bookies inside each
+        // ensemble are not compared, since the recovery tool may replace them.
+        Iterator<Long> keyIter = ensembles.keySet().iterator();
+        Iterator<Long> newMetaKeyIter = newMeta.ensembles.keySet().iterator();
+        for (int i=0; i<ensembles.size(); i++) {
+            Long curKey = keyIter.next();
+            Long newMetaKey = newMetaKeyIter.next();
+            if (!curKey.equals(newMetaKey)) { // boxed Long: compare values, not references
+                return false;
+            }
+        }
+        // the conflict is resolvable: adopt the re-read ensembles and znode version
+        ensembles = newMeta.ensembles;
+        znodeVersion = newMeta.znodeVersion;
+        return true;
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java Sat Mar 31 08:58:51 2012
@@ -126,7 +126,7 @@ class LedgerRecoveryOp implements ReadCa
         }
 
         // otherwise, some other error, we can't handle
-        LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + lh.lastAddConfirmed + 1
+        LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + (lh.lastAddConfirmed + 1)
                   + " ledger: " + lh.ledgerId + " while recovering ledger");
         cb.operationComplete(rc, null);
         return;
@@ -137,7 +137,7 @@ class LedgerRecoveryOp implements ReadCa
         if (rc != BKException.Code.OK) {
             // Give up, we can't recover from this error
 
-            LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + lh.lastAddConfirmed + 1
+            LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + (lh.lastAddConfirmed + 1)
                       + " ledger: " + lh.ledgerId + " while recovering ledger");
             cb.operationComplete(rc, null);
             return;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Sat Mar 31 08:58:51 2012
@@ -121,6 +121,21 @@ class PendingReadOp implements Enumerati
     public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) {
         final LedgerEntry entry = (LedgerEntry) ctx;
 
+        // If only a single entry was requested and it does not exist (the recoveryRead case),
+        // do not reattempt the read; otherwise the following case could not be handled:
+        //
+        // an empty ledger with quorum (bk1, bk2), where bk2 has failed permanently.
+        // bk1 returns NoLedgerException and the client reattempts the read on bk2, which is
+        // unreachable, so reading entry 0 would fail and the ledger could never be closed.
+        if (startEntryId == endEntryId) {
+            if (BKException.Code.NoSuchLedgerExistsException == rc ||
+                BKException.Code.NoSuchEntryException == rc) {
+                lh.opCounterSem.release();
+                submitCallback(rc);
+                return;
+            }
+        }
+
         if (rc != BKException.Code.OK) {
             logErrorAndReattemptRead(entry, "Error: " + BKException.getMessage(rc), rc);
             return;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Sat Mar 31 08:58:51 2012
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Collections;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.netty.buffer.ChannelBuffer;
 import java.util.concurrent.atomic.AtomicLong;
@@ -41,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.test.BaseTestCase;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -159,6 +161,15 @@ public class BookieRecoveryTest extends 
         return lhs;
     }
 
+    private List<LedgerHandle> openLedgers(List<LedgerHandle> oldLhs) // helper: re-opens every ledger in oldLhs by its id
+            throws Exception {
+        List<LedgerHandle> newLhs = new ArrayList<LedgerHandle>();
+        for (LedgerHandle oldLh : oldLhs) { // opened with digestType and the recovery password from baseClientConf
+            newLhs.add(bkc.openLedger(oldLh.getId(), digestType, baseClientConf.getBookieRecoveryPasswd()));
+        }
+        return newLhs; // new handles, in the same order as oldLhs
+    }
+
     /**
      * Helper method to write dummy ledger entries to all of the ledgers passed.
      *
@@ -171,8 +182,9 @@ public class BookieRecoveryTest extends 
      * @throws BKException
      * @throws InterruptedException
      */
-    private void writeEntriestoLedgers(int numEntries, long startEntryId, List<LedgerHandle> lhs) throws BKException,
-        InterruptedException {
+    private void writeEntriestoLedgers(int numEntries, long startEntryId,
+                                       List<LedgerHandle> lhs)
+        throws BKException, InterruptedException {
         for (LedgerHandle lh : lhs) {
             for (int i = 0; i < numEntries; i++) {
                 lh.addEntry(("LedgerId: " + lh.getId() + ", EntryId: " + (startEntryId + i)).getBytes());
@@ -180,6 +192,12 @@ public class BookieRecoveryTest extends 
         }
     }
 
+    private void closeLedgers(List<LedgerHandle> lhs) throws BKException, InterruptedException {
+        for (LedgerHandle lh : lhs) { // close each handle in turn; an exception from close() aborts the loop
+            lh.close();
+        }
+    }
+
     /**
      * Helper method to verify that we can read the recovered ledger entries.
      *
@@ -507,6 +525,140 @@ public class BookieRecoveryTest extends 
         return numDupes > 0;
     }
 
+    /**
+     * Test recovering closed ledgers when the failed bookie server is in the last ensemble
+     */
+    @Test
+    public void testBookieRecoveryOnClosedLedgers() throws Exception {
+        // Create the ledgers
+        int numLedgers = 3;
+        List<LedgerHandle> lhs = createLedgers(numLedgers, numBookies, 2);
+
+        // Write the entries for the ledgers with dummy values
+        int numMsgs = 10;
+        writeEntriestoLedgers(numMsgs, 0, lhs);
+
+        closeLedgers(lhs);
+
+        // Shutdown the last bookie server in the ensemble (first getEnsembles() entry; presumably the only one)
+        ArrayList<InetSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+                                                       .entrySet().iterator().next().getValue();
+        InetSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
+        killBookie(bookieToKill);
+
+        // start a new bookie
+        startNewBookie();
+
+        InetSocketAddress bookieDest = null; // presumably null lets recovery pick the target, per the log message
+        LOG.info("Now recover the data on the killed bookie (" + bookieToKill
+               + ") and replicate it to a random available one");
+
+        bkAdmin.recoverBookieData(bookieToKill, bookieDest);
+        for (LedgerHandle lh : lhs) {
+            assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs));
+            lh.close();
+        }
+    }
+
+    @Test
+    public void testBookieRecoveryOnOpenedLedgers() throws Exception { // recovery runs while the ledgers are still open
+        // Create the ledgers
+        int numLedgers = 3;
+        List<LedgerHandle> lhs = createLedgers(numLedgers, numBookies, 2);
+
+        // Write the entries for the ledgers with dummy values
+        int numMsgs = 10;
+        writeEntriestoLedgers(numMsgs, 0, lhs);
+
+        // Shutdown the last bookie server in the ensemble
+        ArrayList<InetSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+                                                       .entrySet().iterator().next().getValue();
+        InetSocketAddress bookieToKill = lastEnsemble.get(lastEnsemble.size() - 1);
+        killBookie(bookieToKill);
+
+        // start a new bookie
+        startNewBookie();
+
+        InetSocketAddress bookieDest = null;
+        LOG.info("Now recover the data on the killed bookie (" + bookieToKill
+               + ") and replicate it to a random available one");
+
+        bkAdmin.recoverBookieData(bookieToKill, bookieDest);
+
+        for (LedgerHandle lh : lhs) {
+            assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs));
+        }
+
+        try {
+            // writing more entries through the old handles is expected to fail after recovery
+            writeEntriestoLedgers(numMsgs, 0, lhs);
+            fail("should not reach here");
+        } catch (Exception e) { // expected: the write above must throw
+        }
+    }
+
+    @Test
+    public void testBookieRecoveryOnInRecoveryLedger() throws Exception {
+        int numMsgs = 10;
+        // Create the ledgers
+        int numLedgers = 1;
+        List<LedgerHandle> lhs = createLedgers(numLedgers, 2, 2);
+
+        // Write the entries for the ledgers with dummy values
+        writeEntriestoLedgers(numMsgs, 0, lhs);
+
+        // Shutdown both bookie servers of the ensemble
+        ArrayList<InetSocketAddress> lastEnsemble = lhs.get(0).getLedgerMetadata().getEnsembles()
+                                                       .entrySet().iterator().next().getValue();
+        // this bookie stays down; it is the one passed to recoverBookieData below
+        InetSocketAddress bookieToKill = lastEnsemble.get(0);
+        killBookie(bookieToKill);
+        // temporary failure: this bookie is restarted further down from conf2
+        InetSocketAddress bookieToKill2 = lastEnsemble.get(1);
+        ServerConfiguration conf2 = killBookie(bookieToKill2);
+
+        // start a new bookie
+        startNewBookie();
+
+        // opening these ledgers is expected to fail while both ensemble bookies are down
+        for (LedgerHandle oldLh : lhs) {
+            try {
+                bkc.openLedger(oldLh.getId(), digestType, baseClientConf.getBookieRecoveryPasswd());
+                fail("Should have thrown exception");
+            } catch (Exception e) { // expected: the open above must throw
+            }
+        }
+
+        try {
+            bkAdmin.recoverBookieData(bookieToKill, null);
+            fail("Should have thrown exception");
+        } catch (BKException.BKLedgerRecoveryException bke) {
+            // correct behaviour
+        }
+
+        // restart the temporarily failed bookie
+        bs.add(startBookie(conf2));
+        bsConfs.add(conf2);
+
+        // recover them
+        bkAdmin.recoverBookieData(bookieToKill, null);
+
+        for (LedgerHandle lh : lhs) {
+            assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs));
+        }
+
+        // open ledgers to read metadata
+        List<LedgerHandle> newLhs = openLedgers(lhs);
+        for (LedgerHandle newLh : newLhs) {
+            // first ensemble should contain bookieToKill2 and not contain bookieToKill
+            Map.Entry<Long, ArrayList<InetSocketAddress>> entry =
+                newLh.getLedgerMetadata().getEnsembles().entrySet().iterator().next();
+            assertFalse(entry.getValue().contains(bookieToKill));
+            assertTrue(entry.getValue().contains(bookieToKill2));
+        }
+
+    }
+
     @Test
     public void testAsyncBookieRecoveryToRandomBookiesNotEnoughBookies() throws Exception {
         // Create the ledgers
@@ -575,9 +727,7 @@ public class BookieRecoveryTest extends 
             writeEntriestoLedgers(numMsgs, numMsgs*2, lhs);
             for (LedgerHandle lh : lhs) {
                 assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs*3));
-                // TODO (BOOKKEEPER-112) this throws an exception at the moment 
-                // because recovering a ledger updates the ledger znode
-                //lh.close();
+                lh.close();
             }
         }
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java Sat Mar 31 08:58:51 2012
@@ -269,16 +269,18 @@ public class TestFencing extends BaseTes
         InetSocketAddress bookieToKill 
             = writelh.getLedgerMetadata().getEnsemble(numEntries).get(0);
         killBookie(bookieToKill);
+
+        // write entries to change ensemble
+        for (int i = 0; i < numEntries; i++) {
+            writelh.addEntry(tmp.getBytes());
+        }
+
         admin.recoverBookieData(bookieToKill, null);
         
-        /* TODO: uncomment this when BOOKKEEPER-112 is
-           fixed
-           
         for (int i = 0; i < numEntries; i++) {
             writelh.addEntry(tmp.getBytes());
         }
-        */
-        
+
         LedgerHandle readlh = bkc.openLedger(writelh.getId(), 
                                              digestType, "testPasswd".getBytes());
         try {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Sat Mar 31 08:58:51 2012
@@ -169,9 +169,10 @@ public abstract class BookKeeperClusterT
      *
      * @param addr
      *          Socket Address
+     * @return the configuration of the killed bookie, or null if no bookie was removed
      * @throws InterruptedException
      */
-    public void killBookie(InetSocketAddress addr) throws InterruptedException {
+    public ServerConfiguration killBookie(InetSocketAddress addr) throws InterruptedException {
         BookieServer toRemove = null;
         int toRemoveIndex = 0;
         for (BookieServer server : bs) {
@@ -184,8 +185,9 @@ public abstract class BookKeeperClusterT
         }
         if (toRemove != null) {
             bs.remove(toRemove);
-            bsConfs.remove(toRemoveIndex);
+            return bsConfs.remove(toRemoveIndex);
         }
+        return null;
     }
 
     /**
@@ -193,17 +195,18 @@ public abstract class BookKeeperClusterT
      *
      * @param index
      *          Bookie Index
+     * @return the configuration of killed bookie
      * @throws InterruptedException
      * @throws IOException
      */
-    public void killBookie(int index) throws InterruptedException, IOException {
+    public ServerConfiguration killBookie(int index) throws InterruptedException, IOException {
         if (index >= bs.size()) {
             throw new IOException("Bookie does not exist");
         }
         BookieServer server = bs.get(index);
         server.shutdown();
         bs.remove(server);
-        bsConfs.remove(index);
+        return bsConfs.remove(index);
     }
 
     /**
@@ -318,7 +321,7 @@ public abstract class BookKeeperClusterT
      *            Server Configuration Object
      *
      */
-    private BookieServer startBookie(ServerConfiguration conf)
+    protected BookieServer startBookie(ServerConfiguration conf)
             throws IOException, InterruptedException, KeeperException, BookieException {
         BookieServer server = new BookieServer(conf);
         server.start();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java?rev=1307743&r1=1307742&r2=1307743&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/CloseTest.java Sat Mar 31 08:58:51 2012
@@ -72,4 +72,29 @@ public class CloseTest extends BaseTestC
             lh[i].close();
         }
     }
+
+    @Test
+    public void testCloseByOthers() throws Exception {
+
+        int numLedgers = 1;
+        int numMsgs = 10;
+
+        LedgerHandle lh = bkc.createLedger(digestType, "".getBytes());
+
+        String tmp = "BookKeeper is cool!";
+
+        /*
+         * Write 10 entries to lh.
+         */
+        for (int i = 0; i < numMsgs; i++) {
+            lh.addEntry(tmp.getBytes());
+        }
+
+        // a second handle opens the same ledger while the writer handle is still open
+        LedgerHandle lh2 = bkc.openLedger(lh.getId(), digestType, "".getBytes());
+
+        // so the ledger would be closed and its metadata changed; the original handle should still be able
+        // to close it successfully. NOTE(review): this closes lh2, not the original handle lh -- confirm intent
+        lh2.close();
+    }
 }