You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/05/04 06:18:29 UTC

[bookkeeper] branch master updated: BP-47 (task4): Aligned native buffer wrapper (#3253)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c661dfd6b BP-47 (task4): Aligned native buffer wrapper (#3253)
8c661dfd6b is described below

commit 8c661dfd6b118d278222469eb5426ebd7728bdd2
Author: Hang Chen <ch...@apache.org>
AuthorDate: Wed May 4 14:18:23 2022 +0800

    BP-47 (task4): Aligned native buffer wrapper (#3253)
---
 .../common/util/ExceptionMessageHelper.java        |  49 ++++
 bookkeeper-server/build.gradle                     |   1 +
 bookkeeper-server/pom.xml                          |   5 +
 .../bookie/storage/directentrylogger/Buffer.java   | 269 +++++++++++++++++++++
 .../storage/directentrylogger/BufferPool.java      |  68 ++++++
 .../storage/directentrylogger/package-info.java    |  28 +--
 .../storage/directentrylogger/BufferTest.java      | 213 ++++++++++++++++
 build.gradle                                       |   1 +
 native-io/build.gradle                             |   1 -
 native-io/pom.xml                                  |   4 -
 .../common/util/nativeio/NativeUtils.java          |  10 +-
 settings.gradle                                    |   2 +
 12 files changed, 619 insertions(+), 32 deletions(-)

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java
new file mode 100644
index 0000000000..9fa925d803
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+/**
+ * Utility to make it easier to add context to exception messages.
+ */
+public class ExceptionMessageHelper {
+    public StringBuilder sb = new StringBuilder();
+    private boolean firstKV = true;
+
+    public static ExceptionMessageHelper exMsg(String msg) {
+        return new ExceptionMessageHelper(msg);
+    }
+
+    ExceptionMessageHelper(String msg) {
+        sb.append(msg).append("(");
+    }
+
+    public ExceptionMessageHelper kv(String key, Object value) {
+        if (firstKV) {
+            firstKV = false;
+        } else {
+            sb.append(",");
+        }
+        sb.append(key).append("=").append(value.toString());
+        return this;
+    }
+
+    public String toString() {
+        return sb.append(")").toString();
+    }
+}
diff --git a/bookkeeper-server/build.gradle b/bookkeeper-server/build.gradle
index 18a43d2d5f..e759a495ca 100644
--- a/bookkeeper-server/build.gradle
+++ b/bookkeeper-server/build.gradle
@@ -28,6 +28,7 @@ dependencies {
     implementation project(':bookkeeper-http:http-server')
     implementation project(':bookkeeper-proto')
     implementation project(':bookkeeper-tools-framework')
+    implementation project(':native-io')
     implementation project(':circe-checksum')
     implementation project(':cpu-affinity')
     implementation project(':stats:bookkeeper-stats-api')
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index bc7886d7bf..b74b9bb71b 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -45,6 +45,11 @@
       <artifactId>bookkeeper-tools-framework</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>native-io</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.rocksdb</groupId>
       <artifactId>rocksdbjni</artifactId>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
new file mode 100644
index 0000000000..0555ce007e
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
@@ -0,0 +1,269 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+
+/**
+ * A utility buffer class to be used with native calls.
+ * <p/>
+ * Buffers are page aligned (4k pages).
+ * <p/>
+ * The wrapper mostly handles writes between ByteBuffers and
+ * ByteBufs. It also provides a method for padding the buffer to the next
+ * alignment, so writes can have an aligned size also (as required by
+ * direct I/O). The padding is done with 0xF0, so that if it is read as
+ * an integer, or long, the value will be negative (assuming the read is
+ * a java read, and thus a signed int).
+ */
+class Buffer {
+    /* Padding byte must have MSB set, so if read at the start
+     * of an integer or long, the returned value is negative. */
+    public static final byte PADDING_BYTE = (byte) 0xF0;
+
+    /* Some machines can live with 512 alignment, but others
+     * appear to require 4096, so go with 4096, which is page
+     * alignment */
+    public static final int ALIGNMENT = 4096;
+    private static final int MAX_ALIGNMENT = Integer.MAX_VALUE & ~(ALIGNMENT - 1);
+    static final byte[] PADDING = generatePadding();
+
+    final NativeIO nativeIO;
+    final int bufferSize;
+    ByteBuf buffer;
+    ByteBuffer byteBuffer;
+    long pointer = 0;
+
+    Buffer(NativeIO nativeIO, int bufferSize) throws IOException {
+        checkArgument(isAligned(bufferSize),
+                      "Buffer size not aligned %d", bufferSize);
+
+        this.buffer = allocateAligned(ALIGNMENT, bufferSize);
+        this.nativeIO = nativeIO;
+        this.bufferSize = bufferSize;
+        byteBuffer = buffer.nioBuffer(0, bufferSize);
+        byteBuffer.order(ByteOrder.BIG_ENDIAN);
+    }
+
+    private ByteBuf allocateAligned(int alignment, int bufferSize) {
+        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize + alignment);
+        long addr = buf.memoryAddress();
+        if ((addr & (alignment - 1)) == 0) {
+            // The address is already aligned
+            pointer = addr;
+            return buf.slice(0, bufferSize);
+        } else {
+            int alignOffset = (int) (alignment - (addr & (alignment - 1)));
+            pointer = addr + alignOffset;
+            return buf.slice(alignOffset, bufferSize);
+        }
+    }
+
+    /**
+     * @return whether there is space in the buffer for size bytes.
+     */
+    boolean hasSpace(int size) throws IOException {
+        if (size > bufferSize) {
+            throw new IOException(exMsg("Write too large").kv("writeSize", size)
+                                  .kv("maxSize", bufferSize).toString());
+        }
+        return byteBuffer.remaining() >= size;
+    }
+
+    /**
+     * @return whether the buffer can honour a read of size at offset.
+     */
+    boolean hasData(int offset, int size) {
+        return offset + size <= bufferSize;
+    }
+
+    /**
+     * Write an integer to buffer. Progresses the position of the buffer by 4 bytes.
+     */
+    void writeInt(int value) throws IOException {
+        byteBuffer.putInt(value);
+    }
+
+    /**
+     * Write a btebuf to this buffer. Progresses the position of the buffer by the
+     * number of readable bytes of the bytebuf. Progresses the readerIndex of the passed
+     * bytebuf by the number of bytes read (i.e. to the end).
+     */
+    void writeByteBuf(ByteBuf bytebuf) throws IOException {
+        int bytesWritten = bytebuf.readableBytes();
+        ByteBuffer bytesToPut = bytebuf.nioBuffer();
+        byteBuffer.put(bytesToPut);
+        bytebuf.skipBytes(bytesWritten);
+    }
+
+    /**
+     * Read an integer from the buffer at the given offset. The offset is in bytes.
+     */
+    int readInt(int offset) throws IOException {
+        if (!hasData(offset, Integer.BYTES)) {
+            throw new IOException(exMsg("Buffer cannot satify int read")
+                                  .kv("offset", offset)
+                                  .kv("bufferSize", bufferSize).toString());
+        }
+        try {
+            return byteBuffer.getInt(offset);
+        } catch (Exception e) {
+            throw new IOException(exMsg("Error reading int")
+                                  .kv("byteBuffer", byteBuffer.toString())
+                                  .kv("offset", offset)
+                                  .kv("bufferSize", bufferSize).toString(), e);
+        }
+    }
+
+    /**
+     * Read a long from the buffer at the given offset. The offset is in bytes.
+     */
+    long readLong(int offset) throws IOException {
+        if (!hasData(offset, Long.BYTES)) {
+            throw new IOException(exMsg("Buffer cannot satify long read")
+                                  .kv("offset", offset)
+                                  .kv("bufferSize", bufferSize).toString());
+        }
+        try {
+            return byteBuffer.getLong(offset);
+        } catch (Exception e) {
+            throw new IOException(exMsg("Error reading long")
+                                  .kv("byteBuffer", byteBuffer.toString())
+                                  .kv("offset", offset)
+                                  .kv("bufferSize", bufferSize).toString(), e);
+        }
+    }
+
+    /**
+     * Read a bytebuf of size from the buffer at the given offset.
+     * If there are not enough bytes in the buffer to satify the read, some of the bytes are read
+     * into the byte buffer and the number of bytes read is returned.
+     */
+    int readByteBuf(ByteBuf buffer, int offset, int size) throws IOException {
+        int originalLimit = byteBuffer.limit();
+        byteBuffer.position(offset);
+        int bytesToRead = Math.min(size, byteBuffer.capacity() - offset);
+        byteBuffer.limit(offset + bytesToRead);
+        try {
+            buffer.writeBytes(byteBuffer);
+        } catch (Exception e) {
+            throw new IOException(exMsg("Error reading buffer")
+                                  .kv("byteBuffer", byteBuffer.toString())
+                                  .kv("offset", offset).kv("size", size)
+                                  .kv("bufferSize", bufferSize).toString(), e);
+        } finally {
+            byteBuffer.limit(originalLimit);
+        }
+        return bytesToRead;
+    }
+
+    /**
+     * The data pointer object for the native buffer. This can be used
+     * by JNI method which take a char* or void*.
+     */
+    long pointer() {
+        return pointer;
+    }
+
+    long pointer(long offset, long expectedWrite) {
+        if (offset == 0) {
+            return pointer;
+        } else {
+            if (offset + expectedWrite > byteBuffer.capacity()) {
+                throw new IllegalArgumentException(
+                        exMsg("Buffer overflow").kv("offset", offset).kv("expectedWrite", expectedWrite)
+                        .kv("capacity", byteBuffer.capacity()).toString());
+            }
+
+            return pointer + offset;
+        }
+    }
+    /**
+     * @return the number of bytes which have been written to this buffer.
+     */
+    int position() {
+        return byteBuffer.position();
+    }
+
+    /**
+     * @return the size of the buffer (i.e. the max number of bytes writable, or the max offset readable)
+     */
+    int size() {
+        return bufferSize;
+    }
+
+    /**
+     * Pad the buffer to the next alignment position.
+     * @return the position of the next alignment. This should be used as the size argument to make aligned writes.
+     */
+    int padToAlignment() {
+        int bufferPos = byteBuffer.position();
+        int nextAlignment = nextAlignment(bufferPos);
+        byteBuffer.put(PADDING, 0, nextAlignment - bufferPos);
+        return nextAlignment;
+    }
+
+    /**
+     * Clear the bytes written. This doesn't actually destroy the data, but moves the position back to the start of
+     * the buffer.
+     */
+    void reset() {
+        byteBuffer.clear();
+    }
+
+    /**
+     * Free the memory that backs this buffer.
+     */
+    void free() {
+        ReferenceCountUtil.safeRelease(buffer);
+        buffer = null;
+        byteBuffer = null;
+    }
+    private static byte[] generatePadding() {
+        byte[] padding = new byte[ALIGNMENT];
+        Arrays.fill(padding, (byte) PADDING_BYTE);
+        return padding;
+    }
+
+    static boolean isAligned(long size) {
+        return size >= 0 && ((ALIGNMENT - 1) & size) == 0;
+    }
+
+    static int nextAlignment(int pos) {
+        checkArgument(pos <= MAX_ALIGNMENT,
+                      "position (0x%x) must be lower or equal to max alignment (0x%x)",
+                       pos, MAX_ALIGNMENT);
+        checkArgument(pos >= 0, "position (0x%x) must be positive", pos);
+        return (pos + (ALIGNMENT - 1)) & ~(ALIGNMENT - 1);
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
new file mode 100644
index 0000000000..95031b700c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+
+/**
+ * BufferPool used to manage Buffers.
+ */
+public class BufferPool implements AutoCloseable {
+    private final ArrayBlockingQueue<Buffer> pool;
+
+    BufferPool(NativeIO nativeIO, int bufferSize, int maxPoolSize) throws IOException {
+        pool = new ArrayBlockingQueue<>(maxPoolSize);
+        for (int i = 0; i < maxPoolSize; i++) {
+            pool.add(new Buffer(nativeIO, bufferSize));
+        }
+    }
+
+    Buffer acquire() throws IOException {
+        try {
+            return pool.take();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new IOException(ie);
+        }
+    }
+
+    void release(Buffer buffer) {
+        buffer.reset();
+        if (!pool.add(buffer)) {
+            buffer.free();
+        }
+    }
+
+    @Override
+    public void close() {
+        while (true) {
+            Buffer b = pool.poll();
+            if (b == null) {
+                break;
+            }
+
+            b.free();
+        }
+    }
+}
diff --git a/native-io/build.gradle b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java
similarity index 59%
copy from native-io/build.gradle
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java
index b680940c80..a714867782 100644
--- a/native-io/build.gradle
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java
@@ -16,27 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-plugins {
-    id 'java'
-}
-
-dependencies {
-    compileOnly depLibs.lombok
-    compileOnly depLibs.spotbugsAnnotations
-    implementation depLibs.commonsLang3
-    implementation depLibs.guava
-    implementation depLibs.slf4j
-    testImplementation depLibs.junit
-
-    annotationProcessor depLibs.lombok
-}
-
-compileJava {
-    options.headerOutputDirectory = file("${buildDir}/javahGenerated")
-}
-
-jar {
-    from (tasks.getByPath(":native-io:src:main:native-io-jni:linkRelease").outputs.files.filter { f -> f.isFile()} ) {
-        into "/lib"
-    }
-}
+/**
+ * Support for bookie entry logs using Direct IO.
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferTest.java
new file mode 100644
index 0000000000..2a48157523
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferTest.java
@@ -0,0 +1,213 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+// CHECKSTYLE.OFF: IllegalImport
+import io.netty.util.internal.PlatformDependent;
+// CHECKSTYLE.ON: IllegalImport
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIOImpl;
+import org.junit.Test;
+
+/**
+ * TestBuffer.
+ */
+public class BufferTest {
+
+    @Test
+    public void testIsAligned() throws Exception {
+        assertFalse(Buffer.isAligned(1234));
+        assertTrue(Buffer.isAligned(4096));
+        assertTrue(Buffer.isAligned(40960));
+        assertTrue(Buffer.isAligned(1 << 20));
+        assertFalse(Buffer.isAligned(-1));
+        assertFalse(Buffer.isAligned(Integer.MAX_VALUE));
+        assertFalse(Buffer.isAligned(Integer.MIN_VALUE));
+    }
+
+    @Test
+    public void testNextAlignment() throws Exception {
+        assertEquals(0, Buffer.nextAlignment(0));
+        assertEquals(4096, Buffer.nextAlignment(1));
+        assertEquals(4096, Buffer.nextAlignment(4096));
+        assertEquals(8192, Buffer.nextAlignment(4097));
+        assertEquals(0x7FFFF000, Buffer.nextAlignment(0x7FFFF000));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativePosition() throws Exception {
+        Buffer.nextAlignment(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMaxAlignment() throws Exception {
+        Buffer.nextAlignment(Integer.MAX_VALUE);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCreateUnaligned() throws Exception {
+        new Buffer(new NativeIOImpl(), 1234);
+    }
+
+    @Test
+    public void testWriteInt() throws Exception {
+        int bufferSize = 1 << 20;
+        Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
+        assertTrue(b.hasSpace(bufferSize));
+        assertEquals(0, b.position());
+        b.writeInt(0xdeadbeef);
+
+
+        assertEquals((byte) 0xde, PlatformDependent.getByte(b.pointer() + 0));
+        assertEquals((byte) 0xad, PlatformDependent.getByte(b.pointer() + 1));
+        assertEquals((byte) 0xbe, PlatformDependent.getByte(b.pointer() + 2));
+        assertEquals((byte) 0xef, PlatformDependent.getByte(b.pointer() + 3));
+
+        assertFalse(b.hasSpace(bufferSize));
+        assertEquals(Integer.BYTES, b.position());
+
+        for (int i = 0; i < 10000; i++) {
+            b.writeInt(i);
+        }
+        assertEquals(Integer.BYTES * 10001, b.position());
+        assertTrue(b.hasSpace(bufferSize - (Integer.BYTES * 10001)));
+        assertFalse(b.hasSpace(bufferSize - (Integer.BYTES * 10000)));
+
+        assertEquals(0xdeadbeef, b.readInt(0));
+        for (int i = 0; i < 10000; i++) {
+            assertEquals(i, b.readInt((i + 1) * Integer.BYTES));
+        }
+        b.reset();
+        assertTrue(b.hasSpace(bufferSize));
+        assertEquals(0, b.position());
+    }
+
+    @Test
+    public void testWriteBuffer() throws Exception {
+        ByteBuf bb = Unpooled.buffer(1021);
+        fillByteBuf(bb, 0xdeadbeef);
+        int bufferSize = 1 << 20;
+        Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
+        assertEquals(0, b.position());
+        b.writeByteBuf(bb);
+        assertEquals(1021, b.position());
+        assertEquals(0, bb.readableBytes());
+        bb.clear();
+        fillByteBuf(bb, 0xcafecafe);
+        b.writeByteBuf(bb);
+        assertEquals(0, bb.readableBytes());
+        assertEquals(2042, b.position());
+
+        bb = Unpooled.buffer(2042);
+        int ret = b.readByteBuf(bb, 0, 2042);
+        assertEquals(2042, ret);
+        for (int i = 0; i < 1020 / Integer.BYTES; i++) {
+            assertEquals(0xdeadbeef, bb.readInt());
+        }
+        assertEquals((byte) 0xde, bb.readByte());
+        for (int i = 0; i < 1020 / Integer.BYTES; i++) {
+            assertEquals(0xcafecafe, bb.readInt());
+        }
+    }
+
+    @Test
+    public void testPartialRead() throws Exception {
+        ByteBuf bb = Unpooled.buffer(5000);
+
+        Buffer b = new Buffer(new NativeIOImpl(), 4096);
+        for (int i = 0; i < 4096 / Integer.BYTES; i++) {
+            b.writeInt(0xdeadbeef);
+        }
+
+        int ret = b.readByteBuf(bb, 0, 5000);
+        assertEquals(4096, ret);
+    }
+
+    @Test(expected = IOException.class)
+    public void testReadIntAtBoundary() throws Exception {
+        Buffer b = new Buffer(new NativeIOImpl(), 4096);
+
+        for (int i = 0; i < 4096 / Integer.BYTES; i++) {
+            b.writeInt(0xdeadbeef);
+        }
+        assertTrue(b.hasData(4092, Integer.BYTES));
+        assertFalse(b.hasData(4093, Integer.BYTES));
+        assertFalse(b.hasData(4096, Integer.BYTES));
+
+        b.readInt(4096 - 2);
+    }
+
+    @Test(expected = IOException.class)
+    public void testReadLongAtBoundary() throws Exception {
+        Buffer b = new Buffer(new NativeIOImpl(), 4096);
+
+        for (int i = 0; i < 4096 / Integer.BYTES; i++) {
+            b.writeInt(0xdeadbeef);
+        }
+        assertTrue(b.hasData(4088, Long.BYTES));
+        assertFalse(b.hasData(4089, Long.BYTES));
+        assertFalse(b.hasData(4096, Long.BYTES));
+
+        b.readInt(4096 - 2);
+    }
+
+    @Test
+    public void testPadToAlignment() throws Exception {
+        Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
+
+        for (int i = 0; i < 1025; i++) {
+            b.writeInt(0xdededede);
+        }
+        int writtenLength = b.padToAlignment();
+
+        assertEquals(8192, writtenLength);
+        assertEquals(0xdededede, b.readInt(1024 * Integer.BYTES));
+        for (int i = 1025 * Integer.BYTES; i < writtenLength; i += Integer.BYTES) {
+            assertEquals(0xf0f0f0f0, b.readInt(i));
+        }
+        assertEquals(0, b.readInt(writtenLength));
+    }
+
+    @Test
+    public void testFree() throws Exception {
+        Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
+        b.free(); // success if process doesn't explode
+        b.free();
+    }
+
+    static void fillByteBuf(ByteBuf bb, int value) {
+        while (bb.writableBytes() >= Integer.BYTES) {
+            bb.writeInt(value);
+        }
+        for (int i = 0; i < Integer.BYTES && bb.writableBytes() > 0; i++) {
+            byte b = (byte) (value >> (Integer.BYTES - i - 1) * 8);
+            bb.writeByte(b);
+        }
+    }
+}
diff --git a/build.gradle b/build.gradle
index 4fb440c447..f6ff2a4719 100644
--- a/build.gradle
+++ b/build.gradle
@@ -85,6 +85,7 @@ allprojects {
     apply from: "$rootDir/dependencies.gradle"
     if (it.path != ':circe-checksum:src:main:circe'
         && it.path != ':cpu-affinity:src:main:affinity'
+        && it.path != ':native-io:src:main:native-io-jni'
         && it.name != 'src'
         && it.name != 'main') {
         apply plugin: 'java'
diff --git a/native-io/build.gradle b/native-io/build.gradle
index b680940c80..3d16ec8b8d 100644
--- a/native-io/build.gradle
+++ b/native-io/build.gradle
@@ -24,7 +24,6 @@ dependencies {
     compileOnly depLibs.lombok
     compileOnly depLibs.spotbugsAnnotations
     implementation depLibs.commonsLang3
-    implementation depLibs.guava
     implementation depLibs.slf4j
     testImplementation depLibs.junit
 
diff --git a/native-io/pom.xml b/native-io/pom.xml
index 8ba378dd0b..693bfd9af8 100644
--- a/native-io/pom.xml
+++ b/native-io/pom.xml
@@ -33,10 +33,6 @@
   </properties>
 
   <dependencies>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
diff --git a/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java b/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java
index d3c83f118f..339dfeda1e 100644
--- a/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java
+++ b/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java
@@ -18,8 +18,6 @@
  */
 package org.apache.bookkeeper.common.util.nativeio;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 import java.io.File;
@@ -29,6 +27,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 
+import lombok.NonNull;
 import lombok.experimental.UtilityClass;
 
 /**
@@ -80,4 +79,9 @@ class NativeUtils {
 
         System.load(temp.getAbsolutePath());
     }
-}
+
+    public static void checkArgument(boolean expression, @NonNull Object errorMessage) {
+        if (!expression) {
+            throw new IllegalArgumentException(String.valueOf(errorMessage));
+        }
+    }}
diff --git a/settings.gradle b/settings.gradle
index a9114e1280..7c40cc85ee 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -60,6 +60,8 @@ include(':bookkeeper-benchmark',
         'dev:release',
         'microbenchmarks',
         'metadata-drivers:etcd',
+        'native-io',
+        'native-io:src:main:native-io-jni',
         'shaded:distributedlog-core-shaded',
         'stats:bookkeeper-stats-api',
         'stats:bookkeeper-stats-providers:prometheus-metrics-provider',