You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/06/01 20:40:44 UTC

bookkeeper git commit: BOOKKEEPER-1086: Ledger Recovery - Refactor PendingReadOp

Repository: bookkeeper
Updated Branches:
  refs/heads/master a9fe7d94f -> 5c81acacc


BOOKKEEPER-1086: Ledger Recovery - Refactor PendingReadOp

This change is the first part of improving ledger recovery. It is basically a refactoring change, which:

- abstracts an interface for LedgerEntryRequest in PendingReadOp
- renames the current implementation to SequenceReadRequest, which reads the entry from the replicas of the quorum in sequence.

Author: Sijie Guo <si...@twitter.com>
Author: Sijie Guo <si...@apache.org>

Reviewers: Enrico Olivelli <eo...@apache.org>

Closes #176 from sijie/recovery_improvements


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/5c81acac
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/5c81acac
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/5c81acac

Branch: refs/heads/master
Commit: 5c81acaccfb0cb1260acdfd7d0bb95ae84f85654
Parents: a9fe7d9
Author: Sijie Guo <si...@twitter.com>
Authored: Thu Jun 1 22:39:28 2017 +0200
Committer: Enrico Olivelli <eo...@localhost.localdomain>
Committed: Thu Jun 1 22:39:28 2017 +0200

----------------------------------------------------------------------
 .../apache/bookkeeper/client/PendingReadOp.java | 191 ++++++++++++-------
 .../bookkeeper/client/TestSpeculativeRead.java  |   6 +-
 2 files changed, 130 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5c81acac/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index fd69849..14051b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -72,24 +72,135 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
 
     final int maxMissedReadsAllowed;
 
-    class LedgerEntryRequest extends LedgerEntry {
-        final static int NOT_FOUND = -1;
-        int nextReplicaIndexToReadFrom = 0;
-        AtomicBoolean complete = new AtomicBoolean(false);
+    abstract class LedgerEntryRequest extends LedgerEntry {
+
+        final AtomicBoolean complete = new AtomicBoolean(false);
 
         int firstError = BKException.Code.OK;
         int numMissedEntryReads = 0;
 
         final ArrayList<BookieSocketAddress> ensemble;
         final List<Integer> writeSet;
-        final BitSet sentReplicas;
-        final BitSet erroredReplicas;
 
         LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
             super(lId, eId);
 
             this.ensemble = ensemble;
             this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+        }
+
+        /**
+         * Execute the read request.
+         */
+        abstract void read();
+
+        /**
+         * Complete the read request from <i>host</i>.
+         *
+         * @param host
+         *          host that respond the read
+         * @param buffer
+         *          the data buffer
+         * @return return true if we managed to complete the entry;
+         *         otherwise return false if the read entry is not complete or it is already completed before
+         */
+        boolean complete(BookieSocketAddress host, final ByteBuf buffer) {
+            ByteBuf content;
+            try {
+                content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
+            } catch (BKDigestMatchException e) {
+                logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
+                buffer.release();
+                return false;
+            }
+
+            if (!complete.getAndSet(true)) {
+                /*
+                 * The length is a long and it is the last field of the metadata of an entry.
+                 * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
+                 */
+                length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
+                data = content;
+                return true;
+            } else {
+                buffer.release();
+                return false;
+            }
+        }
+
+        /**
+         * Log error <i>errMsg</i> and reattempt read from <i>host</i>.
+         *
+         * @param host
+         *          host that just respond
+         * @param errMsg
+         *          error msg to log
+         * @param rc
+         *          read result code
+         */
+        void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
+            if (BKException.Code.OK == firstError ||
+                BKException.Code.NoSuchEntryException == firstError ||
+                BKException.Code.NoSuchLedgerExistsException == firstError) {
+                firstError = rc;
+            } else if (BKException.Code.BookieHandleNotAvailableException == firstError &&
+                       BKException.Code.NoSuchEntryException != rc &&
+                       BKException.Code.NoSuchLedgerExistsException != rc) {
+                // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is
+                // returned we need to update firstError to indicate that it might be a valid read but just
+                // failed.
+                firstError = rc;
+            }
+            if (BKException.Code.NoSuchEntryException == rc ||
+                BKException.Code.NoSuchLedgerExistsException == rc) {
+                ++numMissedEntryReads;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No such entry found on bookie.  L{} E{} bookie: {}",
+                        new Object[] { lh.ledgerId, entryId, host });
+                }
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(errMsg + " while reading L{} E{} from bookie: {}",
+                        new Object[]{lh.ledgerId, entryId, host});
+                }
+            }
+        }
+
+        /**
+         * Send to next replica speculatively, if required and possible.
+         * This returns the host we may have sent to for unit testing.
+         *
+         * @param heardFromHosts
+         *      the set of hosts that we already received responses.
+         * @return host we sent to if we sent. null otherwise.
+         */
+        abstract BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts);
+
+        /**
+         * Whether the read request completed.
+         *
+         * @return true if the read request is completed.
+         */
+        boolean isComplete() {
+            return complete.get();
+        }
+
+        @Override
+        public String toString() {
+            return String.format("L%d-E%d", ledgerId, entryId);
+        }
+    }
+
+    class SequenceReadRequest extends LedgerEntryRequest {
+        final static int NOT_FOUND = -1;
+        int nextReplicaIndexToReadFrom = 0;
+
+        final BitSet sentReplicas;
+        final BitSet erroredReplicas;
+
+        SequenceReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
+            super(ensemble, lId, eId);
+
             this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
             this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
         }
@@ -133,6 +244,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
          * This returns the host we may have sent to for unit testing.
          * @return host we sent to if we sent. null otherwise.
          */
+        @Override
         synchronized BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts) {
             if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
                 return null;
@@ -151,6 +263,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
             }
         }
 
+        @Override
+        void read() {
+            sendNextRead();
+        }
+
         synchronized BookieSocketAddress sendNextRead() {
             if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
                 // we are done, the read has failed from all replicas, just fail the
@@ -184,28 +301,9 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
             }
         }
 
+        @Override
         synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
-            if (BKException.Code.OK == firstError ||
-                BKException.Code.NoSuchEntryException == firstError ||
-                BKException.Code.NoSuchLedgerExistsException == firstError) {
-                firstError = rc;
-            } else if (BKException.Code.BookieHandleNotAvailableException == firstError &&
-                       BKException.Code.NoSuchEntryException != rc &&
-                       BKException.Code.NoSuchLedgerExistsException != rc) {
-                // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is
-                // returned we need to update firstError to indicate that it might be a valid read but just
-                // failed.
-                firstError = rc;
-            }
-            if (BKException.Code.NoSuchEntryException == rc ||
-                BKException.Code.NoSuchLedgerExistsException == rc) {
-                ++numMissedEntryReads;
-                LOG.debug("No such entry found on bookie.  L{} E{} bookie: {}",
-                        new Object[] { lh.ledgerId, entryId, host });
-            } else {
-                LOG.debug(errMsg + " while reading L{} E{} from bookie: {}",
-                          new Object[] { lh.ledgerId, entryId, host });
-            }
+            super.logErrorAndReattemptRead(host, errMsg, rc);
 
             int replica = getReplicaIndex(host);
             if (replica == NOT_FOUND) {
@@ -218,41 +316,6 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
                 sendNextRead();
             }
         }
-
-        // return true if we managed to complete the entry
-        // return false if the read entry is not complete or it is already completed before
-        boolean complete(BookieSocketAddress host, final ByteBuf buffer) {
-            ByteBuf content;
-            try {
-                content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
-            } catch (BKDigestMatchException e) {
-                logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
-                buffer.release();
-                return false;
-            }
-
-            if (!complete.getAndSet(true)) {
-                /*
-                 * The length is a long and it is the last field of the metadata of an entry.
-                 * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
-                 */
-                length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
-                data = content;
-                return true;
-            } else {
-                buffer.release();
-                return false;
-            }
-        }
-
-        boolean isComplete() {
-            return complete.get();
-        }
-
-        @Override
-        public String toString() {
-            return String.format("L%d-E%d", ledgerId, entryId);
-        }
     }
 
     PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
@@ -325,11 +388,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
                 ensemble = getLedgerMetadata().getEnsemble(i);
                 nextEnsembleChange = getLedgerMetadata().getNextEnsembleChange(i);
             }
-            LedgerEntryRequest entry = new LedgerEntryRequest(ensemble, lh.ledgerId, i);
+            LedgerEntryRequest entry = new SequenceReadRequest(ensemble, lh.ledgerId, i);
             seq.add(entry);
             i++;
 
-            entry.sendNextRead();
+            entry.read();
         } while (i <= endEntryId);
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5c81acac/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 2a6c71d..ee77d37 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -298,7 +298,7 @@ public class TestSpeculativeRead extends BaseTestCase {
 
             // if we've already heard from all hosts,
             // we only send the initial read
-            req0 = op.new LedgerEntryRequest(ensemble, l.getId(), 0);
+            req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0);
             assertTrue("Should have sent to first",
                        req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0)));
             assertNull("Should not have sent another",
@@ -306,7 +306,7 @@ public class TestSpeculativeRead extends BaseTestCase {
 
             // if we have heard from some hosts, but not one we have sent to
             // send again
-            req2 = op.new LedgerEntryRequest(ensemble, l.getId(), 2);
+            req2 = op.new SequenceReadRequest(ensemble, l.getId(), 2);
             assertTrue("Should have sent to third",
                        req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2)));
             assertTrue("Should have sent to first",
@@ -314,7 +314,7 @@ public class TestSpeculativeRead extends BaseTestCase {
 
             // if we have heard from some hosts, which includes one we sent to
             // do not read again
-            req4 = op.new LedgerEntryRequest(ensemble, l.getId(), 4);
+            req4 = op.new SequenceReadRequest(ensemble, l.getId(), 4);
             assertTrue("Should have sent to second",
                        req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1)));
             assertNull("Should not have sent another",