You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/21 06:31:03 UTC

[GitHub] sijie closed pull request #1585: Issue #1584: LedgerHandleAdv should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle exposes it as public)

sijie closed pull request #1585: Issue #1584: LedgerHandleAdv should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle exposes it as public)
URL: https://github.com/apache/bookkeeper/pull/1585
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not show its diff
after the merge, so the full diff is reproduced below:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 611b1827c3..7b913d4b42 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1067,7 +1067,6 @@ public void asyncAddEntry(final byte[] data, final int offset, final int length,
     }
 
     public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
-        data.retain();
         PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
         doAsyncAddEntry(op);
     }
@@ -1125,6 +1124,26 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse
         cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
     }
 
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     * This can be used only with {@link LedgerHandleAdv} returned through
+     * ledgers created with {@link createLedgerAdv(int, int, int, DigestType, byte[])}.
+     *
+     * @param entryId
+     *            entryId of the entry to add.
+     * @param data
+     *            io.netty.buffer.ByteBuf of bytes to be written
+     * @param cb
+     *            object implementing callbackinterface
+     * @param ctx
+     *            some control object
+     */
+    public void asyncAddEntry(final long entryId, ByteBuf data,
+                              final AddCallbackWithLatency cb, final Object ctx) {
+        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 153ceebeca..48beaa7790 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -177,8 +177,23 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse
         asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
     }
 
-    private void asyncAddEntry(final long entryId, ByteBuf data,
-            final AddCallbackWithLatency cb, final Object ctx) {
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     * This can be used only with {@link LedgerHandleAdv} returned through
+     * ledgers created with {@link createLedgerAdv(int, int, int, DigestType, byte[])}.
+     *
+     * @param entryId
+     *            entryId of the entry to add.
+     * @param data
+     *            io.netty.buffer.ByteBuf of bytes to be written
+     * @param cb
+     *            object implementing callbackinterface
+     * @param ctx
+     *            some control object
+     */
+    @Override
+    public void asyncAddEntry(final long entryId, ByteBuf data,
+                              final AddCallbackWithLatency cb, final Object ctx) {
         PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
         op.setEntryId(entryId);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 5f1722634b..afcaed6c00 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -243,6 +243,8 @@ public void safeRun() {
         this.toSend = lh.macManager.computeDigestAndPackageForSending(
                 entryId, lh.lastAddConfirmed, currentLedgerLength,
                 payload);
+        // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
+        payload = null;
 
         // We are about to send. Check if we need to make an ensemble change
         // becasue of delayed write errors
@@ -456,7 +458,10 @@ private void maybeRecycle() {
     private void recyclePendAddOpObject() {
         entryId = LedgerHandle.INVALID_ENTRY_ID;
         currentLedgerLength = -1;
-        payload = null;
+        if (payload != null) {
+            ReferenceCountUtil.release(payload);
+            payload = null;
+        }
         cb = null;
         ctx = null;
         ackSet.recycle();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 2627db8226..4c174a8df1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -20,6 +20,7 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 
 import java.security.GeneralSecurityException;
 
@@ -127,6 +128,7 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
             populateValueAndReset(sendBuffer);
 
             sendBuffer.writeBytes(data, data.readerIndex(), data.readableBytes());
+            ReferenceCountUtil.release(data);
 
             return ByteBufList.get(sendBuffer);
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 93683c8d81..85376eafd8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -28,8 +28,13 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.AbstractByteBufAllocator;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -43,6 +48,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -1294,6 +1300,101 @@ public void testLedgerCreateAdvSyncAsyncAddDuplicateEntryIds() throws Exception
         lh.close();
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testLedgerCreateAdvByteBufRefCnt() throws Exception {
+        long ledgerId = rng.nextLong();
+        ledgerId &= Long.MAX_VALUE;
+        if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+            // since LongHierarchicalLedgerManager supports ledgerIds of
+            // decimal length upto 19 digits but other
+            // LedgerManagers only upto 10 decimals
+            ledgerId %= 9999999999L;
+        }
+
+        final LedgerHandle lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
+
+        final List<AbstractByteBufAllocator> allocs = Lists.newArrayList(
+                new PooledByteBufAllocator(true),
+                new PooledByteBufAllocator(false),
+                new UnpooledByteBufAllocator(true),
+                new UnpooledByteBufAllocator(false));
+
+        long entryId = 0;
+        for (AbstractByteBufAllocator alloc: allocs) {
+            final ByteBuf data = alloc.buffer(10);
+            data.writeBytes(("fragment0" + entryId).getBytes());
+            assertEquals("ref count on ByteBuf should be 1", 1, data.refCnt());
+
+            CompletableFuture<Integer> cf = new CompletableFuture<>();
+            lh.asyncAddEntry(entryId, data, (rc, handle, eId, qwcLatency, ctx) -> {
+                CompletableFuture<Integer> future = (CompletableFuture<Integer>) ctx;
+                future.complete(rc);
+            }, cf);
+
+            int rc = cf.get();
+            assertEquals("rc code is OK", BKException.Code.OK, rc);
+
+            for (int i = 0; i < 10; i++) {
+                if (data.refCnt() == 0) {
+                    break;
+                }
+                TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously
+            }
+            assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ",
+                    0, data.refCnt());
+
+            org.apache.bookkeeper.client.api.LedgerEntry e = lh.read(entryId, entryId).getEntry(entryId);
+            assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes()));
+            entryId++;
+        }
+
+        bkc.deleteLedger(lh.ledgerId);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testLedgerCreateByteBufRefCnt() throws Exception {
+        final LedgerHandle lh = bkc.createLedger(5, 3, 2, digestType, ledgerPassword, null);
+
+        final List<AbstractByteBufAllocator> allocs = Lists.newArrayList(
+                new PooledByteBufAllocator(true),
+                new PooledByteBufAllocator(false),
+                new UnpooledByteBufAllocator(true),
+                new UnpooledByteBufAllocator(false));
+
+        int entryId = 0;
+        for (AbstractByteBufAllocator alloc: allocs) {
+            final ByteBuf data = alloc.buffer(10);
+            data.writeBytes(("fragment0" + entryId).getBytes());
+            assertEquals("ref count on ByteBuf should be 1", 1, data.refCnt());
+
+            CompletableFuture<Integer> cf = new CompletableFuture<>();
+            lh.asyncAddEntry(data, (rc, handle, eId, ctx) -> {
+                CompletableFuture<Integer> future = (CompletableFuture<Integer>) ctx;
+                future.complete(rc);
+            }, cf);
+
+            int rc = cf.get();
+            assertEquals("rc code is OK", BKException.Code.OK, rc);
+
+            for (int i = 0; i < 10; i++) {
+                if (data.refCnt() == 0) {
+                    break;
+                }
+                TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously
+            }
+            assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ",
+                    0, data.refCnt());
+
+            org.apache.bookkeeper.client.api.LedgerEntry e = lh.read(entryId, entryId).getEntry(entryId);
+            assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes()));
+            entryId++;
+        }
+
+        bkc.deleteLedger(lh.ledgerId);
+    }
+
     private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
         ls = lh.readEntries(0, numEntriesToWrite - 1);
         int index = 0;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
index aaa76450f3..95dec9c21a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
@@ -50,9 +50,9 @@ public void testAddEntryLastAddConfirmedDoesNotAdvance() throws Exception {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                result(wh.appendAsync(DATA));
+                result(wh.appendAsync(DATA.retainedDuplicate()));
             }
-            long lastEntryID = result(wh.appendAsync(DATA));
+            long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
             assertEquals(NUM_ENTRIES - 1, lastEntryID);
             assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
             assertEquals(-1, wh.getLastAddConfirmed());
@@ -69,9 +69,9 @@ public void testAddEntryLastAddConfirmedAdvanceWithForce() throws Exception {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                result(wh.appendAsync(DATA));
+                result(wh.appendAsync(DATA.retainedDuplicate()));
             }
-            long lastEntryID = result(wh.appendAsync(DATA));
+            long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
             assertEquals(NUM_ENTRIES - 1, lastEntryID);
             assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
             assertEquals(-1, wh.getLastAddConfirmed());
@@ -90,20 +90,20 @@ public void testForceOnWriteAdvHandle() throws Exception {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .makeAdv()
                 .execute())) {
-            CompletableFuture<Long> w0 = wh.writeAsync(0, DATA);
-            CompletableFuture<Long> w2 = wh.writeAsync(2, DATA);
-            CompletableFuture<Long> w3 = wh.writeAsync(3, DATA);
+            CompletableFuture<Long> w0 = wh.writeAsync(0, DATA.retainedDuplicate());
+            CompletableFuture<Long> w2 = wh.writeAsync(2, DATA.retainedDuplicate());
+            CompletableFuture<Long> w3 = wh.writeAsync(3, DATA.retainedDuplicate());
             result(w0);
             result(wh.force());
             assertEquals(0, wh.getLastAddConfirmed());
-            CompletableFuture<Long> w1 = wh.writeAsync(1, DATA);
+            CompletableFuture<Long> w1 = wh.writeAsync(1, DATA.retainedDuplicate());
             result(w3);
             assertTrue(w1.isDone());
             assertTrue(w2.isDone());
-            CompletableFuture<Long> w5 = wh.writeAsync(5, DATA);
+            CompletableFuture<Long> w5 = wh.writeAsync(5, DATA.retainedDuplicate());
             result(wh.force());
             assertEquals(3, wh.getLastAddConfirmed());
-            wh.writeAsync(4, DATA);
+            wh.writeAsync(4, DATA.retainedDuplicate());
             result(w5);
             result(wh.force());
             assertEquals(5, wh.getLastAddConfirmed());
@@ -120,9 +120,9 @@ public void testForceRequiresFullEnsemble() throws Exception {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                result(wh.appendAsync(DATA));
+                result(wh.appendAsync(DATA.retainedDuplicate()));
             }
-            long lastEntryID = result(wh.appendAsync(DATA));
+            long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
             assertEquals(NUM_ENTRIES - 1, lastEntryID);
             assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
             assertEquals(-1, wh.getLastAddConfirmed());
@@ -131,7 +131,7 @@ public void testForceRequiresFullEnsemble() throws Exception {
             killBookie(bookieAddress);
 
             // write should succeed (we still have 2 bookies out of 3)
-            result(wh.appendAsync(DATA));
+            result(wh.appendAsync(DATA.retainedDuplicate()));
 
             // force cannot go, it must be acknowledged by all of the bookies in the ensamble
             try {
@@ -154,9 +154,9 @@ public void testForceWillAdvanceLacOnlyUpToLastAcknoledgedWrite() throws Excepti
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                result(wh.appendAsync(DATA));
+                result(wh.appendAsync(DATA.retainedDuplicate()));
             }
-            long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA));
+            long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA.retainedDuplicate()));
             assertEquals(NUM_ENTRIES - 1, lastEntryIdBeforeSuspend);
             assertEquals(-1, wh.getLastAddConfirmed());
 
@@ -170,7 +170,7 @@ public void testForceWillAdvanceLacOnlyUpToLastAcknoledgedWrite() throws Excepti
             assertEquals(-1, wh.getLastAddConfirmed());
 
             // send an entry and receive ack
-            long lastEntry = wh.append(DATA);
+            long lastEntry = wh.append(DATA.retainedDuplicate());
 
             // receive the ack for forceLedger
             resumeBookieWriteAcks(bookieAddress);
@@ -195,7 +195,7 @@ public void testForbiddenEnsembleChange() throws Exception {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                wh.append(DATA);
+                wh.append(DATA.retainedDuplicate());
             }
 
             assertEquals(1, availableBookies.size());
@@ -207,7 +207,7 @@ public void testForbiddenEnsembleChange() throws Exception {
 
             try {
                 // we cannot switch to the new bookie with DEFERRED_SYNC
-                wh.append(DATA);
+                wh.append(DATA.retainedDuplicate());
                 fail("since ensemble change is disable we cannot be able to write any more");
             } catch (BKException.BKWriteException ex) {
                 // expected
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
index 98c88f7eb6..733593ad60 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -165,7 +165,7 @@ public synchronized ByteBuf getBuffer() throws InvalidEnvelopedEntryException, I
         if (null == finalizedBuffer) {
             finalizedBuffer = finalizeBuffer();
         }
-        return finalizedBuffer.slice();
+        return finalizedBuffer.retainedSlice();
     }
 
     private ByteBuf finalizeBuffer() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services