You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/03 10:55:02 UTC
svn commit: r1416408 - in /zookeeper/bookkeeper/trunk/bookkeeper-server/src:
main/java/org/apache/bookkeeper/client/ main/java/org/apache/bookkeeper/conf/
test/java/org/apache/bookkeeper/client/ test/java/org/apache/bookkeeper/test/
Author: ivank
Date: Mon Dec 3 09:55:01 2012
New Revision: 1416408
URL: http://svn.apache.org/viewvc?rev=1416408&view=rev
Log:
[REVERT] BOOKKEEPER-336: bookie readEntries is taking more time if the ensemble has failed bookie(s). Basic speculative functionality in place.
Accidentally committed this change before approval. Reverting. (ivank)
Removed:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1416408&r1=1416407&r2=1416408&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Mon Dec 3 09:55:01 2012
@@ -21,19 +21,11 @@ package org.apache.bookkeeper.client;
*
*/
import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import java.util.Queue;
-import java.util.BitSet;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
-
-import java.util.Timer;
-import java.util.TimerTask;
-
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
@@ -53,10 +45,7 @@ import org.jboss.netty.buffer.ChannelBuf
class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
- final int speculativeReadTimeout;
- Timer speculativeReadTimer;
Queue<LedgerEntryRequest> seq;
- Set<InetSocketAddress> heardFromHosts;
ReadCallback cb;
Object ctx;
LedgerHandle lh;
@@ -64,135 +53,59 @@ class PendingReadOp implements Enumerati
long startEntryId;
long endEntryId;
- class LedgerEntryRequest extends LedgerEntry {
+ private class LedgerEntryRequest extends LedgerEntry {
int nextReplicaIndexToReadFrom = 0;
AtomicBoolean complete = new AtomicBoolean(false);
int firstError = BKException.Code.OK;
final ArrayList<InetSocketAddress> ensemble;
- final List<Integer> writeSet;
- final BitSet sentReplicas;
- final BitSet erroredReplicas;
LedgerEntryRequest(ArrayList<InetSocketAddress> ensemble, long lId, long eId) {
super(lId, eId);
this.ensemble = ensemble;
- this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
- this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
- this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
- }
-
- private int getReplicaIndex(InetSocketAddress host) {
- int bookieIndex = ensemble.indexOf(host);
- if (bookieIndex == -1) {
- return -1;
- }
- return writeSet.indexOf(bookieIndex);
- }
-
- private BitSet getSentToBitSet() {
- BitSet b = new BitSet(ensemble.size());
-
- for (int i = 0; i < sentReplicas.length(); i++) {
- if (sentReplicas.get(i)) {
- b.set(writeSet.get(i));
- }
- }
- return b;
- }
-
- private BitSet getHeardFromBitSet(Set<InetSocketAddress> heardFromHosts) {
- BitSet b = new BitSet(ensemble.size());
- for (InetSocketAddress i : heardFromHosts) {
- int index = ensemble.indexOf(i);
- if (index != -1) {
- b.set(index);
- }
- }
- return b;
- }
-
- private boolean readsOutstanding() {
- return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0;
- }
-
- /**
- * Send to next replica speculatively, if required and possible.
- * This returns the host we may have sent to for unit testing.
- * @return host we sent to if we sent. null otherwise.
- */
- synchronized InetSocketAddress maybeSendSpeculativeRead(Set<InetSocketAddress> heardFromHosts) {
- if (nextReplicaIndexToReadFrom >= lh.getLedgerMetadata().getWriteQuorumSize()) {
- return null;
- }
-
- BitSet sentTo = getSentToBitSet();
- BitSet heardFrom = getHeardFromBitSet(heardFromHosts);
- sentTo.and(heardFrom);
-
- // only send another read, if we have had no response at all (even for other entries)
- // from any of the other bookies we have sent the request to
- if (sentTo.cardinality() == 0) {
- return sendNextRead();
- } else {
- return null;
- }
}
- synchronized InetSocketAddress sendNextRead() {
+ void sendNextRead() {
if (nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) {
// we are done, the read has failed from all replicas, just fail the
// read
submitCallback(firstError);
- return null;
+ return;
}
- int replica = nextReplicaIndexToReadFrom;
int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
nextReplicaIndexToReadFrom++;
try {
- InetSocketAddress to = ensemble.get(bookieIndex);
- sendReadTo(to, this);
- sentReplicas.set(replica);
- return to;
+ sendReadTo(ensemble.get(bookieIndex), this);
} catch (InterruptedException ie) {
LOG.error("Interrupted reading entry " + this, ie);
Thread.currentThread().interrupt();
submitCallback(BKException.Code.ReadException);
- return null;
}
}
- synchronized void logErrorAndReattemptRead(InetSocketAddress host, String errMsg, int rc) {
+ void logErrorAndReattemptRead(String errMsg, int rc) {
if (firstError == BKException.Code.OK) {
firstError = rc;
}
+ int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom - 1);
LOG.error(errMsg + " while reading entry: " + entryId + " ledgerId: " + lh.ledgerId + " from bookie: "
- + host);
+ + ensemble.get(bookieIndex));
- int replica = getReplicaIndex(host);
- if (replica == -1) {
- LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble);
- return;
- }
- erroredReplicas.set(replica);
-
- if (!readsOutstanding()) {
- sendNextRead();
- }
+ sendNextRead();
}
// return true if we managed to complete the entry
- boolean complete(InetSocketAddress host, final ChannelBuffer buffer) {
+ boolean complete(final ChannelBuffer buffer) {
ChannelBufferInputStream is;
try {
is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
} catch (BKDigestMatchException e) {
- logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
+ logErrorAndReattemptRead("Mac mismatch", BKException.Code.DigestMatchException);
return false;
}
@@ -220,40 +133,23 @@ class PendingReadOp implements Enumerati
}
PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId, ReadCallback cb, Object ctx) {
- seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 1) - startEntryId));
+
+ seq = new ArrayDeque<LedgerEntryRequest>((int) (endEntryId - startEntryId));
this.cb = cb;
this.ctx = ctx;
this.lh = lh;
this.startEntryId = startEntryId;
this.endEntryId = endEntryId;
numPendingEntries = endEntryId - startEntryId + 1;
- speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
- if (speculativeReadTimeout > 0) {
- speculativeReadTimer = new Timer("SpeculativeRead-L"+lh.getId()+"-S"+startEntryId+"-E"+endEntryId);
- } else {
- speculativeReadTimer = null;
- }
- heardFromHosts = new HashSet<InetSocketAddress>();
}
public void initiate() throws InterruptedException {
long nextEnsembleChange = startEntryId, i = startEntryId;
ArrayList<InetSocketAddress> ensemble = null;
-
- if (speculativeReadTimer != null) {
- speculativeReadTimer.schedule(new TimerTask() {
- public void run() {
- for (LedgerEntryRequest r : seq) {
- if (!r.isComplete()) {
- r.maybeSendSpeculativeRead(heardFromHosts);
- }
- }
- }
- }, speculativeReadTimeout, speculativeReadTimeout);
- }
-
do {
+ LOG.debug("Acquiring lock: {}", i);
+
if (i == nextEnsembleChange) {
ensemble = lh.metadata.getEnsemble(i);
nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
@@ -266,27 +162,16 @@ class PendingReadOp implements Enumerati
} while (i <= endEntryId);
}
- private static class ReadContext {
- final InetSocketAddress to;
- final LedgerEntryRequest entry;
-
- ReadContext(InetSocketAddress to, LedgerEntryRequest entry) {
- this.to = to;
- this.entry = entry;
- }
- }
-
void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
lh.opCounterSem.acquire();
lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
- this, new ReadContext(to, entry));
+ this, entry);
}
@Override
public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) {
- final ReadContext rctx = (ReadContext)ctx;
- final LedgerEntryRequest entry = rctx.entry;
+ final LedgerEntryRequest entry = (LedgerEntryRequest) ctx;
lh.opCounterSem.release();
@@ -305,13 +190,11 @@ class PendingReadOp implements Enumerati
}
if (rc != BKException.Code.OK) {
- entry.logErrorAndReattemptRead(rctx.to, "Error: " + BKException.getMessage(rc), rc);
+ entry.logErrorAndReattemptRead("Error: " + BKException.getMessage(rc), rc);
return;
}
- heardFromHosts.add(rctx.to);
-
- if (entry.complete(rctx.to, buffer)) {
+ if (entry.complete(buffer)) {
numPendingEntries--;
}
@@ -324,9 +207,6 @@ class PendingReadOp implements Enumerati
}
private void submitCallback(int code) {
- if (speculativeReadTimer != null) {
- speculativeReadTimer.cancel();
- }
cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
}
public boolean hasMoreElements() {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1416408&r1=1416407&r2=1416408&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Mon Dec 3 09:55:01 2012
@@ -43,7 +43,6 @@ public class ClientConfiguration extends
// NIO Parameters
protected final static String CLIENT_TCP_NODELAY = "clientTcpNoDelay";
protected final static String READ_TIMEOUT = "readTimeout";
- protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
// Number Woker Threads
protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
@@ -276,39 +275,4 @@ public class ClientConfiguration extends
setProperty(NUM_WORKER_THREADS, numThreads);
return this;
}
-
- /**
- * Get the period of time after which a speculative entry read should be triggered.
- * A speculative entry read is sent to the next replica bookie before
- * an error or response has been received for the previous entry read request.
- *
- * A speculative entry read is only sent if we have not heard from the current
- * replica bookie during the entire read operation which may comprise of many entries.
- *
- * Speculative reads allow the client to avoid having to wait for the connect timeout
- * in the case that a bookie has failed. It induces higher load on the network and on
- * bookies. This should be taken into account before changing this configuration value.
- *
- * @see org.apache.bookkeeper.client.LedgerHandle#asyncReadEntries
- * @return the speculative read timeout in milliseconds. Default 2000.
- */
- public int getSpeculativeReadTimeout() {
- return getInt(SPECULATIVE_READ_TIMEOUT, 2000);
- }
-
- /**
- * Set the speculative read timeout. A lower timeout will reduce read latency in the
- * case of a failed bookie, while increasing the load on bookies and the network.
- *
- * The default is 2000 milliseconds. A value of 0 will disable speculative reads
- * completely.
- *
- * @see #getSpeculativeReadTimeout()
- * @param timeout the timeout value, in milliseconds
- * @return client configuration
- */
- public ClientConfiguration setSpeculativeReadTimeout(int timeout) {
- setProperty(SPECULATIVE_READ_TIMEOUT, timeout);
- return this;
- }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1416408&r1=1416407&r2=1416408&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Mon Dec 3 09:55:01 2012
@@ -234,7 +234,7 @@ public abstract class BookKeeperClusterT
public CountDownLatch sleepBookie(InetSocketAddress addr, final int seconds)
throws InterruptedException, IOException {
final CountDownLatch l = new CountDownLatch(1);
- final String name = "NIOServerFactory-" + addr.getPort();
+ final String name = "BookieJournal-" + addr.getPort();
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {
@@ -271,7 +271,7 @@ public abstract class BookKeeperClusterT
*/
public void sleepBookie(InetSocketAddress addr, final CountDownLatch l)
throws InterruptedException, IOException {
- final String name = "NIOServerFactory-" + addr.getPort();
+ final String name = "BookieJournal-" + addr.getPort();
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {