You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/08/11 20:36:53 UTC
svn commit: r1156723 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/
bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Author: fpj
Date: Thu Aug 11 18:36:52 2011
New Revision: 1156723
URL: http://svn.apache.org/viewvc?rev=1156723&view=rev
Log:
BOOKKEEPER-33: Add length and offset parameter to addEntry (ivank via fpj)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Aug 11 18:36:52 2011
@@ -26,3 +26,5 @@ BUGFIXES:
BOOKKEEPER-29: BookieRecoveryTest fails intermittently (fpj via ivank)
+ BOOKKEEPER-33: Add length and offset parameter to addEntry (ivank via fpj)
+
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java Thu Aug 11 18:36:52 2011
@@ -79,7 +79,7 @@ public abstract class DigestManager {
* @return
*/
- public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data) {
+ public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data, int doffset, int dlength) {
byte[] bufferArray = new byte[METADATA_LENGTH + macCodeLength];
ByteBuffer buffer = ByteBuffer.wrap(bufferArray);
@@ -90,7 +90,7 @@ public abstract class DigestManager {
buffer.flip();
update(buffer.array(), 0, METADATA_LENGTH);
- update(data);
+ update(data, doffset, dlength);
byte[] digest = getValueAndReset();
buffer.limit(buffer.capacity());
@@ -98,7 +98,7 @@ public abstract class DigestManager {
buffer.put(digest);
buffer.flip();
- return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data));
+ return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data, doffset, dlength));
}
private void verifyDigest(ChannelBuffer dataReceived) throws BKDigestMatchException {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Thu Aug 11 18:36:52 2011
@@ -319,23 +319,37 @@ public class LedgerHandle implements Rea
}
}
- /**
- * Add entry synchronously to an open ledger.
- *
- * @param data
- * array of bytes to be written to the ledger
- */
-
- public long addEntry(byte[] data) throws InterruptedException, BKException {
- LOG.debug("Adding entry " + data);
- SyncCounter counter = new SyncCounter();
- counter.inc();
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param data
+ * array of bytes to be written to the ledger
+ */
+ public long addEntry(byte[] data) throws InterruptedException, BKException {
+ return addEntry(data, 0, data.length);
+ }
- asyncAddEntry(data, this, counter);
- counter.block(0);
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param data
+ * array of bytes to be written to the ledger
+ * @param offset
+ * offset from which to take bytes from data
+ * @param length
+ * number of bytes to take from data
+ */
+ public long addEntry(byte[] data, int offset, int length)
+ throws InterruptedException, BKException {
+ LOG.debug("Adding entry " + data);
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
+
+ asyncAddEntry(data, offset, length, this, counter);
+ counter.block(0);
- return counter.getrc();
- }
+ return counter.getrc();
+ }
/**
* Add entry asynchronously to an open ledger.
@@ -347,41 +361,68 @@ public class LedgerHandle implements Rea
* @param ctx
* some control object
*/
- public void asyncAddEntry(final byte[] data, final AddCallback cb,
- final Object ctx) {
- try{
- opCounterSem.acquire();
- } catch (InterruptedException e) {
- cb.addComplete(BKException.Code.InterruptedException,
- LedgerHandle.this, -1, ctx);
- }
-
- try{
- bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- if (metadata.isClosed()) {
- LOG.warn("Attempt to add to closed ledger: " + ledgerId);
- LedgerHandle.this.opCounterSem.release();
- cb.addComplete(BKException.Code.LedgerClosedException,
- LedgerHandle.this, -1, ctx);
- return;
- }
-
- long entryId = ++lastAddPushed;
- long currentLength = addToLength(data.length);
- PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
- pendingAddOps.add(op);
- ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
- entryId, lastAddConfirmed, currentLength, data);
- op.initiate(toSend);
- }
- });
- } catch (RuntimeException e) {
- opCounterSem.release();
- throw e;
- }
- }
+ public void asyncAddEntry(final byte[] data, final AddCallback cb,
+ final Object ctx) {
+ asyncAddEntry(data, 0, data.length, cb, ctx);
+ }
+
+ /**
+ * Add entry asynchronously to an open ledger, using an offset and range.
+ *
+ * @param data
+ * array of bytes to be written
+ * @param offset
+ * offset from which to take bytes from data
+ * @param length
+ * number of bytes to take from data
+ * @param cb
+ * object implementing callbackinterface
+ * @param ctx
+ * some control object
+ * @throws ArrayIndexOutOfBoundsException if offset or length is negative or
+ * offset and length sum to a value higher than the length of data.
+ */
+ public void asyncAddEntry(final byte[] data, final int offset, final int length,
+ final AddCallback cb, final Object ctx) {
+ if (offset < 0 || length < 0
+ || (offset + length) > data.length) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Invalid values for offset("+offset
+ +") or length("+length+")");
+ }
+ try{
+ opCounterSem.acquire();
+ } catch (InterruptedException e) {
+ cb.addComplete(BKException.Code.InterruptedException,
+ LedgerHandle.this, -1, ctx);
+ }
+
+ try{
+ bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ if (metadata.isClosed()) {
+ LOG.warn("Attempt to add to closed ledger: " + ledgerId);
+ LedgerHandle.this.opCounterSem.release();
+ cb.addComplete(BKException.Code.LedgerClosedException,
+ LedgerHandle.this, -1, ctx);
+ return;
+ }
+
+ long entryId = ++lastAddPushed;
+ long currentLength = addToLength(length);
+ PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
+ pendingAddOps.add(op);
+ ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
+ entryId, lastAddConfirmed, currentLength, data, offset, length);
+ op.initiate(toSend);
+ }
+ });
+ } catch (RuntimeException e) {
+ opCounterSem.release();
+ throw e;
+ }
+ }
/**
* Obtains last confirmed write from a quorum of bookies.
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java Thu Aug 11 18:36:52 2011
@@ -687,8 +687,9 @@ public class BookKeeperTools {
* bookie we've selected.
*/
LedgerEntry entry = seq.nextElement();
+ byte[] data = entry.getEntry();
ChannelBuffer toSend = lh.getDigestManager().computeDigestAndPackageForSending(entryId,
- lh.getLastAddConfirmed(), entry.getLength(), entry.getEntry());
+ lh.getLastAddConfirmed(), entry.getLength(), data, 0, data.length);
bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, toSend,
new WriteCallback() {
@Override
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java Thu Aug 11 18:36:52 2011
@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Random;
import java.util.Set;
+import java.util.Arrays;
import java.util.concurrent.Semaphore;
@@ -246,6 +247,137 @@ implements AddCallback, ReadCallback, Re
}
}
+ /**
+ * Check that the add api with offset and length work correctly.
+ * First try varying the offset. Then the length with a fixed non-zero
+ * offset.
+ */
+ @Test
+ public void testReadWriteRangeAsyncSingleClient() throws IOException {
+ try {
+ // Create a BookKeeper client and a ledger
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+ byte bytes[] = {'a','b','c','d','e','f','g','h','i'};
+
+ lh.asyncAddEntry(bytes, 0, bytes.length, this, sync);
+ lh.asyncAddEntry(bytes, 0, 4, this, sync); // abcd
+ lh.asyncAddEntry(bytes, 3, 4, this, sync); // defg
+ lh.asyncAddEntry(bytes, 3, (bytes.length-3), this, sync); // defghi
+ int numEntries = 4;
+
+ // wait for all entries to be acknowledged
+ synchronized (sync) {
+ while (sync.counter < numEntries) {
+ LOG.debug("Entries counter = " + sync.counter);
+ sync.wait();
+ }
+ }
+
+ try {
+ lh.asyncAddEntry(bytes, -1, bytes.length, this, sync);
+ fail("Shouldn't be able to use negative offset");
+ } catch (ArrayIndexOutOfBoundsException aiob) {
+ // expected
+ }
+ try {
+ lh.asyncAddEntry(bytes, 0, bytes.length+1, this, sync);
+ fail("Shouldn't be able to use that much length");
+ } catch (ArrayIndexOutOfBoundsException aiob) {
+ // expected
+ }
+ try {
+ lh.asyncAddEntry(bytes, -1, bytes.length+2, this, sync);
+ fail("Shouldn't be able to use negative offset "
+ + "with that much length");
+ } catch (ArrayIndexOutOfBoundsException aiob) {
+ // expected
+ }
+ try {
+ lh.asyncAddEntry(bytes, 4, -3, this, sync);
+ fail("Shouldn't be able to use negative length");
+ } catch (ArrayIndexOutOfBoundsException aiob) {
+ // expected
+ }
+ try {
+ lh.asyncAddEntry(bytes, -4, -3, this, sync);
+ fail("Shouldn't be able to use negative offset & length");
+ } catch (ArrayIndexOutOfBoundsException aiob) {
+ // expected
+ }
+
+
+ LOG.debug("*** WRITE COMPLETE ***");
+ // close ledger
+ lh.close();
+
+ // *** WRITING PART COMPLETE // READ PART BEGINS ***
+
+ // open ledger
+ lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+ LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+ assertTrue("Verifying number of entries written",
+ lh.getLastAddConfirmed() == (numEntries - 1));
+
+ // read entries
+ lh.asyncReadEntries(0, numEntries - 1, this, (Object) sync);
+
+ synchronized (sync) {
+ while (sync.value == false) {
+ sync.wait();
+ }
+ }
+
+ LOG.debug("*** READ COMPLETE ***");
+
+ // at this point, Enumeration<LedgerEntry> ls is filled with the returned
+ // values
+ int i = 0;
+ while (ls.hasMoreElements()) {
+ byte[] expected = null;
+ byte[] entry = ls.nextElement().getEntry();
+
+ switch (i) {
+ case 0:
+ expected = Arrays.copyOfRange(bytes, 0, bytes.length);
+ break;
+ case 1:
+ expected = Arrays.copyOfRange(bytes, 0, 4);
+ break;
+ case 2:
+ expected = Arrays.copyOfRange(bytes, 3, 3+4);
+ break;
+ case 3:
+ expected = Arrays.copyOfRange(bytes, 3, 3+(bytes.length-3));
+ break;
+ }
+ assertNotNull("There are more checks than writes", expected);
+
+ String message = "Checking entry " + i + " for equality ["
+ + new String(entry, "UTF-8") + ","
+ + new String(expected, "UTF-8") + "]";
+ assertTrue(message, Arrays.equals(entry, expected));
+
+ i++;
+ }
+ assertTrue("Checking number of read entries", i == numEntries);
+
+ lh.close();
+ } catch (KeeperException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to ZooKeeper exception");
+ } catch (BKException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
+ } catch (InterruptedException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
+ }
+
class ThrottleTestCallback implements ReadCallback {
int throttle;
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java Thu Aug 11 18:36:52 2011
@@ -119,7 +119,8 @@ public class LedgerCacheTest extends Tes
byte[] masterKey = "blah".getBytes();
for( int i = 0; i < 30000; i++){
MacDigestManager dm = new MacDigestManager(i, masterKey);
- ByteBuffer entry = dm.computeDigestAndPackageForSending(0, 0, 10, "0123456789".getBytes()).toByteBuffer();
+ byte[] data = "0123456789".getBytes();
+ ByteBuffer entry = dm.computeDigestAndPackageForSending(0, 0, 10, data, 0, data.length).toByteBuffer();
bookie.addEntry(entry, new TestWriteCallback(), null, masterKey);
}
} catch (IOException e) {