You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/06/29 21:57:24 UTC
[bookkeeper] branch master updated: ISSUE #207: change the HashSet
to BitSet to handle duplicated bookies on reading entries
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 75aca23 ISSUE #207: change the HashSet to BitSet to handle duplicated bookies on reading entries
75aca23 is described below
commit 75aca2397e43ec321a049aaec514c99cc9fad235
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jun 29 14:57:16 2017 -0700
ISSUE #207: change the HashSet to BitSet to handle duplicated bookies on reading entries
Descriptions of the changes in this PR:
Problem:
If a ledger's ensemble happens to contain duplicated bookies (this has happened before, when ensemble change still had problems), reads do not work as expected: they hang forever.
Solution:
Replace the HashSet with a BitSet, so that the set of bookies we have heard from is tracked by bookieIndex rather than by BookieSocketAddress.
---
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
- [x] Make sure the PR title is formatted like:
`<Issue #>: Description of pull request`
`e.g. Issue 123: Description ...`
- [x] Make sure tests pass via `mvn clean apache-rat:check install findbugs:check`.
- [x] Replace `<Issue #>` in the title with the actual Issue number, if there is one.
---
Author: Sijie Guo <si...@apache.org>
Author: Sijie Guo <si...@twitter.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Matteo Merli <mm...@apache.org>
This closes #199 from sijie/client_changes/bitset, closes #207
---
.../apache/bookkeeper/client/PendingReadOp.java | 83 ++++++++++---------
.../apache/bookkeeper/client/TestSequenceRead.java | 94 ++++++++++++++++++++++
.../bookkeeper/client/TestSpeculativeRead.java | 20 ++---
3 files changed, 146 insertions(+), 51 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 306c5dd..0dd1f67 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -62,6 +62,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
private ScheduledFuture<?> speculativeTask = null;
Queue<LedgerEntryRequest> seq;
Set<BookieSocketAddress> heardFromHosts;
+ BitSet heardFromHostsBitSet;
ReadCallback cb;
Object ctx;
LedgerHandle lh;
@@ -101,6 +102,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
/**
* Complete the read request from <i>host</i>.
*
+ * @param bookieIndex
+ * bookie index
* @param host
* host that respond the read
* @param buffer
@@ -108,12 +111,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
* @return return true if we managed to complete the entry;
* otherwise return false if the read entry is not complete or it is already completed before
*/
- boolean complete(BookieSocketAddress host, final ByteBuf buffer) {
+ boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer) {
ByteBuf content;
try {
content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
} catch (BKDigestMatchException e) {
- logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
+ logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
buffer.release();
return false;
}
@@ -153,6 +156,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
/**
* Log error <i>errMsg</i> and reattempt read from <i>host</i>.
*
+ * @param bookieIndex
+ * bookie index
* @param host
* host that just respond
* @param errMsg
@@ -160,7 +165,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
* @param rc
* read result code
*/
- synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
+ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
if (BKException.Code.OK == firstError ||
BKException.Code.NoSuchEntryException == firstError ||
BKException.Code.NoSuchLedgerExistsException == firstError) {
@@ -192,11 +197,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
* Send to next replica speculatively, if required and possible.
* This returns the host we may have sent to for unit testing.
*
- * @param heardFromHosts
+ * @param heardFromHostsBitSet
* the set of hosts that we already received responses.
* @return host we sent to if we sent. null otherwise.
*/
- abstract BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts);
+ abstract BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFromHostsBitSet);
/**
* Whether the read request completed.
@@ -220,6 +225,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
public String toString() {
return String.format("L%d-E%d", ledgerId, entryId);
}
+
}
class ParallelReadRequest extends LedgerEntryRequest {
@@ -236,7 +242,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
for (int bookieIndex : writeSet) {
BookieSocketAddress to = ensemble.get(bookieIndex);
try {
- sendReadTo(to, this);
+ sendReadTo(bookieIndex, to, this);
} catch (InterruptedException ie) {
LOG.error("Interrupted reading entry {} : ", this, ie);
Thread.currentThread().interrupt();
@@ -247,8 +253,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
}
@Override
- synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
- super.logErrorAndReattemptRead(host, errMsg, rc);
+ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
+ super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
--numPendings;
// if received all responses or this entry doesn't meet quorum write, complete the request.
if (numMissedEntryReads > maxMissedReadsAllowed || numPendings == 0) {
@@ -262,7 +268,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
}
@Override
- BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts) {
+ BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) {
// no speculative read
return null;
}
@@ -282,11 +288,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
}
- private int getReplicaIndex(BookieSocketAddress host) {
- int bookieIndex = ensemble.indexOf(host);
- if (bookieIndex == -1) {
- return NOT_FOUND;
- }
+ private synchronized int getNextReplicaIndexToReadFrom() {
+ return nextReplicaIndexToReadFrom;
+ }
+
+ private int getReplicaIndex(int bookieIndex) {
return writeSet.indexOf(bookieIndex);
}
@@ -301,17 +307,6 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
return b;
}
- private BitSet getHeardFromBitSet(Set<BookieSocketAddress> heardFromHosts) {
- BitSet b = new BitSet(ensemble.size());
- for (BookieSocketAddress i : heardFromHosts) {
- int index = ensemble.indexOf(i);
- if (index != -1) {
- b.set(index);
- }
- }
- return b;
- }
-
private boolean readsOutstanding() {
return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0;
}
@@ -322,13 +317,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
* @return host we sent to if we sent. null otherwise.
*/
@Override
- synchronized BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts) {
+ synchronized BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFrom) {
if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
return null;
}
BitSet sentTo = getSentToBitSet();
- BitSet heardFrom = getHeardFromBitSet(heardFromHosts);
sentTo.and(heardFrom);
// only send another read, if we have had no response at all (even for other entries)
@@ -367,7 +361,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
try {
BookieSocketAddress to = ensemble.get(bookieIndex);
- sendReadTo(to, this);
+ sendReadTo(bookieIndex, to, this);
sentReplicas.set(replica);
return to;
} catch (InterruptedException ie) {
@@ -379,10 +373,10 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
}
@Override
- synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
- super.logErrorAndReattemptRead(host, errMsg, rc);
+ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
+ super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
- int replica = getReplicaIndex(host);
+ int replica = getReplicaIndex(bookieIndex);
if (replica == NOT_FOUND) {
LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble);
return;
@@ -393,6 +387,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
sendNextRead();
}
}
+
}
PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
@@ -408,7 +403,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
- getLedgerMetadata().getAckQuorumSize();
speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
- heardFromHosts = new HashSet<BookieSocketAddress>();
+ heardFromHosts = new HashSet<>();
+ heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize());
readOpLogger = lh.bk.getReadOpLogger();
}
@@ -440,12 +436,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
int x = 0;
for (LedgerEntryRequest r : seq) {
if (!r.isComplete()) {
- if (null == r.maybeSendSpeculativeRead(heardFromHosts)) {
+ if (null == r.maybeSendSpeculativeRead(heardFromHostsBitSet)) {
// Subsequent speculative read will not materialize anyway
cancelSpeculativeTask(false);
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Send speculative read for {}. Hosts heard are {}.", r, heardFromHosts);
+ LOG.debug("Send speculative read for {}. Hosts heard are {}.", r, heardFromHostsBitSet);
}
++x;
}
@@ -454,7 +450,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
if (x > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Send {} speculative reads for ledger {} ({}, {}). Hosts heard are {}.",
- new Object[] { x, lh.getId(), startEntryId, endEntryId, heardFromHosts });
+ new Object[] { x, lh.getId(), startEntryId, endEntryId, heardFromHostsBitSet });
}
}
}
@@ -491,11 +487,13 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
}
private static class ReadContext implements ReadEntryCallbackCtx {
+ final int bookieIndex;
final BookieSocketAddress to;
final LedgerEntryRequest entry;
long lac = LedgerHandle.INVALID_ENTRY_ID;
- ReadContext(BookieSocketAddress to, LedgerEntryRequest entry) {
+ ReadContext(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entry) {
+ this.bookieIndex = bookieIndex;
this.to = to;
this.entry = entry;
}
@@ -511,13 +509,13 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
}
}
- void sendReadTo(BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
+ void sendReadTo(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
if (lh.throttler != null) {
lh.throttler.acquire();
}
lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
- this, new ReadContext(to, entry));
+ this, new ReadContext(bookieIndex, to, entry));
}
@Override
@@ -526,13 +524,14 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
final LedgerEntryRequest entry = rctx.entry;
if (rc != BKException.Code.OK) {
- entry.logErrorAndReattemptRead(rctx.to, "Error: " + BKException.getMessage(rc), rc);
+ entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc);
return;
}
heardFromHosts.add(rctx.to);
+ heardFromHostsBitSet.set(rctx.bookieIndex, true);
- if (entry.complete(rctx.to, buffer)) {
+ if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
submitCallback(BKException.Code.OK);
}
@@ -564,8 +563,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
break;
}
}
- LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {}. First unread entry is {}",
- new Object[] { lh.getId(), startEntryId, endEntryId, heardFromHosts, firstUnread });
+ LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {} : bitset = {}. First unread entry is {}",
+ new Object[] { lh.getId(), startEntryId, endEntryId, heardFromHosts, heardFromHostsBitSet, firstUnread });
readOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
} else {
readOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
new file mode 100644
index 0000000..5b035b7
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Test reading an entry from replicas in sequence way.
+ */
+public class TestSequenceRead extends BookKeeperClusterTestCase {
+
+ static final Logger logger = LoggerFactory.getLogger(TestSequenceRead.class);
+
+ final DigestType digestType;
+ final byte[] passwd = "sequence-read".getBytes();
+
+ public TestSequenceRead() {
+ super(5);
+ this.digestType = DigestType.CRC32;
+ }
+
+ private LedgerHandle createLedgerWithDuplicatedBookies() throws Exception {
+ final LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, passwd);
+ // introduce duplicated bookies in an ensemble.
+ SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = lh.getLedgerMetadata().getEnsembles();
+ SortedMap<Long, ArrayList<BookieSocketAddress>> newEnsembles = new TreeMap<Long, ArrayList<BookieSocketAddress>>();
+ for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : ensembles.entrySet()) {
+ ArrayList<BookieSocketAddress> newList = new ArrayList<BookieSocketAddress>(entry.getValue().size());
+ BookieSocketAddress firstBookie = entry.getValue().get(0);
+ for (BookieSocketAddress ignored : entry.getValue()) {
+ newList.add(firstBookie);
+ }
+ newEnsembles.put(entry.getKey(), newList);
+ }
+ lh.getLedgerMetadata().setEnsembles(newEnsembles);
+ // update the ledger metadata with duplicated bookies
+ final CountDownLatch latch = new CountDownLatch(1);
+ bkc.getLedgerManager().writeLedgerMetadata(lh.getId(), lh.getLedgerMetadata(), new BookkeeperInternalCallbacks.GenericCallback<Void>() {
+ @Override
+ public void operationComplete(int rc, Void result) {
+ if (BKException.Code.OK == rc) {
+ latch.countDown();
+ } else {
+ logger.error("Error on writing ledger metadata for ledger {} : ", lh.getId(), BKException.getMessage(rc));
+ }
+ }
+ });
+ latch.await();
+ logger.info("Update ledger metadata with duplicated bookies for ledger {}.", lh.getId());
+ return lh;
+ }
+
+ @Test(timeout = 60000)
+ public void testSequenceReadOnDuplicatedBookies() throws Exception {
+ final LedgerHandle lh = createLedgerWithDuplicatedBookies();
+
+ // should be able to open the ledger even it has duplicated bookies
+ final LedgerHandle readLh = bkc.openLedger(lh.getId(), digestType, passwd);
+ assertEquals(LedgerHandle.INVALID_ENTRY_ID, readLh.getLastAddConfirmed());
+ }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index ee77d37..c0b8e49 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,19 +18,20 @@ package org.apache.bookkeeper.client;
* under the License.
*
*/
+package org.apache.bookkeeper.client;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BaseTestCase;
+
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -286,10 +285,13 @@ public class TestSpeculativeRead extends BaseTestCase {
LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
ArrayList<BookieSocketAddress> ensemble = l.getLedgerMetadata().getEnsembles().get(0L);
- Set<BookieSocketAddress> allHosts = new HashSet<BookieSocketAddress>(ensemble);
- Set<BookieSocketAddress> noHost = new HashSet<BookieSocketAddress>();
- Set<BookieSocketAddress> secondHostOnly = new HashSet<BookieSocketAddress>();
- secondHostOnly.add(ensemble.get(1));
+ BitSet allHosts = new BitSet(ensemble.size());
+ for (int i = 0; i < ensemble.size(); i++) {
+ allHosts.set(i, true);
+ }
+ BitSet noHost = new BitSet(ensemble.size());
+ BitSet secondHostOnly = new BitSet(ensemble.size());
+ secondHostOnly.set(1, true);
PendingReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
try {
LatchCallback latch0 = new LatchCallback();
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].