You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2010/09/17 18:58:07 UTC
svn commit: r998200 - in /hadoop/zookeeper/trunk: ./
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/
Author: breed
Date: Fri Sep 17 16:58:07 2010
New Revision: 998200
URL: http://svn.apache.org/viewvc?rev=998200&view=rev
Log:
ZOOKEEPER-831. BookKeeper: Throttling improved for reads
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=998200&r1=998199&r2=998200&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Sep 17 16:58:07 2010
@@ -96,6 +96,8 @@ BUGFIXES:
ZOOKEEPER-870. Zookeeper trunk build broken. (mahadev via phunt)
+ ZOOKEEPER-831. BookKeeper: Throttling improved for reads (breed via fpj)
+
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=998200&r1=998199&r2=998200&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Sep 17 16:58:07 2010
@@ -67,7 +67,7 @@ public class LedgerHandle implements Rea
private Integer throttling = 5000;
final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
-
+
LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
DigestType digestType, byte[] password)
throws GeneralSecurityException, NumberFormatException {
@@ -149,6 +149,15 @@ public class LedgerHandle implements Rea
}
/**
+ * Return the semaphore that tracks the available operation slots.
+ *
+ * @return Semaphore whose permits represent the available slots
+ */
+ Semaphore getAvailablePermits(){
+ return this.opCounterSem;
+ }
+
+ /**
* Get the Distribution Schedule
*
* @return DistributionSchedule for the LedgerHandle
@@ -277,7 +286,6 @@ public class LedgerHandle implements Rea
}
new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
- opCounterSem.acquire();
}
/**
@@ -310,26 +318,32 @@ public class LedgerHandle implements Rea
*/
public void asyncAddEntry(final byte[] data, final AddCallback cb,
final Object ctx) throws InterruptedException {
- bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- if (metadata.isClosed()) {
- LOG.warn("Attempt to add to closed ledger: " + ledgerId);
- cb.addComplete(BKException.Code.LedgerClosedException,
- LedgerHandle.this, -1, ctx);
- return;
- }
-
- long entryId = ++lastAddPushed;
- PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
- pendingAddOps.add(op);
- ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
- entryId, lastAddConfirmed, data);
- op.initiate(toSend);
-
- }
- });
opCounterSem.acquire();
+
+ try{
+ bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ if (metadata.isClosed()) {
+ LOG.warn("Attempt to add to closed ledger: " + ledgerId);
+ LedgerHandle.this.opCounterSem.release();
+ cb.addComplete(BKException.Code.LedgerClosedException,
+ LedgerHandle.this, -1, ctx);
+ return;
+ }
+
+ long entryId = ++lastAddPushed;
+ PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
+ pendingAddOps.add(op);
+ ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
+ entryId, lastAddConfirmed, data);
+ op.initiate(toSend);
+ }
+ });
+ } catch (RuntimeException e) {
+ opCounterSem.release();
+ throw e;
+ }
}
// close the ledger and send fails to all the adds in the pipeline
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=998200&r1=998199&r2=998200&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java Fri Sep 17 16:58:07 2010
@@ -64,12 +64,18 @@ class PendingReadOp implements Enumerati
numPendingReads = endEntryId - startEntryId + 1;
}
- public void initiate() {
+ public void initiate() throws InterruptedException {
long nextEnsembleChange = startEntryId, i = startEntryId;
ArrayList<InetSocketAddress> ensemble = null;
do {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Acquiring lock: " + i);
+ }
+
+ lh.opCounterSem.acquire();
+
if (i == nextEnsembleChange) {
ensemble = lh.metadata.getEnsemble(i);
nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
@@ -80,7 +86,6 @@ class PendingReadOp implements Enumerati
sendRead(ensemble, entry, BKException.Code.ReadException);
} while (i <= endEntryId);
-
}
void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, int lastErrorCode) {
@@ -114,7 +119,6 @@ class PendingReadOp implements Enumerati
return;
}
- numPendingReads--;
ChannelBufferInputStream is;
try {
is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
@@ -125,15 +129,23 @@ class PendingReadOp implements Enumerati
entry.entryDataStream = is;
+ numPendingReads--;
if (numPendingReads == 0) {
submitCallback(BKException.Code.OK);
}
-
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Releasing lock: " + entryId);
+ }
+
+ lh.opCounterSem.release();
+
+ if(numPendingReads < 0)
+ LOG.error("Read too many values");
}
private void submitCallback(int code){
cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
- lh.opCounterSem.release();
}
public boolean hasMoreElements() {
return !seq.isEmpty();
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=998200&r1=998199&r2=998200&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Fri Sep 17 16:58:07 2010
@@ -23,12 +23,17 @@ package org.apache.bookkeeper.test;
import java.io.File;
import java.io.IOException;
+import java.lang.NoSuchFieldException;
+import java.lang.IllegalAccessException;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Semaphore;
+
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
@@ -83,7 +88,7 @@ public class BookieReadWriteTest extends
Set<Object> syncObjs;
class SyncObj {
- int counter;
+ volatile int counter;
boolean value;
public SyncObj() {
@@ -237,19 +242,61 @@ public class BookieReadWriteTest extends
}
}
+ class ThrottleTestCallback implements ReadCallback {
+ int throttle;
+
+ ThrottleTestCallback(int threshold){
+ this.throttle = threshold;
+ }
+
+ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx){
+ if(rc != BKException.Code.OK){
+ fail("Return code is not OK: " + rc);
+ }
+
+ ls = seq;
+ synchronized(sync){
+ sync.counter += throttle;
+ sync.notify();
+ }
+ LOG.info("Current counter: " + sync.counter);
+ }
+ }
+
+ /**
+ * Method for obtaining the available permits of a ledger handle
+ * using reflection to avoid adding a new public method to the
+ * class.
+ *
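+ * @param lh ledger handle whose opCounterSem semaphore is inspected
+ * @return number of permits currently available on that semaphore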
+ */
+ @SuppressWarnings("unchecked")
+ int getAvailablePermits(LedgerHandle lh) throws
+ NoSuchFieldException, IllegalAccessException
+ {
+ Field field = LedgerHandle.class.getDeclaredField("opCounterSem");
+ field.setAccessible(true);
+ return ((Semaphore)field.get(lh)).availablePermits();
+ }
+
@Test
- public void testReadWriteAsyncSingleClientThrottle() throws IOException {
+ public void testReadWriteAsyncSingleClientThrottle() throws
+ IOException, NoSuchFieldException, IllegalAccessException {
try {
+
+ Integer throttle = 100;
+ ThrottleTestCallback tcb = new ThrottleTestCallback(throttle);
// Create a BookKeeper client and a ledger
- System.setProperty("throttle", "1000");
+ System.setProperty("throttle", throttle.toString());
bkc = new BookKeeper("127.0.0.1");
lh = bkc.createLedger(digestType, ledgerPassword);
// bkc.initMessageDigest("SHA1");
ledgerId = lh.getId();
LOG.info("Ledger ID: " + lh.getId());
- numEntriesToWrite = 20000;
- for (int i = 0; i < (numEntriesToWrite - 10000); i++) {
+ numEntriesToWrite = 8000;
+ for (int i = 0; i < (numEntriesToWrite - 2000); i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
@@ -257,10 +304,15 @@ public class BookieReadWriteTest extends
entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.asyncAddEntry(entry.array(), this, sync);
+ /*
+ * Check that the number of available permits is no larger than the throttling threshold
+ */
+ int testValue = getAvailablePermits(lh);
+ assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
}
- for (int i = 0; i < 10000; i++) {
+ for (int i = 0; i < 2000; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
@@ -268,6 +320,12 @@ public class BookieReadWriteTest extends
entries.add(entry.array());
entriesSize.add(entry.array().length);
lh.asyncAddEntry(entry.array(), this, sync);
+
+ /*
+ * Check that the number of available permits is no larger than the throttling threshold
+ */
+ int testValue = getAvailablePermits(lh);
+ assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
}
// wait for all entries to be acknowledged
@@ -290,35 +348,22 @@ public class BookieReadWriteTest extends
assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
// read entries
- lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
-
+ sync.counter = 0;
+ for (int i = 0; i < numEntriesToWrite; i+=throttle) {
+ lh.asyncReadEntries(i, i + throttle - 1, tcb, (Object) sync);
+ int testValue = getAvailablePermits(lh);
+ assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
+ }
+
synchronized (sync) {
- while (sync.value == false) {
+ while (sync.counter < numEntriesToWrite) {
+ LOG.info("Entries counter = " + sync.counter);
sync.wait();
}
}
LOG.debug("*** READ COMPLETE ***");
- // at this point, LedgerSequence ls is filled with the returned
- // values
- int i = 0;
- while (ls.hasMoreElements()) {
- ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
- Integer origEntry = origbb.getInt();
- byte[] entry = ls.nextElement().getEntry();
- ByteBuffer result = ByteBuffer.wrap(entry);
- LOG.debug("Length of result: " + result.capacity());
- LOG.debug("Original entry: " + origEntry);
-
- Integer retrEntry = result.getInt();
- LOG.debug("Retrieved entry: " + retrEntry);
- assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
- assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
- i++;
- }
- assertTrue("Checking number of read entries", i == numEntriesToWrite);
-
lh.close();
} catch (KeeperException e) {
LOG.error("Test failed", e);
@@ -565,7 +610,10 @@ public class BookieReadWriteTest extends
}
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
+
SyncObj x = (SyncObj) ctx;
+
synchronized (x) {
x.counter++;
x.notify();
@@ -573,12 +621,14 @@ public class BookieReadWriteTest extends
}
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+ if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
+
ls = seq;
+
synchronized (sync) {
sync.value = true;
sync.notify();
}
-
}
@Before