You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2017/05/15 22:32:01 UTC
bookkeeper git commit: BOOKKEEPER-1048: Use ByteBuf in LedgerStorage
interface
Repository: bookkeeper
Updated Branches:
refs/heads/master 5d43260e8 -> 0f81461d2
BOOKKEEPER-1048: Use ByteBuf in LedgerStorage interface
To pass ref-counted buffer from Netty directly to the storage and the Journal, we need to have LedgerStorage to accept ByteBuf instead of ByteBuffer
#### Note
This commit is on top of BOOKKEEPER-1048 / #138. Once that gets merged, I will rebase. Posting now to get Jenkins run. Please review last commit f53f772f79d0a334edc0f05e66edb7cc645b1ffa in this PR for now.
Author: Matteo Merli <mm...@apache.org>
Reviewers: Jia Zhai <None>, Sijie Guo <None>
Closes #139 from merlimat/bytebuf-in-ledger-storage
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/0f81461d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/0f81461d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/0f81461d
Branch: refs/heads/master
Commit: 0f81461d2d1dc5cf9db4de9a46599d7d64e3dac6
Parents: 5d43260
Author: Matteo Merli <mm...@apache.org>
Authored: Mon May 15 15:31:56 2017 -0700
Committer: Matteo Merli <mm...@apache.org>
Committed: Mon May 15 15:31:56 2017 -0700
----------------------------------------------------------------------
.../org/apache/bookkeeper/bookie/Bookie.java | 61 +++++++++++---------
.../apache/bookkeeper/bookie/EntryKeyValue.java | 7 ++-
.../apache/bookkeeper/bookie/EntryMemTable.java | 2 +-
.../org/apache/bookkeeper/bookie/FileInfo.java | 21 ++++---
.../bookkeeper/bookie/IndexPersistenceMgr.java | 6 +-
.../bookie/InterleavedLedgerStorage.java | 33 ++++++-----
.../org/apache/bookkeeper/bookie/Journal.java | 33 +++++++----
.../apache/bookkeeper/bookie/LedgerCache.java | 6 +-
.../bookkeeper/bookie/LedgerCacheImpl.java | 6 +-
.../bookkeeper/bookie/LedgerDescriptor.java | 15 ++---
.../bookkeeper/bookie/LedgerDescriptorImpl.java | 17 +++---
.../bookie/LedgerDescriptorReadOnlyImpl.java | 5 +-
.../apache/bookkeeper/bookie/LedgerStorage.java | 12 ++--
.../bookkeeper/bookie/SortedLedgerStorage.java | 21 ++++---
.../bookkeeper/proto/BookieProtoEncoding.java | 35 +++++------
.../apache/bookkeeper/proto/BookieProtocol.java | 9 +--
.../bookkeeper/proto/ReadEntryProcessor.java | 9 ++-
.../bookkeeper/proto/ReadEntryProcessorV3.java | 11 ++--
.../bookkeeper/proto/ReadLacProcessorV3.java | 15 +++--
.../bookkeeper/proto/ResponseBuilder.java | 8 +--
.../bookkeeper/proto/WriteEntryProcessor.java | 6 +-
.../bookkeeper/proto/WriteEntryProcessorV3.java | 5 +-
.../bookkeeper/proto/WriteLacProcessorV3.java | 3 +-
.../bookkeeper/bookie/BookieJournalTest.java | 15 ++---
.../bookkeeper/bookie/CompactionTest.java | 18 +++---
.../apache/bookkeeper/bookie/EntryLogTest.java | 42 +++++++-------
.../bookkeeper/bookie/LedgerCacheTest.java | 20 ++++---
.../bookkeeper/bookie/TestSyncThread.java | 17 +++---
.../bookkeeper/client/BookKeeperCloseTest.java | 9 ++-
.../bookkeeper/client/LedgerCloseTest.java | 8 ++-
.../bookkeeper/client/LedgerRecoveryTest.java | 10 ++--
.../apache/bookkeeper/meta/GcLedgersTest.java | 12 ++--
.../bookkeeper/meta/LedgerManagerTestCase.java | 12 ++--
.../proto/TestPerChannelBookieClient.java | 2 +-
.../replication/AuditorPeriodicCheckTest.java | 4 +-
.../bookkeeper/test/ConcurrentLedgerTest.java | 15 +++--
36 files changed, 292 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index c743ef4..090574c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -22,6 +22,8 @@
package org.apache.bookkeeper.bookie;
import static com.google.common.base.Charsets.UTF_8;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.FileNotFoundException;
@@ -49,8 +51,10 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirExcepti
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
+
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
+
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -68,6 +72,7 @@ import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.io.FileUtils;
+
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -75,6 +80,7 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
+
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -484,7 +490,7 @@ public class Bookie extends BookieCriticalThread {
Versioned<Cookie> zkCookie = null;
try {
zkCookie = Cookie.readFromZooKeeper(zk, conf);
- // If allowStorageExpansion option is set, we should
+ // If allowStorageExpansion option is set, we should
// make sure that the new set of ledger/index dirs
// is a super set of the old; else, we fail the cookie check
masterCookie.verifyIsSuperSet(zkCookie.getValue());
@@ -773,7 +779,7 @@ public class Bookie extends BookieCriticalThread {
LedgerDescriptor handle = handles.getHandle(ledgerId, key);
recBuff.rewind();
- handle.addEntry(recBuff);
+ handle.addEntry(Unpooled.wrappedBuffer(recBuff));
}
} catch (NoLedgerException nsle) {
LOG.debug("Skip replaying entries of ledger {} since it was deleted.", ledgerId);
@@ -1346,9 +1352,10 @@ public class Bookie extends BookieCriticalThread {
*
* @throws BookieException if masterKey does not match the master key of the ledger
*/
- private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, final byte[] masterKey)
+ private LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey)
throws IOException, BookieException {
- final long ledgerId = entry.getLong();
+ final long ledgerId = entry.getLong(entry.readerIndex());
+
LedgerDescriptor l = handles.getHandle(ledgerId, masterKey);
if (masterKeyCache.get(ledgerId) == null) {
// Force the load into masterKey cache
@@ -1376,16 +1383,16 @@ public class Bookie extends BookieCriticalThread {
/**
* Add an entry to a ledger as specified by handle.
*/
- private void addEntryInternal(LedgerDescriptor handle, ByteBuffer entry, WriteCallback cb, Object ctx)
+ private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, WriteCallback cb, Object ctx)
throws IOException, BookieException {
long ledgerId = handle.getLedgerId();
- entry.rewind();
long entryId = handle.addEntry(entry);
- entry.rewind();
- writeBytes.add(entry.remaining());
+ writeBytes.add(entry.readableBytes());
- LOG.trace("Adding {}@{}", entryId, ledgerId);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Adding {}@{}", entryId, ledgerId);
+ }
getJournal(ledgerId).logAddEntry(entry, cb, ctx);
}
@@ -1395,7 +1402,7 @@ public class Bookie extends BookieCriticalThread {
* so that they exist on a quorum of bookies. The corresponding client side call for this
* is not exposed to users.
*/
- public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
long requestNanos = MathUtils.nowInNano();
boolean success = false;
@@ -1403,7 +1410,7 @@ public class Bookie extends BookieCriticalThread {
try {
LedgerDescriptor handle = getLedgerForEntry(entry, masterKey);
synchronized (handle) {
- entrySize = entry.remaining();
+ entrySize = entry.readableBytes();
addEntryInternal(handle, entry, cb, ctx);
}
success = true;
@@ -1419,15 +1426,16 @@ public class Bookie extends BookieCriticalThread {
recoveryAddEntryStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
addBytesStats.registerFailedValue(entrySize);
}
+
+ entry.release();
}
}
- public void setExplicitLac(ByteBuffer entry, Object ctx, byte[] masterKey)
+ public void setExplicitLac(ByteBuf entry, Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
- long ledgerId = entry.getLong();
+ long ledgerId = entry.getLong(entry.readerIndex());
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
- entry.rewind();
synchronized (handle) {
handle.setExplicitLac(entry);
}
@@ -1437,8 +1445,8 @@ public class Bookie extends BookieCriticalThread {
}
}
- public ByteBuffer getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedgerException {
- ByteBuffer lac;
+ public ByteBuf getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedgerException {
+ ByteBuf lac;
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
synchronized (handle) {
lac = handle.getExplicitLac();
@@ -1450,7 +1458,7 @@ public class Bookie extends BookieCriticalThread {
* Add entry to a ledger.
* @throws BookieException.LedgerFencedException if the ledger is fenced
*/
- public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
long requestNanos = MathUtils.nowInNano();
boolean success = false;
@@ -1462,7 +1470,7 @@ public class Bookie extends BookieCriticalThread {
throw BookieException
.create(BookieException.Code.LedgerFencedException);
}
- entrySize = entry.remaining();
+ entrySize = entry.readableBytes();
addEntryInternal(handle, entry, cb, ctx);
}
success = true;
@@ -1478,6 +1486,8 @@ public class Bookie extends BookieCriticalThread {
addEntryStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
addBytesStats.registerFailedValue(entrySize);
}
+
+ entry.release();
}
}
@@ -1511,7 +1521,7 @@ public class Bookie extends BookieCriticalThread {
}
}
- public ByteBuffer readEntry(long ledgerId, long entryId)
+ public ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException {
long requestNanos = MathUtils.nowInNano();
boolean success = false;
@@ -1519,9 +1529,8 @@ public class Bookie extends BookieCriticalThread {
try {
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
LOG.trace("Reading {}@{}", entryId, ledgerId);
- ByteBuffer entry = handle.readEntry(entryId);
- entrySize = entry.remaining();
- readBytes.add(entrySize);
+ ByteBuf entry = handle.readEntry(entryId);
+ readBytes.add(entry.readableBytes());
success = true;
return entry;
} finally {
@@ -1667,11 +1676,9 @@ public class Bookie extends BookieCriticalThread {
CounterCallback cb = new CounterCallback();
long start = MathUtils.now();
for (int i = 0; i < 100000; i++) {
- ByteBuffer buff = ByteBuffer.allocate(1024);
- buff.putLong(1);
- buff.putLong(i);
- buff.limit(1024);
- buff.position(0);
+ ByteBuf buff = Unpooled.buffer(1024);
+ buff.writeLong(1);
+ buff.writeLong(i);
cb.incCount();
b.addEntry(buff, cb, null, new byte[0]);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
index dab5396..42a1f34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java
@@ -19,6 +19,9 @@
*/
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.nio.ByteBuffer;
/**
@@ -82,8 +85,8 @@ public class EntryKeyValue extends EntryKey {
*
* @return the value
*/
- public ByteBuffer getValueAsByteBuffer() {
- return ByteBuffer.wrap(getBuffer(), getOffset(), getLength());
+ public ByteBuf getValueAsByteBuffer() {
+ return Unpooled.wrappedBuffer(getBuffer(), getOffset(), getLength());
}
/**
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index ff14d03..214b286 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -251,7 +251,7 @@ public class EntryMemTable {
ledger = kv.getLedgerId();
if (ledgerGC != ledger) {
try {
- flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer());
+ flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer().nioBuffer());
} catch (NoLedgerException exception) {
ledgerGC = ledger;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index adf148c..90f731a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -36,6 +36,9 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
/**
* This is the file handle for a ledger's index file that maps entry ids to location.
* It is used by LedgerCache.
@@ -114,31 +117,31 @@ class FileInfo {
return sizeSinceLastwrite;
}
- public ByteBuffer getExplicitLac() {
- ByteBuffer retLac = null;
+ public ByteBuf getExplicitLac() {
+ ByteBuf retLac = null;
synchronized(this) {
LOG.debug("fileInfo:GetLac: {}", explicitLac);
if (explicitLac != null) {
- retLac = ByteBuffer.allocate(explicitLac.capacity());
+ retLac = Unpooled.buffer(explicitLac.capacity());
explicitLac.rewind();//copy from the beginning
- retLac.put(explicitLac);
+ retLac.writeBytes(explicitLac);
explicitLac.rewind();
- retLac.flip();
+ return retLac;
}
}
return retLac;
}
- public void setExplicitLac(ByteBuffer lac) {
+ public void setExplicitLac(ByteBuf lac) {
synchronized(this) {
if (explicitLac == null) {
explicitLac = ByteBuffer.allocate(lac.capacity());
}
- explicitLac.put(lac);
+ lac.readBytes(explicitLac);
explicitLac.rewind();
-
+
// skip the ledger id
- explicitLac.getLong();
+ explicitLac.getLong();
long explicitLacValue = explicitLac.getLong();
setLastAddConfirmed(explicitLacValue);
explicitLac.rewind();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 1ea000c..708bdb3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
+
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_NUM_EVICTED_LEDGERS;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OPEN_LEDGERS;
@@ -389,7 +391,7 @@ public class IndexPersistenceMgr {
}
}
- void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException {
+ void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
FileInfo fi = null;
try {
fi = getFileInfo(ledgerId, null);
@@ -402,7 +404,7 @@ public class IndexPersistenceMgr {
}
}
- public ByteBuffer getExplicitLac(long ledgerId) {
+ public ByteBuf getExplicitLac(long ledgerId) {
FileInfo fi = null;
try {
fi = getFileInfo(ledgerId, null);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index afd65dc..61a81c4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -21,6 +21,9 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -219,11 +222,13 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
return ledgerCache.isFenced(ledgerId);
}
- public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+ @Override
+ public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
ledgerCache.setExplicitLac(ledgerId, lac);
}
- public ByteBuffer getExplicitLac(long ledgerId) {
+ @Override
+ public ByteBuf getExplicitLac(long ledgerId) {
return ledgerCache.getExplicitLac(ledgerId);
}
@@ -246,13 +251,13 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
public long getLastAddConfirmed(long ledgerId) throws IOException {
Long lac = ledgerCache.getLastAddConfirmed(ledgerId);
if (lac == null) {
- ByteBuffer bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
+ ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
if (null == bb) {
return BookieProtocol.INVALID_ENTRY_ID;
} else {
- bb.getLong(); // ledger id
- bb.getLong(); // entry id
- lac = bb.getLong();
+ bb.readLong(); // ledger id
+ bb.readLong(); // entry id
+ lac = bb.readLong();
lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
}
}
@@ -260,21 +265,19 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
}
@Override
- synchronized public long addEntry(ByteBuffer entry) throws IOException {
- long ledgerId = entry.getLong();
- long entryId = entry.getLong();
- long lac = entry.getLong();
- entry.rewind();
+ synchronized public long addEntry(ByteBuf entry) throws IOException {
+ long ledgerId = entry.getLong(entry.readerIndex() + 0);
+ long entryId = entry.getLong(entry.readerIndex() + 8);
+ long lac = entry.getLong(entry.readerIndex() + 16);
- processEntry(ledgerId, entryId, entry);
+ processEntry(ledgerId, entryId, entry.nioBuffer());
ledgerCache.updateLastAddConfirmed(ledgerId, lac);
-
return entryId;
}
@Override
- public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
long offset;
/*
* If entryId is BookieProtocol.LAST_ADD_CONFIRMED, then return the last written.
@@ -305,7 +308,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
try {
byte[] retBytes = entryLogger.readEntry(ledgerId, entryId, offset);
success = true;
- return ByteBuffer.wrap(retBytes);
+ return Unpooled.wrappedBuffer(retBytes);
} finally {
if (success) {
getEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index a39c3fa..c679ee9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -21,6 +21,9 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -36,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
+
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -272,14 +276,14 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
* Journal Entry to Record
*/
private class QueueEntry implements Runnable {
- ByteBuffer entry;
+ ByteBuf entry;
long ledgerId;
long entryId;
WriteCallback cb;
Object ctx;
long enqueueTime;
- QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
+ QueueEntry(ByteBuf entry, long ledgerId, long entryId,
WriteCallback cb, Object ctx, long enqueueTime) {
this.entry = entry.duplicate();
this.cb = cb;
@@ -750,14 +754,20 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
}
}
+ public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
+ logAddEntry(Unpooled.wrappedBuffer(entry), cb, ctx);
+ }
+
/**
* record an add entry operation in journal
*/
- public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) {
- long ledgerId = entry.getLong();
- long entryId = entry.getLong();
- entry.rewind();
+ public void logAddEntry(ByteBuf entry, WriteCallback cb, Object ctx) {
+ long ledgerId = entry.getLong(entry.readerIndex() + 0);
+ long entryId = entry.getLong(entry.readerIndex() + 8);
journalQueueSize.inc();
+
+ //Retain entry until it gets written to journal
+ entry.retain();
queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano()));
}
@@ -927,24 +937,25 @@ class Journal extends BookieCriticalThread implements CheckpointSource {
continue;
}
- journalWriteBytes.add(qe.entry.remaining());
+ journalWriteBytes.add(qe.entry.readableBytes());
journalQueueSize.dec();
- batchSize += (4 + qe.entry.remaining());
+ batchSize += (4 + qe.entry.readableBytes());
lenBuff.clear();
- lenBuff.putInt(qe.entry.remaining());
+ lenBuff.putInt(qe.entry.readableBytes());
lenBuff.flip();
// preAlloc based on size
- logFile.preAllocIfNeeded(4 + qe.entry.remaining());
+ logFile.preAllocIfNeeded(4 + qe.entry.readableBytes());
//
// we should be doing the following, but then we run out of
// direct byte buffers
// logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
bc.write(lenBuff);
- bc.write(qe.entry);
+ bc.write(qe.entry.nioBuffer());
+ qe.entry.release();
toFlush.add(qe);
qe = null;
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index e004cb6..efb67dc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -25,6 +25,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+
/**
* This class maps a ledger entry number into a location (entrylogid, offset) in
* an entry log file. It does user level caching to more efficiently manage disk
@@ -51,6 +53,6 @@ interface LedgerCache extends Closeable {
void deleteLedger(long ledgerId) throws IOException;
LedgerCacheBean getJMXBean();
- void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException;
- ByteBuffer getExplicitLac(long ledgerId);
+ void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException;
+ ByteBuf getExplicitLac(long ledgerId);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index cece79f..515b64b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -31,6 +31,8 @@ import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
+
/**
* Implementation of LedgerCache interface.
* This class serves two purposes.
@@ -137,11 +139,11 @@ public class LedgerCacheImpl implements LedgerCache {
return indexPersistenceManager.isFenced(ledgerId);
}
- public void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException {
+ public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
indexPersistenceManager.setExplicitLac(ledgerId, lac);
}
- public ByteBuffer getExplicitLac(long ledgerId) {
+ public ByteBuf getExplicitLac(long ledgerId) {
return indexPersistenceManager.getExplicitLac(ledgerId);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index bcb0c30..9fe1629 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -21,12 +21,9 @@
package org.apache.bookkeeper.bookie;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import io.netty.buffer.ByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
/**
* Implements a ledger inside a bookie. In particular, it implements operations
@@ -57,12 +54,12 @@ public abstract class LedgerDescriptor {
abstract boolean setFenced() throws IOException;
abstract boolean isFenced() throws IOException;
- abstract long addEntry(ByteBuffer entry) throws IOException;
- abstract ByteBuffer readEntry(long entryId) throws IOException;
+ abstract long addEntry(ByteBuf entry) throws IOException;
+ abstract ByteBuf readEntry(long entryId) throws IOException;
abstract long getLastAddConfirmed() throws IOException;
- abstract void setExplicitLac(ByteBuffer entry) throws IOException;
+ abstract void setExplicitLac(ByteBuf entry) throws IOException;
- abstract ByteBuffer getExplicitLac();
+ abstract ByteBuf getExplicitLac();
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index bf1c129..6602392 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -21,8 +21,10 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import org.slf4j.Logger;
@@ -69,28 +71,27 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
}
@Override
- void setExplicitLac(ByteBuffer lac) throws IOException {
+ void setExplicitLac(ByteBuf lac) throws IOException {
ledgerStorage.setExplicitlac(ledgerId, lac);
}
@Override
- ByteBuffer getExplicitLac() {
+ ByteBuf getExplicitLac() {
return ledgerStorage.getExplicitLac(ledgerId);
}
- @Override
- long addEntry(ByteBuffer entry) throws IOException {
- long ledgerId = entry.getLong();
+
+ long addEntry(ByteBuf entry) throws IOException {
+ long ledgerId = entry.getLong(entry.readerIndex());
if (ledgerId != this.ledgerId) {
throw new IOException("Entry for ledger " + ledgerId + " was sent to " + this.ledgerId);
}
- entry.rewind();
return ledgerStorage.addEntry(entry);
}
@Override
- ByteBuffer readEntry(long entryId) throws IOException {
+ ByteBuf readEntry(long entryId) throws IOException {
return ledgerStorage.getEntry(ledgerId, entryId);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
index 29dcfaf..40bf988 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java
@@ -21,8 +21,9 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
/**
* Implements a ledger inside a bookie. In particular, it implements operations
* to write entries to a ledger and read entries from a ledger.
@@ -39,7 +40,7 @@ public class LedgerDescriptorReadOnlyImpl extends LedgerDescriptorImpl {
}
@Override
- long addEntry(ByteBuffer entry) throws IOException {
+ long addEntry(ByteBuf entry) throws IOException {
assert false;
throw new IOException("Invalid action on read only descriptor");
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index fbdd6b9..4587460 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -21,13 +21,13 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
-
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.meta.LedgerManager;
@@ -101,12 +101,12 @@ public interface LedgerStorage {
* Add an entry to the storage.
* @return the entry id of the entry added
*/
- long addEntry(ByteBuffer entry) throws IOException;
+ long addEntry(ByteBuf entry) throws IOException;
/**
* Read an entry from storage
*/
- ByteBuffer getEntry(long ledgerId, long entryId) throws IOException;
+ ByteBuf getEntry(long ledgerId, long entryId) throws IOException;
/**
* Get last add confirmed.
@@ -162,7 +162,7 @@ public interface LedgerStorage {
*/
BKMBeanInfo getJMXBean();
- void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException;
+ void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException;
- ByteBuffer getExplicitLac(long ledgerId);
+ ByteBuf getExplicitLac(long ledgerId);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 698dbd3..105a8b5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
@@ -27,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
@@ -93,12 +96,12 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
}
@Override
- public long addEntry(ByteBuffer entry) throws IOException {
- long ledgerId = entry.getLong();
- long entryId = entry.getLong();
- long lac = entry.getLong();
- entry.rewind();
- memTable.addEntry(ledgerId, entryId, entry, this);
+ public long addEntry(ByteBuf entry) throws IOException {
+ long ledgerId = entry.getLong(entry.readerIndex() + 0);
+ long entryId = entry.getLong(entry.readerIndex() + 8);
+ long lac = entry.getLong(entry.readerIndex() + 16);
+
+ memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this);
ledgerCache.updateLastAddConfirmed(ledgerId, lac);
return entryId;
}
@@ -108,7 +111,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
* @param ledgerId
* @return
*/
- private ByteBuffer getLastEntryId(long ledgerId) throws IOException {
+ private ByteBuf getLastEntryId(long ledgerId) throws IOException {
EntryKeyValue kv = memTable.getLastEntry(ledgerId);
if (null != kv) {
return kv.getValueAsByteBuffer();
@@ -118,11 +121,11 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
}
@Override
- public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
return getLastEntryId(ledgerId);
}
- ByteBuffer buffToRet;
+ ByteBuf buffToRet;
try {
buffToRet = super.getEntry(ledgerId, entryId);
} catch (Bookie.NoEntryException nee) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 148b31d..b1f86ae 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -20,17 +20,6 @@
*/
package org.apache.bookkeeper.proto;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.handler.codec.MessageToMessageDecoder;
-import io.netty.handler.codec.MessageToMessageEncoder;
-
import java.io.IOException;
import java.util.List;
@@ -44,6 +33,17 @@ import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.MessageToMessageEncoder;
+
public class BookieProtoEncoding {
private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
@@ -146,10 +146,8 @@ public class BookieProtoEncoding {
packet.readBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
// Read ledger and entry id without advancing the reader index
- packet.markReaderIndex();
- ledgerId = packet.readLong();
- entryId = packet.readLong();
- packet.resetReaderIndex();
+ ledgerId = packet.getLong(packet.readerIndex());
+ entryId = packet.getLong(packet.readerIndex() + 8);
return new BookieProtocol.AddRequest(h.getVersion(), ledgerId, entryId, flags, masterKey, packet.retain());
case BookieProtocol.READENTRY:
ledgerId = packet.readLong();
@@ -169,6 +167,7 @@ public class BookieProtoEncoding {
builder.mergeFrom(new ByteBufInputStream(packet), extensionRegistry);
return new BookieProtocol.AuthRequest(h.getVersion(), builder.build());
}
+
return packet;
}
}
@@ -238,8 +237,9 @@ public class BookieProtoEncoding {
entryId = buffer.readLong();
if (rc == BookieProtocol.EOK) {
+ ByteBuf content = buffer.slice();
return new BookieProtocol.ReadResponse(header.getVersion(), rc,
- ledgerId, entryId, buffer.slice());
+ ledgerId, entryId, content.retain());
} else {
return new BookieProtocol.ReadResponse(header.getVersion(), rc,
ledgerId, entryId);
@@ -327,9 +327,6 @@ public class BookieProtoEncoding {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Encode request {} to channel {}.", msg, ctx.channel());
- }
if (msg instanceof BookkeeperProtocol.Request) {
out.add(REQ_V3.encode(msg, ctx.alloc()));
} else if (msg instanceof BookieProtocol.Request) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 094daab..f0cfa58 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -22,8 +22,7 @@ package org.apache.bookkeeper.proto;
*/
import io.netty.buffer.ByteBuf;
-
-import java.nio.ByteBuffer;
+import io.netty.buffer.Unpooled;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
@@ -262,10 +261,6 @@ public interface BookieProtocol {
return data;
}
- ByteBuffer getDataAsByteBuffer() {
- return data.nioBuffer().slice();
- }
-
boolean isRecoveryAdd() {
return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
}
@@ -351,7 +346,7 @@ public interface BookieProtocol {
ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
- this.data = null;
+ this.data = Unpooled.EMPTY_BUFFER;
}
ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index bd98374..02fa1c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -17,10 +17,11 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -49,7 +50,7 @@ class ReadEntryProcessor extends PacketProcessorBase {
LOG.debug("Received new read request: {}", request);
int errorCode = BookieProtocol.EIO;
long startTimeNanos = MathUtils.nowInNano();
- ByteBuffer data = null;
+ ByteBuf data = null;
try {
Future<Boolean> fenceResult = null;
if (read.isFencingRequest()) {
@@ -63,7 +64,7 @@ class ReadEntryProcessor extends PacketProcessorBase {
}
}
data = requestProcessor.bookie.readEntry(request.getLedgerId(), request.getEntryId());
- LOG.debug("##### Read entry ##### {}", data.remaining());
+ LOG.debug("##### Read entry ##### {} -- ref-count: {}", data.readableBytes(), data.refCnt());
if (null != fenceResult) {
// TODO:
// currently we don't have readCallback to run in separated read
@@ -127,6 +128,8 @@ class ReadEntryProcessor extends PacketProcessorBase {
requestProcessor.readRequestStats);
} else {
+ ReferenceCountUtil.release(data);
+
requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
sendResponse(errorCode, ResponseBuilder.buildErrorResponse(errorCode, read),
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index fbfa71f..b04f6b9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -17,10 +17,11 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -65,7 +66,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
LOG.debug("Received new read request: {}", request);
StatusCode status;
- ByteBuffer entryBody;
+ ByteBuf entryBody = null;
try {
Future<Boolean> fenceResult = null;
if (readRequest.hasFlag() && readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER)) {
@@ -98,7 +99,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
status = StatusCode.EIO;
} else {
status = StatusCode.EOK;
- readResponse.setBody(ByteString.copyFrom(entryBody));
+ readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
}
} catch (InterruptedException ie) {
LOG.error("Interrupting fence read entry (lid: {}, eid: {})",
@@ -114,7 +115,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
status = StatusCode.EIO;
}
} else {
- readResponse.setBody(ByteString.copyFrom(entryBody));
+ readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer()));
status = StatusCode.EOK;
}
} catch (Bookie.NoLedgerException e) {
@@ -142,6 +143,8 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
TimeUnit.NANOSECONDS);
}
+ ReferenceCountUtil.release(entryBody);
+
// Finally set status and return. The body would have been updated if
// a read went through.
readResponse.setStatus(status);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
index 0fbdbec..e9cc1cb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.proto;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
@@ -36,7 +35,9 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
+import io.netty.util.ReferenceCountUtil;
class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class);
@@ -61,14 +62,14 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
logger.debug("Received ReadLac request: {}", request);
StatusCode status = StatusCode.EOK;
- ByteBuffer lastEntry;
- ByteBuffer lac;
+ ByteBuf lastEntry = null;
+ ByteBuf lac = null;
try {
lastEntry = requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
lac = requestProcessor.bookie.getExplicitLac(ledgerId);
if (lac != null) {
- readLacResponse.setLacBody(ByteString.copyFrom(lac));
- readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry));
+ readLacResponse.setLacBody(ByteString.copyFrom(lac.nioBuffer()));
+ readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry.nioBuffer()));
} else {
status = StatusCode.ENOENTRY;
}
@@ -78,7 +79,11 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
} catch (IOException e) {
status = StatusCode.EIO;
logger.error("IOException while performing readLac from ledger: {}", ledgerId);
+ } finally {
+ ReferenceCountUtil.release(lastEntry);
+ ReferenceCountUtil.release(lac);
}
+
if (status == StatusCode.EOK) {
requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
index 1418437..c0be162 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java
@@ -20,9 +20,7 @@
*/
package org.apache.bookkeeper.proto;
-import io.netty.buffer.Unpooled;
-
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
class ResponseBuilder {
static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) {
@@ -41,8 +39,8 @@ class ResponseBuilder {
r.getEntryId());
}
- static BookieProtocol.Response buildReadResponse(ByteBuffer data, BookieProtocol.Request r) {
+ static BookieProtocol.Response buildReadResponse(ByteBuf data, BookieProtocol.Request r) {
return new BookieProtocol.ReadResponse(r.getProtocolVersion(), BookieProtocol.EOK,
- r.getLedgerId(), r.getEntryId(), Unpooled.wrappedBuffer(data));
+ r.getLedgerId(), r.getEntryId(), data);
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 46f7f7d..827aed9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -62,11 +62,9 @@ class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback {
int rc = BookieProtocol.EOK;
try {
if (add.isRecoveryAdd()) {
- requestProcessor.bookie.recoveryAddEntry(add.getDataAsByteBuffer(),
- this, channel, add.getMasterKey());
+ requestProcessor.bookie.recoveryAddEntry(add.getData(), this, channel, add.getMasterKey());
} else {
- requestProcessor.bookie.addEntry(add.getDataAsByteBuffer(),
- this, channel, add.getMasterKey());
+ requestProcessor.bookie.addEntry(add.getData(), this, channel, add.getMasterKey());
}
} catch (IOException e) {
LOG.error("Error writing " + add, e);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index e34e894..b4e89f8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -20,10 +20,11 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.BookieException;
@@ -102,7 +103,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
};
StatusCode status = null;
byte[] masterKey = addRequest.getMasterKey().toByteArray();
- ByteBuffer entryToAdd = addRequest.getBody().asReadOnlyByteBuffer();
+ ByteBuf entryToAdd = Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
try {
if (addRequest.hasFlag() && addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, channel, masterKey);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index 097a573..e8ffb34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
@@ -69,7 +70,7 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
byte[] masterKey = writeLacRequest.getMasterKey().toByteArray();
try {
- requestProcessor.bookie.setExplicitLac(lacToAdd, channel, masterKey);
+ requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), channel, masterKey);
status = StatusCode.EOK;
} catch (IOException e) {
logger.error("Error saving lac for ledger:{}",
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 1dfa32e..711d38b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import org.junit.After;
+
import static org.junit.Assert.*;
public class BookieJournalTest {
@@ -620,15 +621,15 @@ public class BookieJournalTest {
b.readEntry(1, 99);
// still able to read last entry, but it's junk
- ByteBuffer buf = b.readEntry(1, 100);
- assertEquals("Ledger Id is wrong", buf.getLong(), 1);
- assertEquals("Entry Id is wrong", buf.getLong(), 100);
- assertEquals("Last confirmed is wrong", buf.getLong(), 99);
- assertEquals("Length is wrong", buf.getLong(), 100*1024);
- buf.getLong(); // skip checksum
+ ByteBuf buf = b.readEntry(1, 100);
+ assertEquals("Ledger Id is wrong", buf.readLong(), 1);
+ assertEquals("Entry Id is wrong", buf.readLong(), 100);
+ assertEquals("Last confirmed is wrong", buf.readLong(), 99);
+ assertEquals("Length is wrong", buf.readLong(), 100*1024);
+ buf.readLong(); // skip checksum
boolean allX = true;
for (int i = 0; i < 1024; i++) {
- byte x = buf.get();
+ byte x = buf.readByte();
allX = allX && x == (byte)'X';
}
assertFalse("Some of buffer should have been zeroed", allX);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 6ae5e60..067b411 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -20,9 +20,11 @@
*/
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,6 +53,7 @@ import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.TestUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.AsyncCallback;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -571,14 +574,13 @@ public class CompactionTest extends BookKeeperClusterTestCase {
storage.gcThread.doCompactEntryLogs(threshold);
}
- private ByteBuffer genEntry(long ledger, long entry, int size) {
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
- bb.putLong(ledger);
- bb.putLong(entry);
- while (bb.hasRemaining()) {
- bb.put((byte)0xFF);
+ private ByteBuf genEntry(long ledger, long entry, int size) {
+ ByteBuf bb = Unpooled.buffer(size);
+ bb.writeLong(ledger);
+ bb.writeLong(entry);
+ while (bb.isWritable()) {
+ bb.writeByte((byte)0xFF);
}
- bb.flip();
return bb;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 4e1004c..7eac1a2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -20,9 +20,12 @@
*/
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -72,9 +75,9 @@ public class EntryLogTest {
Bookie bookie = new Bookie(conf);
// create some entries
EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
- logger.addEntry(1, generateEntry(1, 1));
- logger.addEntry(3, generateEntry(3, 1));
- logger.addEntry(2, generateEntry(2, 1));
+ logger.addEntry(1, generateEntry(1, 1).nioBuffer());
+ logger.addEntry(3, generateEntry(3, 1).nioBuffer());
+ logger.addEntry(2, generateEntry(2, 1).nioBuffer());
logger.flush();
// now lets truncate the file to corrupt the last entry, which simulates a partial write
File f = new File(curDir, "0.log");
@@ -91,13 +94,12 @@ public class EntryLogTest {
assertNotNull(meta.getLedgersMap().get(3L));
}
- private ByteBuffer generateEntry(long ledger, long entry) {
+ private ByteBuf generateEntry(long ledger, long entry) {
byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
- ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]);
- bb.putLong(ledger);
- bb.putLong(entry);
- bb.put(data);
- bb.flip();
+ ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
+ bb.writeLong(ledger);
+ bb.writeLong(entry);
+ bb.writeBytes(data);
return bb;
}
@@ -120,7 +122,7 @@ public class EntryLogTest {
EntryLogger logger = new EntryLogger(conf,
bookie.getLedgerDirsManager());
for (int j=0; j<numEntries; j++) {
- positions[i][j] = logger.addEntry(i, generateEntry(i, j));
+ positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
}
logger.flush();
}
@@ -135,7 +137,7 @@ public class EntryLogTest {
EntryLogger logger = new EntryLogger(conf,
bookie.getLedgerDirsManager());
for (int j=0; j<numEntries; j++) {
- positions[i][j] = logger.addEntry(i, generateEntry(i, j));
+ positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
}
logger.flush();
}
@@ -231,10 +233,10 @@ public class EntryLogTest {
// create some entries
EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger;
- logger.addEntry(1, generateEntry(1, 1));
- logger.addEntry(3, generateEntry(3, 1));
- logger.addEntry(2, generateEntry(2, 1));
- logger.addEntry(1, generateEntry(1, 2));
+ logger.addEntry(1, generateEntry(1, 1).nioBuffer());
+ logger.addEntry(3, generateEntry(3, 1).nioBuffer());
+ logger.addEntry(2, generateEntry(2, 1).nioBuffer());
+ logger.addEntry(1, generateEntry(1, 2).nioBuffer());
logger.rollLog();
logger.flushRotatedLogs();
@@ -265,10 +267,10 @@ public class EntryLogTest {
// create some entries
EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
- logger.addEntry(1, generateEntry(1, 1));
- logger.addEntry(3, generateEntry(3, 1));
- logger.addEntry(2, generateEntry(2, 1));
- logger.addEntry(1, generateEntry(1, 2));
+ logger.addEntry(1, generateEntry(1, 1).nioBuffer());
+ logger.addEntry(3, generateEntry(3, 1).nioBuffer());
+ logger.addEntry(2, generateEntry(2, 1).nioBuffer());
+ logger.addEntry(1, generateEntry(1, 2).nioBuffer());
logger.rollLog();
// Rewrite the entry log header to be on V0 format
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index b63806e..41ab89c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -21,6 +21,9 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -330,7 +333,7 @@ public class LedgerCacheTest {
Bookie b = new Bookie(conf);
b.start();
for (int i = 1; i <= numLedgers; i++) {
- ByteBuffer packet = generateEntry(i, 1);
+ ByteBuf packet = generateEntry(i, 1);
b.addEntry(packet, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
}
@@ -511,7 +514,7 @@ public class LedgerCacheTest {
// this bookie.addEntry call is required. FileInfo for Ledger 1 would be created with this call.
// without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls will fail because of BOOKKEEPER-965 change.
bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), null, "passwd".getBytes());
-
+
flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
assertFalse("Bookie is expected to be in ReadWrite mode", bookie.isReadOnly());
assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
@@ -535,14 +538,13 @@ public class LedgerCacheTest {
assertTrue("EntryMemTable SnapShot is expected to be empty, because of successful flush",
memTable.snapshot.isEmpty());
}
-
- private ByteBuffer generateEntry(long ledger, long entry) {
+
+ private ByteBuf generateEntry(long ledger, long entry) {
byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
- ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]);
- bb.putLong(ledger);
- bb.putLong(entry);
- bb.put(data);
- bb.flip();
+ ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
+ bb.writeLong(ledger);
+ bb.writeLong(entry);
+ bb.writeBytes(data);
return bb;
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 3672985..9352db3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -20,10 +20,10 @@
*/
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
+
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
-
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
@@ -35,17 +35,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
-
import org.apache.bookkeeper.jmx.BKMBeanInfo;
-
+import org.apache.bookkeeper.meta.LedgerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
@@ -314,12 +311,12 @@ public class TestSyncThread {
}
@Override
- public long addEntry(ByteBuffer entry) throws IOException {
+ public long addEntry(ByteBuf entry) throws IOException {
return 1L;
}
@Override
- public ByteBuffer getEntry(long ledgerId, long entryId)
+ public ByteBuf getEntry(long ledgerId, long entryId)
throws IOException {
return null;
}
@@ -334,11 +331,11 @@ public class TestSyncThread {
}
@Override
- public void setExplicitlac(long ledgerId, ByteBuffer lac) {
+ public void setExplicitlac(long ledgerId, ByteBuf lac) {
}
@Override
- public ByteBuffer getExplicitLac(long ledgerId) {
+ public ByteBuf getExplicitLac(long ledgerId) {
return null;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index d781d5d..41a8ecd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -39,6 +39,9 @@ import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.SettableFuture;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Enumeration;
@@ -72,7 +75,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
Bookie delayBookie = new Bookie(conf) {
@Override
- public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb,
+ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb,
Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
@@ -86,7 +89,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
}
@Override
- public void addEntry(ByteBuffer entry, WriteCallback cb,
+ public void addEntry(ByteBuf entry, WriteCallback cb,
Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
@@ -100,7 +103,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
}
@Override
- public ByteBuffer readEntry(long ledgerId, long entryId)
+ public ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException {
try {
Thread.sleep(5000);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index a7bcf77..0c470a2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.bookkeeper.client;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -194,7 +196,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
throws Exception {
Bookie sBookie = new Bookie(conf) {
@Override
- public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
latch.await();
@@ -204,7 +206,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
}
@Override
- public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
throw new IOException("Dead bookie for recovery adds.");
}
@@ -218,7 +220,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
private void startDeadBookie(ServerConfiguration conf, final CountDownLatch latch) throws Exception {
Bookie dBookie = new Bookie(conf) {
@Override
- public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
try {
latch.await();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index f54cde1..5f7c56f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -21,6 +21,8 @@ package org.apache.bookkeeper.client;
*
*/
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
@@ -186,7 +188,7 @@ public class LedgerRecoveryTest extends BaseTestCase {
Bookie fakeBookie = new Bookie(conf) {
@Override
- public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
// drop request to simulate a slow and failed bookie
}
@@ -245,7 +247,7 @@ public class LedgerRecoveryTest extends BaseTestCase {
ServerConfiguration conf = newServerConfiguration();
Bookie deadBookie1 = new Bookie(conf) {
@Override
- public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
// drop request to simulate a slow and failed bookie
throw new IOException("Couldn't write for some reason");
@@ -326,7 +328,7 @@ public class LedgerRecoveryTest extends BaseTestCase {
ServerConfiguration conf = newServerConfiguration();
Bookie deadBookie1 = new Bookie(conf) {
@Override
- public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
// drop request to simulate a slow and failed bookie
throw new IOException("Couldn't write for some reason");
@@ -410,7 +412,7 @@ public class LedgerRecoveryTest extends BaseTestCase {
private void startDeadBookie(ServerConfiguration conf) throws Exception {
Bookie rBookie = new Bookie(conf) {
@Override
- public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
// drop request to simulate a dead bookie
throw new IOException("Couldn't write entries for some reason");
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 280db05..0db938b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,14 +21,16 @@
package org.apache.bookkeeper.meta;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -336,11 +338,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
}
@Override
- public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+ public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
}
@Override
- public ByteBuffer getExplicitLac(long ledgerId) {
+ public ByteBuf getExplicitLac(long ledgerId) {
return null;
}
@@ -369,12 +371,12 @@ public class GcLedgersTest extends LedgerManagerTestCase {
}
@Override
- public long addEntry(ByteBuffer entry) throws IOException {
+ public long addEntry(ByteBuf entry) throws IOException {
return 0;
}
@Override
- public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
return null;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 8d13102..ac9fe59 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -22,7 +22,6 @@
package org.apache.bookkeeper.meta;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
@@ -35,7 +34,6 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
-import org.apache.bookkeeper.bookie.LedgerStorage.LedgerDeletionListener;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -49,6 +47,8 @@ import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
+
/**
* Test case to run over serveral ledger managers
*/
@@ -153,12 +153,12 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
}
@Override
- public long addEntry(ByteBuffer entry) throws IOException {
+ public long addEntry(ByteBuf entry) throws IOException {
return 0;
}
@Override
- public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
return null;
}
@@ -213,13 +213,13 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
}
@Override
- public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException {
+ public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
// TODO Auto-generated method stub
}
@Override
- public ByteBuffer getExplicitLac(long ledgerId) {
+ public ByteBuf getExplicitLac(long ledgerId) {
// TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 16bbfd0..e2a4f32 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -235,7 +235,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
Bookie delayBookie = new Bookie(conf) {
@Override
- public ByteBuffer readEntry(long ledgerId, long entryId)
+ public ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException {
try {
Thread.sleep(3000);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 428a597..159b859 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -40,6 +40,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
@@ -251,7 +253,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
Bookie deadBookie = new Bookie(conf) {
@Override
- public ByteBuffer readEntry(long ledgerId, long entryId)
+ public ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException {
// we want to disable during checking
numReads.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
index 94319df..7a65388 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
@@ -21,6 +21,9 @@
package org.apache.bookkeeper.test;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -148,12 +151,12 @@ public class ConcurrentLedgerTest {
long start = System.currentTimeMillis();
for(int i = 1; i <= totalwrites/ledgers; i++) {
for(int j = 1; j <= ledgers; j++) {
- ByteBuffer entry = bookie.readEntry(j, i);
+ ByteBuf entry = bookie.readEntry(j, i);
// skip the ledger id and the entry id
- entry.getLong();
- entry.getLong();
- assertEquals(j + "@" + i, j+2, entry.getLong());
- assertEquals(j + "@" + i, i+3, entry.getLong());
+ entry.readLong();
+ entry.readLong();
+ assertEquals(j + "@" + i, j+2, entry.readLong());
+ assertEquals(j + "@" + i, i+3, entry.readLong());
}
}
long finish = System.currentTimeMillis();
@@ -184,7 +187,7 @@ public class ConcurrentLedgerTest {
bytes.position(0);
bytes.limit(bytes.capacity());
throttle.acquire();
- bookie.addEntry(bytes, cb, counter, zeros);
+ bookie.addEntry(Unpooled.wrappedBuffer(bytes), cb, counter, zeros);
}
}
long finish = System.currentTimeMillis();