You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/10/14 17:38:44 UTC

svn commit: r1531944 - in /zookeeper/bookkeeper/trunk: CHANGES.txt bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java

Author: ivank
Date: Mon Oct 14 15:38:44 2013
New Revision: 1531944

URL: http://svn.apache.org/r1531944
Log:
BOOKKEEPER-676: Make add asynchronous in ledger recovery (aniruddha via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1531944&r1=1531943&r2=1531944&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Oct 14 15:38:44 2013
@@ -108,6 +108,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-638: Two bookies could start at the same time to access bookie data. (sijie via ivank)
 
+        BOOKKEEPER-676: Make add asynchrounous in ledger recovery (aniruddha via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=1531944&r1=1531943&r2=1531944&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java Mon Oct 14 15:38:44 2013
@@ -20,6 +20,8 @@ package org.apache.bookkeeper.client;
 
 import java.util.Enumeration;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -41,11 +43,10 @@ import org.slf4j.LoggerFactory;
 class LedgerRecoveryOp implements ReadCallback, AddCallback {
     static final Logger LOG = LoggerFactory.getLogger(LedgerRecoveryOp.class);
     LedgerHandle lh;
-    int numResponsesPending;
-    boolean proceedingWithRecovery = false;
-    long maxAddPushed = LedgerHandle.INVALID_ENTRY_ID;
-    long maxAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;
-    long maxLength = 0;
+    AtomicLong readCount, writeCount;
+    AtomicBoolean readDone;
+    AtomicBoolean callbackDone;
+    long entryToRead;
     // keep a copy of metadata for recovery.
     LedgerMetadata metadataForRecovery;
 
@@ -66,9 +67,12 @@ class LedgerRecoveryOp implements ReadCa
     }
 
     public LedgerRecoveryOp(LedgerHandle lh, GenericCallback<Void> cb) {
+        readCount = new AtomicLong(0);
+        writeCount = new AtomicLong(0);
+        readDone = new AtomicBoolean(false);
+        callbackDone = new AtomicBoolean(false);
         this.cb = cb;
         this.lh = lh;
-        numResponsesPending = lh.metadata.getEnsembleSize();
     }
 
     public void initiate() {
@@ -78,6 +82,7 @@ class LedgerRecoveryOp implements ReadCa
                         if (rc == BKException.Code.OK) {
                             lh.lastAddPushed = lh.lastAddConfirmed = data.lastAddConfirmed;
                             lh.length = data.length;
+                            entryToRead = lh.lastAddConfirmed;
                             // keep a copy of ledger metadata before proceeding
                             // ledger recovery
                             metadataForRecovery = new LedgerMetadata(lh.getLedgerMetadata());
@@ -102,17 +107,38 @@ class LedgerRecoveryOp implements ReadCa
      * Try to read past the last confirmed.
      */
     private void doRecoveryRead() {
-        long nextEntry = lh.lastAddConfirmed + 1;
-        try {
-            new RecoveryReadOp(lh, lh.bk.scheduler, nextEntry, nextEntry, this, null).initiate();
-        } catch (InterruptedException e) {
-            readComplete(BKException.Code.InterruptedException, lh, null, null);
+        if (!callbackDone.get()) {
+            entryToRead++;
+            try {
+                new RecoveryReadOp(lh, lh.bk.scheduler, entryToRead, entryToRead, this, null).initiate();
+            } catch (InterruptedException e) {
+                readComplete(BKException.Code.InterruptedException, lh, null, null);
+            }
+        }
+    }
+
+    private void closeAndCallback() {
+        if (callbackDone.compareAndSet(false, true)) {
+            lh.asyncCloseInternal(new CloseCallback() {
+                @Override
+                public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+                    if (rc != BKException.Code.OK) {
+                        LOG.warn("Close ledger {} failed during recovery: ",
+                            LedgerRecoveryOp.this.lh.getId(), BKException.getMessage(rc));
+                        cb.operationComplete(rc, null);
+                    } else {
+                        cb.operationComplete(BKException.Code.OK, null);
+                        LOG.debug("After closing length is: {}", lh.getLength());
+                    }
+                }
+            }, null, BKException.Code.LedgerClosedException);
         }
     }
 
     @Override
     public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
         if (rc == BKException.Code.OK) {
+            readCount.incrementAndGet();
             LedgerEntry entry = seq.nextElement();
             byte[] data = entry.getEntry();
 
@@ -125,27 +151,20 @@ class LedgerRecoveryOp implements ReadCa
                 lh.length = entry.getLength() - (long) data.length;
             }
             lh.asyncRecoveryAddEntry(data, 0, data.length, this, null);
+            doRecoveryRead();
             return;
         }
 
         if (rc == BKException.Code.NoSuchEntryException || rc == BKException.Code.NoSuchLedgerExistsException) {
-            lh.asyncCloseInternal(new CloseCallback() {
-                @Override
-                public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
-                    if (rc != BKException.Code.OK) {
-                        LOG.warn("Close failed: " + BKException.getMessage(rc));
-                        cb.operationComplete(rc, null);
-                    } else {
-                        cb.operationComplete(BKException.Code.OK, null);
-                        LOG.debug("After closing length is: {}", lh.getLength());
-                    }
-                }
-                }, null, BKException.Code.LedgerClosedException);
+            readDone.set(true);
+            if (readCount.get() == writeCount.get()) {
+                closeAndCallback();
+            }
             return;
         }
 
         // otherwise, some other error, we can't handle
-        LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + (lh.lastAddConfirmed + 1)
+        LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + entryToRead
                   + " ledger: " + lh.ledgerId + " while recovering ledger");
         cb.operationComplete(rc, null);
         return;
@@ -154,14 +173,18 @@ class LedgerRecoveryOp implements ReadCa
     @Override
     public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
         if (rc != BKException.Code.OK) {
-            // Give up, we can't recover from this error
-
             LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + (lh.lastAddConfirmed + 1)
                       + " ledger: " + lh.ledgerId + " while recovering ledger");
-            cb.operationComplete(rc, null);
+            if (callbackDone.compareAndSet(false, true)) {
+                // Give up, we can't recover from this error
+                cb.operationComplete(rc, null);
+            }
             return;
         }
-        doRecoveryRead();
+        long numAdd = writeCount.incrementAndGet();
+        if (readDone.get() && readCount.get() == numAdd) {
+            closeAndCallback();
+        }
     }
 
 }