You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/08/11 20:36:53 UTC

svn commit: r1156723 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/

Author: fpj
Date: Thu Aug 11 18:36:52 2011
New Revision: 1156723

URL: http://svn.apache.org/viewvc?rev=1156723&view=rev
Log:
BOOKKEEPER-33: Add length and offset parameter to addEntry (ivank via fpj)


Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Aug 11 18:36:52 2011
@@ -26,3 +26,5 @@ BUGFIXES:
   
   BOOKKEEPER-29: BookieRecoveryTest fails intermittently (fpj via ivank)
 
+  BOOKKEEPER-33: Add length and offset parameter to addEntry (ivank via fpj)
+

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java Thu Aug 11 18:36:52 2011
@@ -79,7 +79,7 @@ public abstract class DigestManager {
      * @return
      */
     
-    public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data) {
+    public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data, int doffset, int dlength) {
 
         byte[] bufferArray = new byte[METADATA_LENGTH + macCodeLength];
         ByteBuffer buffer = ByteBuffer.wrap(bufferArray);
@@ -90,7 +90,7 @@ public abstract class DigestManager {
         buffer.flip();
 
         update(buffer.array(), 0, METADATA_LENGTH);
-        update(data);
+        update(data, doffset, dlength);
         byte[] digest = getValueAndReset();
 
         buffer.limit(buffer.capacity());
@@ -98,7 +98,7 @@ public abstract class DigestManager {
         buffer.put(digest);
         buffer.flip();
 
-        return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data));
+        return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data, doffset, dlength));
     }
 
     private void verifyDigest(ChannelBuffer dataReceived) throws BKDigestMatchException {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Thu Aug 11 18:36:52 2011
@@ -319,23 +319,37 @@ public class LedgerHandle implements Rea
     }
   }
 
-  /**
-   * Add entry synchronously to an open ledger.
-   * 
-   * @param data
-   *         array of bytes to be written to the ledger
-   */
-
-  public long addEntry(byte[] data) throws InterruptedException, BKException {
-    LOG.debug("Adding entry " + data);
-    SyncCounter counter = new SyncCounter();
-    counter.inc();
+    /**
+     * Add entry synchronously to an open ledger.
+     * 
+     * @param data
+     *         array of bytes to be written to the ledger
+     */
+    public long addEntry(byte[] data) throws InterruptedException, BKException {
+        return addEntry(data, 0, data.length);
+    }
 
-    asyncAddEntry(data, this, counter);
-    counter.block(0);
+    /**
+     * Add entry synchronously to an open ledger.
+     * 
+     * @param data
+     *         array of bytes to be written to the ledger
+     * @param offset
+     *          offset from which to take bytes from data
+     * @param length
+     *          number of bytes to take from data
+     */
+    public long addEntry(byte[] data, int offset, int length) 
+            throws InterruptedException, BKException {
+        LOG.debug("Adding entry " + data);
+        SyncCounter counter = new SyncCounter();
+        counter.inc();
+        
+        asyncAddEntry(data, offset, length, this, counter);
+        counter.block(0);
 
-    return counter.getrc();
-  }
+        return counter.getrc();
+    }
 
   /**
    * Add entry asynchronously to an open ledger.
@@ -347,41 +361,68 @@ public class LedgerHandle implements Rea
    * @param ctx
    *          some control object
    */
-  public void asyncAddEntry(final byte[] data, final AddCallback cb,
-      final Object ctx) {
-      try{
-          opCounterSem.acquire();
-      } catch (InterruptedException e) {
-          cb.addComplete(BKException.Code.InterruptedException,
-                  LedgerHandle.this, -1, ctx);
-      }
-      
-      try{
-          bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
-              @Override
-              public void safeRun() {
-                  if (metadata.isClosed()) {
-                      LOG.warn("Attempt to add to closed ledger: " + ledgerId);
-                      LedgerHandle.this.opCounterSem.release();
-                      cb.addComplete(BKException.Code.LedgerClosedException,
-                              LedgerHandle.this, -1, ctx);
-                      return;
-                  }
-
-                  long entryId = ++lastAddPushed;
-                  long currentLength = addToLength(data.length);
-                  PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
-                  pendingAddOps.add(op);
-                  ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
-                          entryId, lastAddConfirmed, currentLength, data);
-                  op.initiate(toSend);
-              }
-          });
-      } catch (RuntimeException e) {
-          opCounterSem.release();
-          throw e;
-      }
-  }
+    public void asyncAddEntry(final byte[] data, final AddCallback cb, 
+                              final Object ctx) {
+        asyncAddEntry(data, 0, data.length, cb, ctx);
+    }
+
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     * 
+     * @param data
+     *          array of bytes to be written
+     * @param offset
+     *          offset from which to take bytes from data
+     * @param length
+     *          number of bytes to take from data
+     * @param cb
+     *          object implementing callbackinterface
+     * @param ctx
+     *          some control object
+     * @throws ArrayIndexOutOfBoundsException if offset or length is negative or 
+     *          offset and length sum to a value higher than the length of data.
+     */
+    public void asyncAddEntry(final byte[] data, final int offset, final int length, 
+                              final AddCallback cb, final Object ctx) {
+        if (offset < 0 || length < 0
+            || (offset + length) > data.length) {
+            throw new ArrayIndexOutOfBoundsException(
+                    "Invalid values for offset("+offset
+                    +") or length("+length+")");
+        }
+        try{
+            opCounterSem.acquire();
+        } catch (InterruptedException e) {
+            cb.addComplete(BKException.Code.InterruptedException,
+                    LedgerHandle.this, -1, ctx);
+        }
+        
+        try{
+            bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        if (metadata.isClosed()) {
+                            LOG.warn("Attempt to add to closed ledger: " + ledgerId);
+                            LedgerHandle.this.opCounterSem.release();
+                            cb.addComplete(BKException.Code.LedgerClosedException,
+                                           LedgerHandle.this, -1, ctx);
+                            return;
+                        }
+                        
+                        long entryId = ++lastAddPushed;
+                        long currentLength = addToLength(length);
+                        PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
+                        pendingAddOps.add(op);
+                        ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
+                                entryId, lastAddConfirmed, currentLength, data, offset, length);
+                        op.initiate(toSend);
+                    }
+                });
+        } catch (RuntimeException e) {
+            opCounterSem.release();
+            throw e;
+        }
+    }
 
   /**
    * Obtains last confirmed write from a quorum of bookies.

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/BookKeeperTools.java Thu Aug 11 18:36:52 2011
@@ -687,8 +687,9 @@ public class BookKeeperTools {
                  * bookie we've selected.
                  */
                 LedgerEntry entry = seq.nextElement();
+                byte[] data = entry.getEntry();
                 ChannelBuffer toSend = lh.getDigestManager().computeDigestAndPackageForSending(entryId,
-                        lh.getLastAddConfirmed(), entry.getLength(), entry.getEntry());
+                        lh.getLastAddConfirmed(), entry.getLength(), data, 0, data.length);
                 bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, toSend,
                         new WriteCallback() {
                             @Override

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java Thu Aug 11 18:36:52 2011
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Random;
 import java.util.Set;
+import java.util.Arrays;
 import java.util.concurrent.Semaphore;
 
 
@@ -246,6 +247,137 @@ implements AddCallback, ReadCallback, Re
         }
     }
 
+    /**
+     * Check that the add api with offset and length work correctly.
+     * First try varying the offset. Then the length with a fixed non-zero
+     * offset.
+     */
+    @Test
+    public void testReadWriteRangeAsyncSingleClient() throws IOException {
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(digestType, ledgerPassword);
+            // bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            byte bytes[] = {'a','b','c','d','e','f','g','h','i'};
+            
+            lh.asyncAddEntry(bytes, 0, bytes.length, this, sync);
+            lh.asyncAddEntry(bytes, 0, 4, this, sync); // abcd
+            lh.asyncAddEntry(bytes, 3, 4, this, sync); // defg
+            lh.asyncAddEntry(bytes, 3, (bytes.length-3), this, sync); // defghi
+            int numEntries = 4;
+
+            // wait for all entries to be acknowledged
+            synchronized (sync) {
+                while (sync.counter < numEntries) {
+                    LOG.debug("Entries counter = " + sync.counter);
+                    sync.wait();
+                }
+            }
+
+            try {
+                lh.asyncAddEntry(bytes, -1, bytes.length, this, sync); 
+                fail("Shouldn't be able to use negative offset");
+            } catch (ArrayIndexOutOfBoundsException aiob) {
+                // expected
+            }
+            try {
+                lh.asyncAddEntry(bytes, 0, bytes.length+1, this, sync); 
+                fail("Shouldn't be able to use that much length");
+            } catch (ArrayIndexOutOfBoundsException aiob) {
+                // expected
+            }
+            try {
+                lh.asyncAddEntry(bytes, -1, bytes.length+2, this, sync); 
+                fail("Shouldn't be able to use negative offset "
+                     + "with that much length");
+            } catch (ArrayIndexOutOfBoundsException aiob) {
+                // expected
+            }
+            try {
+                lh.asyncAddEntry(bytes, 4, -3, this, sync); 
+                fail("Shouldn't be able to use negative length");
+            } catch (ArrayIndexOutOfBoundsException aiob) {
+                // expected
+            }
+            try {
+                lh.asyncAddEntry(bytes, -4, -3, this, sync); 
+                fail("Shouldn't be able to use negative offset & length");
+            } catch (ArrayIndexOutOfBoundsException aiob) {
+                // expected
+            }
+            
+
+            LOG.debug("*** WRITE COMPLETE ***");
+            // close ledger
+            lh.close();
+
+            // *** WRITING PART COMPLETE // READ PART BEGINS ***
+
+            // open ledger
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+            assertTrue("Verifying number of entries written", 
+                       lh.getLastAddConfirmed() == (numEntries - 1));
+
+            // read entries
+            lh.asyncReadEntries(0, numEntries - 1, this, (Object) sync);
+
+            synchronized (sync) {
+                while (sync.value == false) {
+                    sync.wait();
+                }
+            }
+
+            LOG.debug("*** READ COMPLETE ***");
+
+            // at this point, Enumeration<LedgerEntry> ls is filled with the returned
+            // values
+            int i = 0;
+            while (ls.hasMoreElements()) {
+                byte[] expected = null;
+                byte[] entry = ls.nextElement().getEntry();
+                
+                switch (i) {
+                case 0: 
+                    expected = Arrays.copyOfRange(bytes, 0, bytes.length);
+                    break;
+                case 1: 
+                    expected = Arrays.copyOfRange(bytes, 0, 4);
+                    break;
+                case 2: 
+                    expected = Arrays.copyOfRange(bytes, 3, 3+4);
+                    break;
+                case 3: 
+                    expected = Arrays.copyOfRange(bytes, 3, 3+(bytes.length-3));
+                    break;
+                }
+                assertNotNull("There are more checks than writes", expected);
+                
+                String message = "Checking entry " + i + " for equality ["
+                    + new String(entry, "UTF-8") + "," 
+                    + new String(expected, "UTF-8") + "]";
+                assertTrue(message, Arrays.equals(entry, expected));
+
+                i++;
+            }
+            assertTrue("Checking number of read entries", i == numEntries);
+
+            lh.close();
+        } catch (KeeperException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to ZooKeeper exception");
+        } catch (BKException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to BookKeeper exception");
+        } catch (InterruptedException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to interruption");
+        }
+    }
+
     class ThrottleTestCallback implements ReadCallback {
         int throttle;
         

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java?rev=1156723&r1=1156722&r2=1156723&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerCacheTest.java Thu Aug 11 18:36:52 2011
@@ -119,7 +119,8 @@ public class LedgerCacheTest extends Tes
             byte[] masterKey = "blah".getBytes();
             for( int i = 0; i < 30000; i++){
                 MacDigestManager dm = new MacDigestManager(i, masterKey);
-                ByteBuffer entry = dm.computeDigestAndPackageForSending(0, 0, 10, "0123456789".getBytes()).toByteBuffer();
+                byte[] data = "0123456789".getBytes();
+                ByteBuffer entry = dm.computeDigestAndPackageForSending(0, 0, 10, data, 0, data.length).toByteBuffer();
                 bookie.addEntry(entry, new TestWriteCallback(), null, masterKey);
             }
         } catch (IOException e) {