You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/21 06:31:00 UTC
[bookkeeper] branch master updated: Issue #1584: LedgerHandleAdv
should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle
exposes it as public)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 299fb58 Issue #1584: LedgerHandleAdv should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle exposes it as public)
299fb58 is described below
commit 299fb58deed3dc284342716d52b7f918cc6cefc4
Author: Andrey Yegorov <ay...@salesforce.com>
AuthorDate: Mon Aug 20 23:30:52 2018 -0700
Issue #1584: LedgerHandleAdv should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle exposes it as public)
Descriptions of the changes in this PR:
- exposed asyncAddEntry as public, similarly to other variants.
- fixed ByteBuf retention
### Motivation
It's useful to have this exposed as public for clients to make use of netty's allocator and pass ByteBuf directly.
### Changes
exposed asyncAddEntry as public, similarly to other variants.
Master Issue: #1584
Author: Andrey Yegorov <ay...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1585 from dlg99/feature/issue-1584-LedgerHandleAdv-public-addEntry-bytebuff, closes #1584
---
.../org/apache/bookkeeper/client/LedgerHandle.java | 21 ++++-
.../apache/bookkeeper/client/LedgerHandleAdv.java | 19 +++-
.../org/apache/bookkeeper/client/PendingAddOp.java | 7 +-
.../bookkeeper/proto/checksum/DigestManager.java | 2 +
.../bookkeeper/client/BookieWriteLedgerTest.java | 101 +++++++++++++++++++++
.../apache/bookkeeper/client/DeferredSyncTest.java | 36 ++++----
.../distributedlog/EnvelopedEntryWriter.java | 2 +-
7 files changed, 165 insertions(+), 23 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 535584b..088a7c5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -1057,7 +1057,6 @@ public class LedgerHandle implements WriteHandle {
}
public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
- data.retain();
PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
doAsyncAddEntry(op);
}
@@ -1116,6 +1115,26 @@ public class LedgerHandle implements WriteHandle {
}
/**
+ * Add entry asynchronously to an open ledger, using an offset and range.
+ * This can be used only with {@link LedgerHandleAdv} returned through
+ * ledgers created with {@link createLedgerAdv(int, int, int, DigestType, byte[])}.
+ *
+ * @param entryId
+ * entryId of the entry to add.
+ * @param data
+ * io.netty.buffer.ByteBuf of bytes to be written
+ * @param cb
+ * object implementing callbackinterface
+ * @param ctx
+ * some control object
+ */
+ public void asyncAddEntry(final long entryId, ByteBuf data,
+ final AddCallbackWithLatency cb, final Object ctx) {
+ LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
+ cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
index 153ceeb..48beaa7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -177,8 +177,23 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle {
asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
}
- private void asyncAddEntry(final long entryId, ByteBuf data,
- final AddCallbackWithLatency cb, final Object ctx) {
+ /**
+ * Add entry asynchronously to an open ledger, using an offset and range.
+ * This can be used only with {@link LedgerHandleAdv} returned through
+ * ledgers created with {@link createLedgerAdv(int, int, int, DigestType, byte[])}.
+ *
+ * @param entryId
+ * entryId of the entry to add.
+ * @param data
+ * io.netty.buffer.ByteBuf of bytes to be written
+ * @param cb
+ * object implementing callbackinterface
+ * @param ctx
+ * some control object
+ */
+ @Override
+ public void asyncAddEntry(final long entryId, ByteBuf data,
+ final AddCallbackWithLatency cb, final Object ctx) {
PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx);
op.setEntryId(entryId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 5f17226..afcaed6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -243,6 +243,8 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
this.toSend = lh.macManager.computeDigestAndPackageForSending(
entryId, lh.lastAddConfirmed, currentLedgerLength,
payload);
+ // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
+ payload = null;
// We are about to send. Check if we need to make an ensemble change
// becasue of delayed write errors
@@ -456,7 +458,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback {
private void recyclePendAddOpObject() {
entryId = LedgerHandle.INVALID_ENTRY_ID;
currentLedgerLength = -1;
- payload = null;
+ if (payload != null) {
+ ReferenceCountUtil.release(payload);
+ payload = null;
+ }
cb = null;
ctx = null;
ackSet.recycle();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 2627db8..4c174a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.proto.checksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.security.GeneralSecurityException;
@@ -127,6 +128,7 @@ public abstract class DigestManager {
populateValueAndReset(sendBuffer);
sendBuffer.writeBytes(data, data.readerIndex(), data.readableBytes());
+ ReferenceCountUtil.release(data);
return ByteBufList.get(sendBuffer);
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 93683c8..85376ea 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -28,8 +28,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -43,6 +48,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.Bookie;
@@ -1294,6 +1300,101 @@ public class BookieWriteLedgerTest extends
lh.close();
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testLedgerCreateAdvByteBufRefCnt() throws Exception {
+ long ledgerId = rng.nextLong();
+ ledgerId &= Long.MAX_VALUE;
+ if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+ // since LongHierarchicalLedgerManager supports ledgerIds of
+ // decimal length upto 19 digits but other
+ // LedgerManagers only upto 10 decimals
+ ledgerId %= 9999999999L;
+ }
+
+ final LedgerHandle lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null);
+
+ final List<AbstractByteBufAllocator> allocs = Lists.newArrayList(
+ new PooledByteBufAllocator(true),
+ new PooledByteBufAllocator(false),
+ new UnpooledByteBufAllocator(true),
+ new UnpooledByteBufAllocator(false));
+
+ long entryId = 0;
+ for (AbstractByteBufAllocator alloc: allocs) {
+ final ByteBuf data = alloc.buffer(10);
+ data.writeBytes(("fragment0" + entryId).getBytes());
+ assertEquals("ref count on ByteBuf should be 1", 1, data.refCnt());
+
+ CompletableFuture<Integer> cf = new CompletableFuture<>();
+ lh.asyncAddEntry(entryId, data, (rc, handle, eId, qwcLatency, ctx) -> {
+ CompletableFuture<Integer> future = (CompletableFuture<Integer>) ctx;
+ future.complete(rc);
+ }, cf);
+
+ int rc = cf.get();
+ assertEquals("rc code is OK", BKException.Code.OK, rc);
+
+ for (int i = 0; i < 10; i++) {
+ if (data.refCnt() == 0) {
+ break;
+ }
+ TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously
+ }
+ assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ",
+ 0, data.refCnt());
+
+ org.apache.bookkeeper.client.api.LedgerEntry e = lh.read(entryId, entryId).getEntry(entryId);
+ assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes()));
+ entryId++;
+ }
+
+ bkc.deleteLedger(lh.ledgerId);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testLedgerCreateByteBufRefCnt() throws Exception {
+ final LedgerHandle lh = bkc.createLedger(5, 3, 2, digestType, ledgerPassword, null);
+
+ final List<AbstractByteBufAllocator> allocs = Lists.newArrayList(
+ new PooledByteBufAllocator(true),
+ new PooledByteBufAllocator(false),
+ new UnpooledByteBufAllocator(true),
+ new UnpooledByteBufAllocator(false));
+
+ int entryId = 0;
+ for (AbstractByteBufAllocator alloc: allocs) {
+ final ByteBuf data = alloc.buffer(10);
+ data.writeBytes(("fragment0" + entryId).getBytes());
+ assertEquals("ref count on ByteBuf should be 1", 1, data.refCnt());
+
+ CompletableFuture<Integer> cf = new CompletableFuture<>();
+ lh.asyncAddEntry(data, (rc, handle, eId, ctx) -> {
+ CompletableFuture<Integer> future = (CompletableFuture<Integer>) ctx;
+ future.complete(rc);
+ }, cf);
+
+ int rc = cf.get();
+ assertEquals("rc code is OK", BKException.Code.OK, rc);
+
+ for (int i = 0; i < 10; i++) {
+ if (data.refCnt() == 0) {
+ break;
+ }
+ TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously
+ }
+ assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ",
+ 0, data.refCnt());
+
+ org.apache.bookkeeper.client.api.LedgerEntry e = lh.read(entryId, entryId).getEntry(entryId);
+ assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes()));
+ entryId++;
+ }
+
+ bkc.deleteLedger(lh.ledgerId);
+ }
+
private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
ls = lh.readEntries(0, numEntriesToWrite - 1);
int index = 0;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
index aaa7645..95dec9c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java
@@ -50,9 +50,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
.withWriteFlags(WriteFlag.DEFERRED_SYNC)
.execute())) {
for (int i = 0; i < NUM_ENTRIES - 1; i++) {
- result(wh.appendAsync(DATA));
+ result(wh.appendAsync(DATA.retainedDuplicate()));
}
- long lastEntryID = result(wh.appendAsync(DATA));
+ long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
assertEquals(NUM_ENTRIES - 1, lastEntryID);
assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
assertEquals(-1, wh.getLastAddConfirmed());
@@ -69,9 +69,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
.withWriteFlags(WriteFlag.DEFERRED_SYNC)
.execute())) {
for (int i = 0; i < NUM_ENTRIES - 1; i++) {
- result(wh.appendAsync(DATA));
+ result(wh.appendAsync(DATA.retainedDuplicate()));
}
- long lastEntryID = result(wh.appendAsync(DATA));
+ long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
assertEquals(NUM_ENTRIES - 1, lastEntryID);
assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
assertEquals(-1, wh.getLastAddConfirmed());
@@ -90,20 +90,20 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
.withWriteFlags(WriteFlag.DEFERRED_SYNC)
.makeAdv()
.execute())) {
- CompletableFuture<Long> w0 = wh.writeAsync(0, DATA);
- CompletableFuture<Long> w2 = wh.writeAsync(2, DATA);
- CompletableFuture<Long> w3 = wh.writeAsync(3, DATA);
+ CompletableFuture<Long> w0 = wh.writeAsync(0, DATA.retainedDuplicate());
+ CompletableFuture<Long> w2 = wh.writeAsync(2, DATA.retainedDuplicate());
+ CompletableFuture<Long> w3 = wh.writeAsync(3, DATA.retainedDuplicate());
result(w0);
result(wh.force());
assertEquals(0, wh.getLastAddConfirmed());
- CompletableFuture<Long> w1 = wh.writeAsync(1, DATA);
+ CompletableFuture<Long> w1 = wh.writeAsync(1, DATA.retainedDuplicate());
result(w3);
assertTrue(w1.isDone());
assertTrue(w2.isDone());
- CompletableFuture<Long> w5 = wh.writeAsync(5, DATA);
+ CompletableFuture<Long> w5 = wh.writeAsync(5, DATA.retainedDuplicate());
result(wh.force());
assertEquals(3, wh.getLastAddConfirmed());
- wh.writeAsync(4, DATA);
+ wh.writeAsync(4, DATA.retainedDuplicate());
result(w5);
result(wh.force());
assertEquals(5, wh.getLastAddConfirmed());
@@ -120,9 +120,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
.withWriteFlags(WriteFlag.DEFERRED_SYNC)
.execute())) {
for (int i = 0; i < NUM_ENTRIES - 1; i++) {
- result(wh.appendAsync(DATA));
+ result(wh.appendAsync(DATA.retainedDuplicate()));
}
- long lastEntryID = result(wh.appendAsync(DATA));
+ long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
assertEquals(NUM_ENTRIES - 1, lastEntryID);
assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
assertEquals(-1, wh.getLastAddConfirmed());
@@ -131,7 +131,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
killBookie(bookieAddress);
// write should succeed (we still have 2 bookies out of 3)
- result(wh.appendAsync(DATA));
+ result(wh.appendAsync(DATA.retainedDuplicate()));
// force cannot go, it must be acknowledged by all of the bookies in the ensamble
try {
@@ -154,9 +154,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
.withWriteFlags(WriteFlag.DEFERRED_SYNC)
.execute())) {
for (int i = 0; i < NUM_ENTRIES - 1; i++) {
- result(wh.appendAsync(DATA));
+ result(wh.appendAsync(DATA.retainedDuplicate()));
}
- long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA));
+ long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA.retainedDuplicate()));
assertEquals(NUM_ENTRIES - 1, lastEntryIdBeforeSuspend);
assertEquals(-1, wh.getLastAddConfirmed());
@@ -170,7 +170,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
assertEquals(-1, wh.getLastAddConfirmed());
// send an entry and receive ack
- long lastEntry = wh.append(DATA);
+ long lastEntry = wh.append(DATA.retainedDuplicate());
// receive the ack for forceLedger
resumeBookieWriteAcks(bookieAddress);
@@ -195,7 +195,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
.withWriteFlags(WriteFlag.DEFERRED_SYNC)
.execute())) {
for (int i = 0; i < NUM_ENTRIES - 1; i++) {
- wh.append(DATA);
+ wh.append(DATA.retainedDuplicate());
}
assertEquals(1, availableBookies.size());
@@ -207,7 +207,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase {
try {
// we cannot switch to the new bookie with DEFERRED_SYNC
- wh.append(DATA);
+ wh.append(DATA.retainedDuplicate());
fail("since ensemble change is disable we cannot be able to write any more");
} catch (BKException.BKWriteException ex) {
// expected
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
index 98c88f7..733593a 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -165,7 +165,7 @@ class EnvelopedEntryWriter implements Writer {
if (null == finalizedBuffer) {
finalizedBuffer = finalizeBuffer();
}
- return finalizedBuffer.slice();
+ return finalizedBuffer.retainedSlice();
}
private ByteBuf finalizeBuffer() {