You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/03 10:46:06 UTC
svn commit: r1416393 - in /zookeeper/bookkeeper/trunk/bookkeeper-server/src:
main/java/org/apache/bookkeeper/client/ main/java/org/apache/bookkeeper/conf/
test/java/org/apache/bookkeeper/client/ test/java/org/apache/bookkeeper/test/
Author: ivank
Date: Mon Dec 3 09:46:05 2012
New Revision: 1416393
URL: http://svn.apache.org/viewvc?rev=1416393&view=rev
Log:
BOOKKEEPER-336 bookie readEntries is taking more time if the ensemble has failed bookie(s) Basic speculative functionality in place
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1416393&r1=1416392&r2=1416393&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Mon Dec 3 09:46:05 2012
@@ -21,11 +21,19 @@ package org.apache.bookkeeper.client;
*
*/
import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.BitSet;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
@@ -45,7 +53,10 @@ import org.jboss.netty.buffer.ChannelBuf
class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
+ final int speculativeReadTimeout;
+ Timer speculativeReadTimer;
Queue<LedgerEntryRequest> seq;
+ Set<InetSocketAddress> heardFromHosts;
ReadCallback cb;
Object ctx;
LedgerHandle lh;
@@ -53,59 +64,135 @@ class PendingReadOp implements Enumerati
long startEntryId;
long endEntryId;
- private class LedgerEntryRequest extends LedgerEntry {
+ class LedgerEntryRequest extends LedgerEntry {
int nextReplicaIndexToReadFrom = 0;
AtomicBoolean complete = new AtomicBoolean(false);
int firstError = BKException.Code.OK;
final ArrayList<InetSocketAddress> ensemble;
+ final List<Integer> writeSet;
+ final BitSet sentReplicas;
+ final BitSet erroredReplicas;
LedgerEntryRequest(ArrayList<InetSocketAddress> ensemble, long lId, long eId) {
super(lId, eId);
this.ensemble = ensemble;
+ this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+ this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+ this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+ }
+
+ private int getReplicaIndex(InetSocketAddress host) {
+ int bookieIndex = ensemble.indexOf(host);
+ if (bookieIndex == -1) {
+ return -1;
+ }
+ return writeSet.indexOf(bookieIndex);
+ }
+
+ private BitSet getSentToBitSet() {
+ BitSet b = new BitSet(ensemble.size());
+
+ for (int i = 0; i < sentReplicas.length(); i++) {
+ if (sentReplicas.get(i)) {
+ b.set(writeSet.get(i));
+ }
+ }
+ return b;
+ }
+
+ private BitSet getHeardFromBitSet(Set<InetSocketAddress> heardFromHosts) {
+ BitSet b = new BitSet(ensemble.size());
+ for (InetSocketAddress i : heardFromHosts) {
+ int index = ensemble.indexOf(i);
+ if (index != -1) {
+ b.set(index);
+ }
+ }
+ return b;
+ }
+
+ private boolean readsOutstanding() {
+ return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0;
+ }
+
+ /**
+ * Send to next replica speculatively, if required and possible.
+ * This returns the host we may have sent to for unit testing.
+ * @return host we sent to if we sent. null otherwise.
+ */
+ synchronized InetSocketAddress maybeSendSpeculativeRead(Set<InetSocketAddress> heardFromHosts) {
+ if (nextReplicaIndexToReadFrom >= lh.getLedgerMetadata().getWriteQuorumSize()) {
+ return null;
+ }
+
+ BitSet sentTo = getSentToBitSet();
+ BitSet heardFrom = getHeardFromBitSet(heardFromHosts);
+ sentTo.and(heardFrom);
+
+ // only send another read, if we have had no response at all (even for other entries)
+ // from any of the other bookies we have sent the request to
+ if (sentTo.cardinality() == 0) {
+ return sendNextRead();
+ } else {
+ return null;
+ }
}
- void sendNextRead() {
+ synchronized InetSocketAddress sendNextRead() {
if (nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) {
// we are done, the read has failed from all replicas, just fail the
// read
submitCallback(firstError);
- return;
+ return null;
}
+ int replica = nextReplicaIndexToReadFrom;
int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
nextReplicaIndexToReadFrom++;
try {
- sendReadTo(ensemble.get(bookieIndex), this);
+ InetSocketAddress to = ensemble.get(bookieIndex);
+ sendReadTo(to, this);
+ sentReplicas.set(replica);
+ return to;
} catch (InterruptedException ie) {
LOG.error("Interrupted reading entry " + this, ie);
Thread.currentThread().interrupt();
submitCallback(BKException.Code.ReadException);
+ return null;
}
}
- void logErrorAndReattemptRead(String errMsg, int rc) {
+ synchronized void logErrorAndReattemptRead(InetSocketAddress host, String errMsg, int rc) {
if (firstError == BKException.Code.OK) {
firstError = rc;
}
- int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom - 1);
LOG.error(errMsg + " while reading entry: " + entryId + " ledgerId: " + lh.ledgerId + " from bookie: "
- + ensemble.get(bookieIndex));
+ + host);
- sendNextRead();
+ int replica = getReplicaIndex(host);
+ if (replica == -1) {
+ LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble);
+ return;
+ }
+ erroredReplicas.set(replica);
+
+ if (!readsOutstanding()) {
+ sendNextRead();
+ }
}
// return true if we managed to complete the entry
- boolean complete(final ChannelBuffer buffer) {
+ boolean complete(InetSocketAddress host, final ChannelBuffer buffer) {
ChannelBufferInputStream is;
try {
is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
} catch (BKDigestMatchException e) {
- logErrorAndReattemptRead("Mac mismatch", BKException.Code.DigestMatchException);
+ logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
return false;
}
@@ -133,23 +220,40 @@ class PendingReadOp implements Enumerati
}
PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId, ReadCallback cb, Object ctx) {
-
- seq = new ArrayDeque<LedgerEntryRequest>((int) (endEntryId - startEntryId));
+ seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 1) - startEntryId));
this.cb = cb;
this.ctx = ctx;
this.lh = lh;
this.startEntryId = startEntryId;
this.endEntryId = endEntryId;
numPendingEntries = endEntryId - startEntryId + 1;
+ speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
+ if (speculativeReadTimeout > 0) {
+ speculativeReadTimer = new Timer("SpeculativeRead-L"+lh.getId()+"-S"+startEntryId+"-E"+endEntryId);
+ } else {
+ speculativeReadTimer = null;
+ }
+ heardFromHosts = new HashSet<InetSocketAddress>();
}
public void initiate() throws InterruptedException {
long nextEnsembleChange = startEntryId, i = startEntryId;
ArrayList<InetSocketAddress> ensemble = null;
- do {
- LOG.debug("Acquiring lock: {}", i);
+ if (speculativeReadTimer != null) {
+ speculativeReadTimer.schedule(new TimerTask() {
+ public void run() {
+ for (LedgerEntryRequest r : seq) {
+ if (!r.isComplete()) {
+ r.maybeSendSpeculativeRead(heardFromHosts);
+ }
+ }
+ }
+ }, speculativeReadTimeout, speculativeReadTimeout);
+ }
+
+ do {
if (i == nextEnsembleChange) {
ensemble = lh.metadata.getEnsemble(i);
nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
@@ -162,16 +266,27 @@ class PendingReadOp implements Enumerati
} while (i <= endEntryId);
}
+ private static class ReadContext {
+ final InetSocketAddress to;
+ final LedgerEntryRequest entry;
+
+ ReadContext(InetSocketAddress to, LedgerEntryRequest entry) {
+ this.to = to;
+ this.entry = entry;
+ }
+ }
+
void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
lh.opCounterSem.acquire();
lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
- this, entry);
+ this, new ReadContext(to, entry));
}
@Override
public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) {
- final LedgerEntryRequest entry = (LedgerEntryRequest) ctx;
+ final ReadContext rctx = (ReadContext)ctx;
+ final LedgerEntryRequest entry = rctx.entry;
lh.opCounterSem.release();
@@ -190,11 +305,13 @@ class PendingReadOp implements Enumerati
}
if (rc != BKException.Code.OK) {
- entry.logErrorAndReattemptRead("Error: " + BKException.getMessage(rc), rc);
+ entry.logErrorAndReattemptRead(rctx.to, "Error: " + BKException.getMessage(rc), rc);
return;
}
- if (entry.complete(buffer)) {
+ heardFromHosts.add(rctx.to);
+
+ if (entry.complete(rctx.to, buffer)) {
numPendingEntries--;
}
@@ -207,6 +324,9 @@ class PendingReadOp implements Enumerati
}
private void submitCallback(int code) {
+ if (speculativeReadTimer != null) {
+ speculativeReadTimer.cancel();
+ }
cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
}
public boolean hasMoreElements() {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1416393&r1=1416392&r2=1416393&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Mon Dec 3 09:46:05 2012
@@ -43,6 +43,7 @@ public class ClientConfiguration extends
// NIO Parameters
protected final static String CLIENT_TCP_NODELAY = "clientTcpNoDelay";
protected final static String READ_TIMEOUT = "readTimeout";
+ protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
// Number Woker Threads
protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
@@ -275,4 +276,39 @@ public class ClientConfiguration extends
setProperty(NUM_WORKER_THREADS, numThreads);
return this;
}
+
+ /**
+ * Get the period of time after which a speculative entry read should be triggered.
+ * A speculative entry read is sent to the next replica bookie before
+ * an error or response has been received for the previous entry read request.
+ *
+ * A speculative entry read is only sent if we have not heard from the current
+ * replica bookie during the entire read operation, which may comprise many entries.
+ *
+ * Speculative reads allow the client to avoid having to wait for the connect timeout
+ * in the case that a bookie has failed. It induces higher load on the network and on
+ * bookies. This should be taken into account before changing this configuration value.
+ *
+ * @see org.apache.bookkeeper.client.LedgerHandle#asyncReadEntries
+ * @return the speculative read timeout in milliseconds. Default 2000.
+ */
+ public int getSpeculativeReadTimeout() {
+ return getInt(SPECULATIVE_READ_TIMEOUT, 2000);
+ }
+
+ /**
+ * Set the speculative read timeout. A lower timeout will reduce read latency in the
+ * case of a failed bookie, while increasing the load on bookies and the network.
+ *
+ * The default is 2000 milliseconds. A value of 0 will disable speculative reads
+ * completely.
+ *
+ * @see #getSpeculativeReadTimeout()
+ * @param timeout the timeout value, in milliseconds
+ * @return client configuration
+ */
+ public ClientConfiguration setSpeculativeReadTimeout(int timeout) {
+ setProperty(SPECULATIVE_READ_TIMEOUT, timeout);
+ return this;
+ }
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java?rev=1416393&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java Mon Dec 3 09:46:05 2012
@@ -0,0 +1,329 @@
+package org.apache.bookkeeper.client;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.junit.*;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BaseTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This unit test tests speculative reads.
+ *
+ */
+public class TestSpeculativeRead extends BaseTestCase {
+ static Logger LOG = LoggerFactory.getLogger(TestSpeculativeRead.class);
+
+ DigestType digestType;
+ byte[] passwd = "specPW".getBytes();
+
+ public TestSpeculativeRead(DigestType digestType) {
+ super(10);
+ this.digestType = digestType;
+ }
+
+ long getLedgerToRead(int ensemble, int quorum) throws Exception {
+ byte[] data = "Data for test".getBytes();
+ LedgerHandle l = bkc.createLedger(ensemble, quorum, digestType, passwd);
+ for (int i = 0; i < 10; i++) {
+ l.addEntry(data);
+ }
+ l.close();
+
+ return l.getId();
+ }
+
+ BookKeeper createClient(int specTimeout) throws Exception {
+ ClientConfiguration conf = new ClientConfiguration()
+ .setSpeculativeReadTimeout(specTimeout)
+ .setReadTimeout(30000);
+ conf.setZkServers(zkUtil.getZooKeeperConnectString());
+ return new BookKeeper(conf);
+ }
+
+ class LatchCallback implements ReadCallback {
+ CountDownLatch l = new CountDownLatch(1);
+ boolean success = false;
+ long startMillis = System.currentTimeMillis();
+ long endMillis = Long.MAX_VALUE;
+
+ public void readComplete(int rc,
+ LedgerHandle lh,
+ Enumeration<LedgerEntry> seq,
+ Object ctx) {
+ endMillis = System.currentTimeMillis();
+ LOG.debug("Got response {} {}", rc, getDuration());
+ success = rc == BKException.Code.OK;
+ l.countDown();
+ }
+
+ long getDuration() {
+ return endMillis - startMillis;
+ }
+
+ void expectSuccess(int milliseconds) throws Exception {
+ assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS));
+ assertTrue(success);
+ }
+
+ void expectFail(int milliseconds) throws Exception {
+ assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS));
+ assertFalse(success);
+ }
+
+ void expectTimeout(int milliseconds) throws Exception {
+ assertFalse(l.await(milliseconds, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ /**
+ * Test basic speculative functionality.
+ * - Create 2 clients with a very long read timeout, one with spec
+ * read enabled, the other not.
+ * - create ledger
+ * - sleep second bookie in ensemble
+ * - read first entry, both should find on first bookie.
+ * - read second entry, spec client should find on bookie three,
+ * non spec client should hang.
+ */
+ @Test
+ public void testSpeculativeRead() throws Exception {
+ long id = getLedgerToRead(3,2);
+ BookKeeper bknospec = createClient(0); // disabled
+ BookKeeper bkspec = createClient(2000);
+
+ LedgerHandle lnospec = bknospec.openLedger(id, digestType, passwd);
+ LedgerHandle lspec = bkspec.openLedger(id, digestType, passwd);
+
+ // sleep second bookie
+ CountDownLatch sleepLatch = new CountDownLatch(1);
+ InetSocketAddress second = lnospec.getLedgerMetadata().getEnsembles().get(0L).get(1);
+ sleepBookie(second, sleepLatch);
+
+ try {
+ // read first entry, both go to first bookie, should be fine
+ LatchCallback nospeccb = new LatchCallback();
+ LatchCallback speccb = new LatchCallback();
+ lnospec.asyncReadEntries(0, 0, nospeccb, null);
+ lspec.asyncReadEntries(0, 0, speccb, null);
+ nospeccb.expectSuccess(2000);
+ speccb.expectSuccess(2000);
+
+ // read second entry, both look for second bookie, spec read client
+ // tries third bookie, nonspec client hangs as read timeout is very long.
+ nospeccb = new LatchCallback();
+ speccb = new LatchCallback();
+ lnospec.asyncReadEntries(1, 1, nospeccb, null);
+ lspec.asyncReadEntries(1, 1, speccb, null);
+ speccb.expectSuccess(4000);
+ nospeccb.expectTimeout(4000);
+ } finally {
+ sleepLatch.countDown();
+ lspec.close();
+ lnospec.close();
+ bkspec.close();
+ bknospec.close();
+ }
+ }
+
+ /**
+ * Test that if more than one replica is down, we can still read, as long as the quorum
+ * size is larger than the number of down replicas.
+ */
+ @Test
+ public void testSpeculativeReadMultipleReplicasDown() throws Exception {
+ long id = getLedgerToRead(5,5);
+ int timeout = 5000;
+ BookKeeper bkspec = createClient(timeout);
+
+ LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+
+ // sleep bookie 1, 2 & 4
+ CountDownLatch sleepLatch = new CountDownLatch(1);
+ sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(1), sleepLatch);
+ sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(2), sleepLatch);
+ sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(4), sleepLatch);
+
+ try {
+ // read first entry, should complete faster than timeout
+ // as bookie 0 has the entry
+ LatchCallback latch0 = new LatchCallback();
+ l.asyncReadEntries(0, 0, latch0, null);
+ latch0.expectSuccess(timeout/2);
+
+ // second should have to hit two timeouts (bookie 1 & 2)
+ // bookie 3 has the entry
+ LatchCallback latch1 = new LatchCallback();
+ l.asyncReadEntries(1, 1, latch1, null);
+ latch1.expectTimeout(timeout);
+ latch1.expectSuccess(timeout*2);
+ assertTrue("should have taken longer than two timeouts, but less than 3",
+ latch1.getDuration() > timeout*2
+ && latch1.getDuration() < timeout*3);
+
+ // third should have to hit one timeout (bookie 2)
+ // bookie 3 has the entry
+ LatchCallback latch2 = new LatchCallback();
+ l.asyncReadEntries(2, 2, latch2, null);
+ latch2.expectTimeout(timeout/2);
+ latch2.expectSuccess(timeout);
+ assertTrue("should have taken longer than one timeout, but less than 2",
+ latch2.getDuration() > timeout
+ && latch2.getDuration() < timeout*2);
+
+ // fourth should have no timeout
+ // bookie 3 has the entry
+ LatchCallback latch3 = new LatchCallback();
+ l.asyncReadEntries(3, 3, latch3, null);
+ latch3.expectSuccess(timeout/2);
+
+ // fifth should hit one timeout, (bookie 4)
+ // bookie 0 has the entry
+ LatchCallback latch4 = new LatchCallback();
+ l.asyncReadEntries(4, 4, latch4, null);
+ latch4.expectTimeout(timeout/2);
+ latch4.expectSuccess(timeout);
+ assertTrue("should have taken longer than one timeout, but less than 2",
+ latch4.getDuration() > timeout
+ && latch4.getDuration() < timeout*2);
+
+ } finally {
+ sleepLatch.countDown();
+ l.close();
+ bkspec.close();
+ }
+ }
+
+ /**
+ * Test that if after a speculative read is kicked off, the original read completes
+ * nothing bad happens.
+ */
+ @Test
+ public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception {
+ long id = getLedgerToRead(2,2);
+ int timeout = 1000;
+ BookKeeper bkspec = createClient(timeout);
+
+ LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+
+ // sleep bookies
+ CountDownLatch sleepLatch0 = new CountDownLatch(1);
+ CountDownLatch sleepLatch1 = new CountDownLatch(1);
+ sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(0), sleepLatch0);
+ sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(1), sleepLatch1);
+
+ try {
+ // read goes to first bookie, spec read timeout occurs,
+ // goes to second
+ LatchCallback latch0 = new LatchCallback();
+ l.asyncReadEntries(0, 0, latch0, null);
+ latch0.expectTimeout(timeout);
+
+ // wake up first bookie
+ sleepLatch0.countDown();
+ latch0.expectSuccess(timeout/2);
+
+ sleepLatch1.countDown();
+
+ // check we can read next entry without issue
+ LatchCallback latch1 = new LatchCallback();
+ l.asyncReadEntries(1, 1, latch1, null);
+ latch1.expectSuccess(timeout/2);
+
+ } finally {
+ sleepLatch0.countDown();
+ sleepLatch1.countDown();
+ l.close();
+ bkspec.close();
+ }
+ }
+
+ /**
+ * Unit test for the speculative read scheduling method
+ */
+ @Test
+ public void testSpeculativeReadScheduling() throws Exception {
+ long id = getLedgerToRead(3,2);
+ int timeout = 1000;
+ BookKeeper bkspec = createClient(timeout);
+
+ LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+
+ ArrayList<InetSocketAddress> ensemble = l.getLedgerMetadata().getEnsembles().get(0L);
+ Set<InetSocketAddress> allHosts = new HashSet(ensemble);
+ Set<InetSocketAddress> noHost = new HashSet();
+ Set<InetSocketAddress> secondHostOnly = new HashSet();
+ secondHostOnly.add(ensemble.get(1));
+ try {
+ LatchCallback latch0 = new LatchCallback();
+ PendingReadOp op = new PendingReadOp(l, 0, 5, latch0, null);
+
+ // if we've already heard from all hosts,
+ // we only send the initial read
+ PendingReadOp.LedgerEntryRequest req0
+ = op.new LedgerEntryRequest(ensemble, l.getId(), 0);
+ assertTrue("Should have sent to first",
+ req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0)));
+ assertNull("Should not have sent another",
+ req0.maybeSendSpeculativeRead(allHosts));
+
+ // if we have heard from some hosts, but not one we have sent to
+ // send again
+ PendingReadOp.LedgerEntryRequest req2
+ = op.new LedgerEntryRequest(ensemble, l.getId(), 2);
+ assertTrue("Should have sent to third",
+ req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2)));
+ assertTrue("Should have sent to first",
+ req2.maybeSendSpeculativeRead(secondHostOnly).equals(ensemble.get(0)));
+
+ // if we have heard from some hosts, which includes one we sent to
+ // do not read again
+ PendingReadOp.LedgerEntryRequest req4
+ = op.new LedgerEntryRequest(ensemble, l.getId(), 4);
+ assertTrue("Should have sent to second",
+ req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1)));
+ assertNull("Should not have sent another",
+ req4.maybeSendSpeculativeRead(secondHostOnly));
+ } finally {
+ // wait for all ops to complete
+ l.opCounterSem.acquire(bkspec.getConf().getThrottleValue());
+
+ l.close();
+ bkspec.close();
+ }
+ }
+}
\ No newline at end of file
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1416393&r1=1416392&r2=1416393&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Mon Dec 3 09:46:05 2012
@@ -234,7 +234,7 @@ public abstract class BookKeeperClusterT
public CountDownLatch sleepBookie(InetSocketAddress addr, final int seconds)
throws InterruptedException, IOException {
final CountDownLatch l = new CountDownLatch(1);
- final String name = "BookieJournal-" + addr.getPort();
+ final String name = "NIOServerFactory-" + addr.getPort();
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {
@@ -271,7 +271,7 @@ public abstract class BookKeeperClusterT
*/
public void sleepBookie(InetSocketAddress addr, final CountDownLatch l)
throws InterruptedException, IOException {
- final String name = "BookieJournal-" + addr.getPort();
+ final String name = "NIOServerFactory-" + addr.getPort();
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {