You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/21 06:31:13 UTC

[bookkeeper] branch branch-4.8 updated: Issue #1584: LedgerHandleAdv should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle exposes it as public)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.8
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.8 by this push:
     new 432a11b  Issue #1584: LedgerHandleAdv should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle exposes it as public)
432a11b is described below

commit 432a11b0bf02a44013c0462b22da3687ed7353cd
Author: Andrey Yegorov <ay...@salesforce.com>
AuthorDate: Mon Aug 20 23:30:52 2018 -0700

    Issue #1584: LedgerHandleAdv should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle exposes it as public)
    
    Descriptions of the changes in this PR:
    
    - exposed asyncAddEntry as public, similarly to other variants.
    - fixed ByteBuf retention
    
    ### Motivation
    
    It's useful to have this exposed as public for clients to make use of netty's allocator and pass ByteBuf directly.
    
    ### Changes
    
    exposed asyncAddEntry as public, similarly to other variants.
    
    Master Issue: #1584
    
    Author: Andrey Yegorov <ay...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1585 from dlg99/feature/issue-1584-LedgerHandleAdv-public-addEntry-bytebuff, closes #1584
    
    (cherry picked from commit 299fb58deed3dc284342716d52b7f918cc6cefc4)
    Signed-off-by: Sijie Guo <si...@apache.org>
---
 .../org/apache/bookkeeper/client/LedgerHandle.java |  21 ++++-
 .../apache/bookkeeper/client/LedgerHandleAdv.java  |  19 +++-
 .../org/apache/bookkeeper/client/PendingAddOp.java |   7 +-
 .../bookkeeper/proto/checksum/DigestManager.java   |   2 +
 .../bookkeeper/client/BookieWriteLedgerTest.java   | 101 +++++++++++++++++++++
 .../apache/bookkeeper/client/DeferredSyncTest.java |  36 ++++----
 .../distributedlog/EnvelopedEntryWriter.java       |   2 +-
 7 files changed, 165 insertions(+), 23 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index a646651..bd81b73 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1067,7 +1067,6 @@ public class LedgerHandle implements WriteHandle {
     }
 
     public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
-        data.retain();
         PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
         doAsyncAddEntry(op);
     }
@@ -1126,6 +1125,26 @@ public class LedgerHandle implements WriteHandle {
     }
 
     /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     * This can be used only with {@link LedgerHandleAdv} returned through
+     * ledgers created with {@link createLedgerAdv(int, int, int, DigestType, byte[])}.
+     *
+     * @param entryId
+     *            entryId of the entry to add.
+     * @param data
+     *            io.netty.buffer.ByteBuf of bytes to be written
+     * @param cb
+     *            object implementing callbackinterface
+     * @param ctx
+     *            some control object
+     */
+    public void asyncAddEntry(final long entryId, ByteBuf data,
+                              final AddCallbackWithLatency cb, final Object ctx) {
+        LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
+        cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
+    }
+
+    /**
      * {@inheritDoc}
      */
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 153ceeb..48beaa7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -177,8 +177,23 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
         asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
     }
 
-    private void asyncAddEntry(final long entryId, ByteBuf data,
-            final AddCallbackWithLatency cb, final Object ctx) {
+    /**
+     * Add entry asynchronously to an open ledger, using an offset and range.
+     * This can be used only with {@link LedgerHandleAdv} returned through
+     * ledgers created with {@link createLedgerAdv(int, int, int, DigestType, byte[])}.
+     *
+     * @param entryId
+     *            entryId of the entry to add.
+     * @param data
+     *            io.netty.buffer.ByteBuf of bytes to be written
+     * @param cb
+     *            object implementing callbackinterface
+     * @param ctx
+     *            some control object
+     */
+    @Override
+    public void asyncAddEntry(final long entryId, ByteBuf data,
+                              final AddCallbackWithLatency cb, final Object ctx) {
         PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
         op.setEntryId(entryId);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 5f17226..afcaed6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -243,6 +243,8 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
         this.toSend = lh.macManager.computeDigestAndPackageForSending(
                 entryId, lh.lastAddConfirmed, currentLedgerLength,
                 payload);
+        // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
+        payload = null;
 
         // We are about to send. Check if we need to make an ensemble change
         // becasue of delayed write errors
@@ -456,7 +458,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
     private void recyclePendAddOpObject() {
         entryId = LedgerHandle.INVALID_ENTRY_ID;
         currentLedgerLength = -1;
-        payload = null;
+        if (payload != null) {
+            ReferenceCountUtil.release(payload);
+            payload = null;
+        }
         cb = null;
         ctx = null;
         ackSet.recycle();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 2627db8..4c174a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.proto.checksum;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 
 import java.security.GeneralSecurityException;
 
@@ -127,6 +128,7 @@ public abstract class DigestManager {
             populateValueAndReset(sendBuffer);
 
             sendBuffer.writeBytes(data, data.readerIndex(), data.readableBytes());
+            ReferenceCountUtil.release(data);
 
             return ByteBufList.get(sendBuffer);
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 93683c8..85376ea 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -28,8 +28,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.AbstractByteBufAllocator;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -43,6 +48,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -1294,6 +1300,101 @@ public class BookieWriteLedgerTest extends
         lh.close();
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testLedgerCreateAdvByteBufRefCnt() throws Exception {
+        long ledgerId = rng.nextLong();
+        ledgerId &= Long.MAX_VALUE;
+        if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+            // since LongHierarchicalLedgerManager supports ledgerIds of
+            // decimal length upto 19 digits but other
+            // LedgerManagers only upto 10 decimals
+            ledgerId %= 9999999999L;
+        }
+
+        final LedgerHandle lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
+
+        final List<AbstractByteBufAllocator> allocs = Lists.newArrayList(
+                new PooledByteBufAllocator(true),
+                new PooledByteBufAllocator(false),
+                new UnpooledByteBufAllocator(true),
+                new UnpooledByteBufAllocator(false));
+
+        long entryId = 0;
+        for (AbstractByteBufAllocator alloc: allocs) {
+            final ByteBuf data = alloc.buffer(10);
+            data.writeBytes(("fragment0" + entryId).getBytes());
+            assertEquals("ref count on ByteBuf should be 1", 1, data.refCnt());
+
+            CompletableFuture<Integer> cf = new CompletableFuture<>();
+            lh.asyncAddEntry(entryId, data, (rc, handle, eId, qwcLatency, ctx) -> {
+                CompletableFuture<Integer> future = (CompletableFuture<Integer>) ctx;
+                future.complete(rc);
+            }, cf);
+
+            int rc = cf.get();
+            assertEquals("rc code is OK", BKException.Code.OK, rc);
+
+            for (int i = 0; i < 10; i++) {
+                if (data.refCnt() == 0) {
+                    break;
+                }
+                TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously
+            }
+            assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ",
+                    0, data.refCnt());
+
+            org.apache.bookkeeper.client.api.LedgerEntry e = lh.read(entryId, entryId).getEntry(entryId);
+            assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes()));
+            entryId++;
+        }
+
+        bkc.deleteLedger(lh.ledgerId);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testLedgerCreateByteBufRefCnt() throws Exception {
+        final LedgerHandle lh = bkc.createLedger(5, 3, 2, digestType, ledgerPassword, null);
+
+        final List<AbstractByteBufAllocator> allocs = Lists.newArrayList(
+                new PooledByteBufAllocator(true),
+                new PooledByteBufAllocator(false),
+                new UnpooledByteBufAllocator(true),
+                new UnpooledByteBufAllocator(false));
+
+        int entryId = 0;
+        for (AbstractByteBufAllocator alloc: allocs) {
+            final ByteBuf data = alloc.buffer(10);
+            data.writeBytes(("fragment0" + entryId).getBytes());
+            assertEquals("ref count on ByteBuf should be 1", 1, data.refCnt());
+
+            CompletableFuture<Integer> cf = new CompletableFuture<>();
+            lh.asyncAddEntry(data, (rc, handle, eId, ctx) -> {
+                CompletableFuture<Integer> future = (CompletableFuture<Integer>) ctx;
+                future.complete(rc);
+            }, cf);
+
+            int rc = cf.get();
+            assertEquals("rc code is OK", BKException.Code.OK, rc);
+
+            for (int i = 0; i < 10; i++) {
+                if (data.refCnt() == 0) {
+                    break;
+                }
+                TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously
+            }
+            assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ",
+                    0, data.refCnt());
+
+            org.apache.bookkeeper.client.api.LedgerEntry e = lh.read(entryId, entryId).getEntry(entryId);
+            assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes()));
+            entryId++;
+        }
+
+        bkc.deleteLedger(lh.ledgerId);
+    }
+
     private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
         ls = lh.readEntries(0, numEntriesToWrite - 1);
         int index = 0;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
index aaa7645..95dec9c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
@@ -50,9 +50,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                result(wh.appendAsync(DATA));
+                result(wh.appendAsync(DATA.retainedDuplicate()));
             }
-            long lastEntryID = result(wh.appendAsync(DATA));
+            long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
             assertEquals(NUM_ENTRIES - 1, lastEntryID);
             assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
             assertEquals(-1, wh.getLastAddConfirmed());
@@ -69,9 +69,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                result(wh.appendAsync(DATA));
+                result(wh.appendAsync(DATA.retainedDuplicate()));
             }
-            long lastEntryID = result(wh.appendAsync(DATA));
+            long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
             assertEquals(NUM_ENTRIES - 1, lastEntryID);
             assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
             assertEquals(-1, wh.getLastAddConfirmed());
@@ -90,20 +90,20 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .makeAdv()
                 .execute())) {
-            CompletableFuture<Long> w0 = wh.writeAsync(0, DATA);
-            CompletableFuture<Long> w2 = wh.writeAsync(2, DATA);
-            CompletableFuture<Long> w3 = wh.writeAsync(3, DATA);
+            CompletableFuture<Long> w0 = wh.writeAsync(0, DATA.retainedDuplicate());
+            CompletableFuture<Long> w2 = wh.writeAsync(2, DATA.retainedDuplicate());
+            CompletableFuture<Long> w3 = wh.writeAsync(3, DATA.retainedDuplicate());
             result(w0);
             result(wh.force());
             assertEquals(0, wh.getLastAddConfirmed());
-            CompletableFuture<Long> w1 = wh.writeAsync(1, DATA);
+            CompletableFuture<Long> w1 = wh.writeAsync(1, DATA.retainedDuplicate());
             result(w3);
             assertTrue(w1.isDone());
             assertTrue(w2.isDone());
-            CompletableFuture<Long> w5 = wh.writeAsync(5, DATA);
+            CompletableFuture<Long> w5 = wh.writeAsync(5, DATA.retainedDuplicate());
             result(wh.force());
             assertEquals(3, wh.getLastAddConfirmed());
-            wh.writeAsync(4, DATA);
+            wh.writeAsync(4, DATA.retainedDuplicate());
             result(w5);
             result(wh.force());
             assertEquals(5, wh.getLastAddConfirmed());
@@ -120,9 +120,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                result(wh.appendAsync(DATA));
+                result(wh.appendAsync(DATA.retainedDuplicate()));
             }
-            long lastEntryID = result(wh.appendAsync(DATA));
+            long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
             assertEquals(NUM_ENTRIES - 1, lastEntryID);
             assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
             assertEquals(-1, wh.getLastAddConfirmed());
@@ -131,7 +131,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
             killBookie(bookieAddress);
 
             // write should succeed (we still have 2 bookies out of 3)
-            result(wh.appendAsync(DATA));
+            result(wh.appendAsync(DATA.retainedDuplicate()));
 
             // force cannot go, it must be acknowledged by all of the bookies in the ensamble
             try {
@@ -154,9 +154,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                result(wh.appendAsync(DATA));
+                result(wh.appendAsync(DATA.retainedDuplicate()));
             }
-            long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA));
+            long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA.retainedDuplicate()));
             assertEquals(NUM_ENTRIES - 1, lastEntryIdBeforeSuspend);
             assertEquals(-1, wh.getLastAddConfirmed());
 
@@ -170,7 +170,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
             assertEquals(-1, wh.getLastAddConfirmed());
 
             // send an entry and receive ack
-            long lastEntry = wh.append(DATA);
+            long lastEntry = wh.append(DATA.retainedDuplicate());
 
             // receive the ack for forceLedger
             resumeBookieWriteAcks(bookieAddress);
@@ -195,7 +195,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
                 .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                 .execute())) {
             for (int i = 0; i < NUM_ENTRIES - 1; i++) {
-                wh.append(DATA);
+                wh.append(DATA.retainedDuplicate());
             }
 
             assertEquals(1, availableBookies.size());
@@ -207,7 +207,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
 
             try {
                 // we cannot switch to the new bookie with DEFERRED_SYNC
-                wh.append(DATA);
+                wh.append(DATA.retainedDuplicate());
                 fail("since ensemble change is disable we cannot be able to write any more");
             } catch (BKException.BKWriteException ex) {
                 // expected
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
index 98c88f7..733593a 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -165,7 +165,7 @@ class EnvelopedEntryWriter implements Writer {
         if (null == finalizedBuffer) {
             finalizedBuffer = finalizeBuffer();
         }
-        return finalizedBuffer.slice();
+        return finalizedBuffer.retainedSlice();
     }
 
     private ByteBuf finalizeBuffer() {