You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/06/01 20:40:44 UTC
bookkeeper git commit: BOOKKEEPER-1086: Ledger Recovery - Refactor
PendingReadOp
Repository: bookkeeper
Updated Branches:
refs/heads/master a9fe7d94f -> 5c81acacc
BOOKKEEPER-1086: Ledger Recovery - Refactor PendingReadOp
This change is the first part of improving ledger recovery. It is basically a refactoring change, which:
- abstracts an interface for LedgerEntryRequest in PendingReadOp
- renames the current implementation to SequenceReadRequest, which reads the entry from the replicas of the write quorum in sequence, one after another.
Author: Sijie Guo <si...@twitter.com>
Author: Sijie Guo <si...@apache.org>
Reviewers: Enrico Olivelli <eo...@apache.org>
Closes #176 from sijie/recovery_improvements
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/5c81acac
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/5c81acac
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/5c81acac
Branch: refs/heads/master
Commit: 5c81acaccfb0cb1260acdfd7d0bb95ae84f85654
Parents: a9fe7d9
Author: Sijie Guo <si...@twitter.com>
Authored: Thu Jun 1 22:39:28 2017 +0200
Committer: Enrico Olivelli <eo...@localhost.localdomain>
Committed: Thu Jun 1 22:39:28 2017 +0200
----------------------------------------------------------------------
.../apache/bookkeeper/client/PendingReadOp.java | 191 ++++++++++++-------
.../bookkeeper/client/TestSpeculativeRead.java | 6 +-
2 files changed, 130 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5c81acac/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index fd69849..14051b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -72,24 +72,135 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
final int maxMissedReadsAllowed;
- class LedgerEntryRequest extends LedgerEntry {
- final static int NOT_FOUND = -1;
- int nextReplicaIndexToReadFrom = 0;
- AtomicBoolean complete = new AtomicBoolean(false);
+ abstract class LedgerEntryRequest extends LedgerEntry {
+
+ final AtomicBoolean complete = new AtomicBoolean(false);
int firstError = BKException.Code.OK;
int numMissedEntryReads = 0;
final ArrayList<BookieSocketAddress> ensemble;
final List<Integer> writeSet;
- final BitSet sentReplicas;
- final BitSet erroredReplicas;
LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
super(lId, eId);
this.ensemble = ensemble;
this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+ }
+
+ /**
+ * Execute the read request.
+ */
+ abstract void read();
+
+ /**
+ * Complete the read request from <i>host</i>.
+ *
+ * @param host
+ * host that respond the read
+ * @param buffer
+ * the data buffer
+ * @return return true if we managed to complete the entry;
+ * otherwise return false if the read entry is not complete or it is already completed before
+ */
+ boolean complete(BookieSocketAddress host, final ByteBuf buffer) {
+ ByteBuf content;
+ try {
+ content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
+ } catch (BKDigestMatchException e) {
+ logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
+ buffer.release();
+ return false;
+ }
+
+ if (!complete.getAndSet(true)) {
+ /*
+ * The length is a long and it is the last field of the metadata of an entry.
+ * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
+ */
+ length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
+ data = content;
+ return true;
+ } else {
+ buffer.release();
+ return false;
+ }
+ }
+
+ /**
+ * Log error <i>errMsg</i> and reattempt read from <i>host</i>.
+ *
+ * @param host
+ * host that just respond
+ * @param errMsg
+ * error msg to log
+ * @param rc
+ * read result code
+ */
+ void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
+ if (BKException.Code.OK == firstError ||
+ BKException.Code.NoSuchEntryException == firstError ||
+ BKException.Code.NoSuchLedgerExistsException == firstError) {
+ firstError = rc;
+ } else if (BKException.Code.BookieHandleNotAvailableException == firstError &&
+ BKException.Code.NoSuchEntryException != rc &&
+ BKException.Code.NoSuchLedgerExistsException != rc) {
+ // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is
+ // returned we need to update firstError to indicate that it might be a valid read but just
+ // failed.
+ firstError = rc;
+ }
+ if (BKException.Code.NoSuchEntryException == rc ||
+ BKException.Code.NoSuchLedgerExistsException == rc) {
+ ++numMissedEntryReads;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No such entry found on bookie. L{} E{} bookie: {}",
+ new Object[] { lh.ledgerId, entryId, host });
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(errMsg + " while reading L{} E{} from bookie: {}",
+ new Object[]{lh.ledgerId, entryId, host});
+ }
+ }
+ }
+
+ /**
+ * Send to next replica speculatively, if required and possible.
+ * This returns the host we may have sent to for unit testing.
+ *
+ * @param heardFromHosts
+ * the set of hosts that we already received responses.
+ * @return host we sent to if we sent. null otherwise.
+ */
+ abstract BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts);
+
+ /**
+ * Whether the read request completed.
+ *
+ * @return true if the read request is completed.
+ */
+ boolean isComplete() {
+ return complete.get();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("L%d-E%d", ledgerId, entryId);
+ }
+ }
+
+ class SequenceReadRequest extends LedgerEntryRequest {
+ final static int NOT_FOUND = -1;
+ int nextReplicaIndexToReadFrom = 0;
+
+ final BitSet sentReplicas;
+ final BitSet erroredReplicas;
+
+ SequenceReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) {
+ super(ensemble, lId, eId);
+
this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
}
@@ -133,6 +244,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
* This returns the host we may have sent to for unit testing.
* @return host we sent to if we sent. null otherwise.
*/
+ @Override
synchronized BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts) {
if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
return null;
@@ -151,6 +263,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
}
}
+ @Override
+ void read() {
+ sendNextRead();
+ }
+
synchronized BookieSocketAddress sendNextRead() {
if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
// we are done, the read has failed from all replicas, just fail the
@@ -184,28 +301,9 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
}
}
+ @Override
synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
- if (BKException.Code.OK == firstError ||
- BKException.Code.NoSuchEntryException == firstError ||
- BKException.Code.NoSuchLedgerExistsException == firstError) {
- firstError = rc;
- } else if (BKException.Code.BookieHandleNotAvailableException == firstError &&
- BKException.Code.NoSuchEntryException != rc &&
- BKException.Code.NoSuchLedgerExistsException != rc) {
- // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is
- // returned we need to update firstError to indicate that it might be a valid read but just
- // failed.
- firstError = rc;
- }
- if (BKException.Code.NoSuchEntryException == rc ||
- BKException.Code.NoSuchLedgerExistsException == rc) {
- ++numMissedEntryReads;
- LOG.debug("No such entry found on bookie. L{} E{} bookie: {}",
- new Object[] { lh.ledgerId, entryId, host });
- } else {
- LOG.debug(errMsg + " while reading L{} E{} from bookie: {}",
- new Object[] { lh.ledgerId, entryId, host });
- }
+ super.logErrorAndReattemptRead(host, errMsg, rc);
int replica = getReplicaIndex(host);
if (replica == NOT_FOUND) {
@@ -218,41 +316,6 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
sendNextRead();
}
}
-
- // return true if we managed to complete the entry
- // return false if the read entry is not complete or it is already completed before
- boolean complete(BookieSocketAddress host, final ByteBuf buffer) {
- ByteBuf content;
- try {
- content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
- } catch (BKDigestMatchException e) {
- logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
- buffer.release();
- return false;
- }
-
- if (!complete.getAndSet(true)) {
- /*
- * The length is a long and it is the last field of the metadata of an entry.
- * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
- */
- length = buffer.getLong(DigestManager.METADATA_LENGTH - 8);
- data = content;
- return true;
- } else {
- buffer.release();
- return false;
- }
- }
-
- boolean isComplete() {
- return complete.get();
- }
-
- @Override
- public String toString() {
- return String.format("L%d-E%d", ledgerId, entryId);
- }
}
PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
@@ -325,11 +388,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
ensemble = getLedgerMetadata().getEnsemble(i);
nextEnsembleChange = getLedgerMetadata().getNextEnsembleChange(i);
}
- LedgerEntryRequest entry = new LedgerEntryRequest(ensemble, lh.ledgerId, i);
+ LedgerEntryRequest entry = new SequenceReadRequest(ensemble, lh.ledgerId, i);
seq.add(entry);
i++;
- entry.sendNextRead();
+ entry.read();
} while (i <= endEntryId);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5c81acac/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index 2a6c71d..ee77d37 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -298,7 +298,7 @@ public class TestSpeculativeRead extends BaseTestCase {
// if we've already heard from all hosts,
// we only send the initial read
- req0 = op.new LedgerEntryRequest(ensemble, l.getId(), 0);
+ req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0);
assertTrue("Should have sent to first",
req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0)));
assertNull("Should not have sent another",
@@ -306,7 +306,7 @@ public class TestSpeculativeRead extends BaseTestCase {
// if we have heard from some hosts, but not one we have sent to
// send again
- req2 = op.new LedgerEntryRequest(ensemble, l.getId(), 2);
+ req2 = op.new SequenceReadRequest(ensemble, l.getId(), 2);
assertTrue("Should have sent to third",
req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2)));
assertTrue("Should have sent to first",
@@ -314,7 +314,7 @@ public class TestSpeculativeRead extends BaseTestCase {
// if we have heard from some hosts, which includes one we sent to
// do not read again
- req4 = op.new LedgerEntryRequest(ensemble, l.getId(), 4);
+ req4 = op.new SequenceReadRequest(ensemble, l.getId(), 4);
assertTrue("Should have sent to second",
req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1)));
assertNull("Should not have sent another",