You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/05/04 06:18:29 UTC
[bookkeeper] branch master updated: BP-47 (task4): Aligned native buffer wrapper (#3253)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 8c661dfd6b BP-47 (task4): Aligned native buffer wrapper (#3253)
8c661dfd6b is described below
commit 8c661dfd6b118d278222469eb5426ebd7728bdd2
Author: Hang Chen <ch...@apache.org>
AuthorDate: Wed May 4 14:18:23 2022 +0800
BP-47 (task4): Aligned native buffer wrapper (#3253)
---
.../common/util/ExceptionMessageHelper.java | 49 ++++
bookkeeper-server/build.gradle | 1 +
bookkeeper-server/pom.xml | 5 +
.../bookie/storage/directentrylogger/Buffer.java | 269 +++++++++++++++++++++
.../storage/directentrylogger/BufferPool.java | 68 ++++++
.../storage/directentrylogger/package-info.java | 28 +--
.../storage/directentrylogger/BufferTest.java | 213 ++++++++++++++++
build.gradle | 1 +
native-io/build.gradle | 1 -
native-io/pom.xml | 4 -
.../common/util/nativeio/NativeUtils.java | 10 +-
settings.gradle | 2 +
12 files changed, 619 insertions(+), 32 deletions(-)
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java
new file mode 100644
index 0000000000..9fa925d803
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExceptionMessageHelper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+/**
+ * Utility to make it easier to add context to exception messages.
+ */
+public class ExceptionMessageHelper {
+ public StringBuilder sb = new StringBuilder();
+ private boolean firstKV = true;
+
+ public static ExceptionMessageHelper exMsg(String msg) {
+ return new ExceptionMessageHelper(msg);
+ }
+
+ ExceptionMessageHelper(String msg) {
+ sb.append(msg).append("(");
+ }
+
+ public ExceptionMessageHelper kv(String key, Object value) {
+ if (firstKV) {
+ firstKV = false;
+ } else {
+ sb.append(",");
+ }
+ sb.append(key).append("=").append(value.toString());
+ return this;
+ }
+
+ public String toString() {
+ return sb.append(")").toString();
+ }
+}
diff --git a/bookkeeper-server/build.gradle b/bookkeeper-server/build.gradle
index 18a43d2d5f..e759a495ca 100644
--- a/bookkeeper-server/build.gradle
+++ b/bookkeeper-server/build.gradle
@@ -28,6 +28,7 @@ dependencies {
implementation project(':bookkeeper-http:http-server')
implementation project(':bookkeeper-proto')
implementation project(':bookkeeper-tools-framework')
+ implementation project(':native-io')
implementation project(':circe-checksum')
implementation project(':cpu-affinity')
implementation project(':stats:bookkeeper-stats-api')
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index bc7886d7bf..b74b9bb71b 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -45,6 +45,11 @@
<artifactId>bookkeeper-tools-framework</artifactId>
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>native-io</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
new file mode 100644
index 0000000000..0555ce007e
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
@@ -0,0 +1,269 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+
+/**
+ * A utility buffer class to be used with native calls.
+ * <p/>
+ * Buffers are page aligned (4k pages).
+ * <p/>
+ * The wrapper mostly handles writes between ByteBuffers and
+ * ByteBufs. It also provides a method for padding the buffer to the next
+ * alignment, so writes can have an aligned size also (as required by
+ * direct I/O). The padding is done with 0xF0, so that if it is read as
+ * an integer, or long, the value will be negative (assuming the read is
+ * a java read, and thus a signed int).
+ */
+class Buffer {
+ /* Padding byte must have MSB set, so if read at the start
+ * of an integer or long, the returned value is negative. */
+ public static final byte PADDING_BYTE = (byte) 0xF0;
+
+ /* Some machines can live with 512 alignment, but others
+ * appear to require 4096, so go with 4096, which is page
+ * alignment */
+ public static final int ALIGNMENT = 4096;
+ private static final int MAX_ALIGNMENT = Integer.MAX_VALUE & ~(ALIGNMENT - 1);
+ static final byte[] PADDING = generatePadding();
+
+ final NativeIO nativeIO;
+ final int bufferSize;
+ ByteBuf buffer;
+ ByteBuffer byteBuffer;
+ long pointer = 0;
+
+ Buffer(NativeIO nativeIO, int bufferSize) throws IOException {
+ checkArgument(isAligned(bufferSize),
+ "Buffer size not aligned %d", bufferSize);
+
+ this.buffer = allocateAligned(ALIGNMENT, bufferSize);
+ this.nativeIO = nativeIO;
+ this.bufferSize = bufferSize;
+ byteBuffer = buffer.nioBuffer(0, bufferSize);
+ byteBuffer.order(ByteOrder.BIG_ENDIAN);
+ }
+
+ private ByteBuf allocateAligned(int alignment, int bufferSize) {
+ ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize + alignment);
+ long addr = buf.memoryAddress();
+ if ((addr & (alignment - 1)) == 0) {
+ // The address is already aligned
+ pointer = addr;
+ return buf.slice(0, bufferSize);
+ } else {
+ int alignOffset = (int) (alignment - (addr & (alignment - 1)));
+ pointer = addr + alignOffset;
+ return buf.slice(alignOffset, bufferSize);
+ }
+ }
+
+ /**
+ * @return whether there is space in the buffer for size bytes.
+ */
+ boolean hasSpace(int size) throws IOException {
+ if (size > bufferSize) {
+ throw new IOException(exMsg("Write too large").kv("writeSize", size)
+ .kv("maxSize", bufferSize).toString());
+ }
+ return byteBuffer.remaining() >= size;
+ }
+
+ /**
+ * @return whether the buffer can honour a read of size at offset.
+ */
+ boolean hasData(int offset, int size) {
+ return offset + size <= bufferSize;
+ }
+
+ /**
+ * Write an integer to buffer. Progresses the position of the buffer by 4 bytes.
+ */
+ void writeInt(int value) throws IOException {
+ byteBuffer.putInt(value);
+ }
+
+ /**
+ * Write a btebuf to this buffer. Progresses the position of the buffer by the
+ * number of readable bytes of the bytebuf. Progresses the readerIndex of the passed
+ * bytebuf by the number of bytes read (i.e. to the end).
+ */
+ void writeByteBuf(ByteBuf bytebuf) throws IOException {
+ int bytesWritten = bytebuf.readableBytes();
+ ByteBuffer bytesToPut = bytebuf.nioBuffer();
+ byteBuffer.put(bytesToPut);
+ bytebuf.skipBytes(bytesWritten);
+ }
+
+ /**
+ * Read an integer from the buffer at the given offset. The offset is in bytes.
+ */
+ int readInt(int offset) throws IOException {
+ if (!hasData(offset, Integer.BYTES)) {
+ throw new IOException(exMsg("Buffer cannot satify int read")
+ .kv("offset", offset)
+ .kv("bufferSize", bufferSize).toString());
+ }
+ try {
+ return byteBuffer.getInt(offset);
+ } catch (Exception e) {
+ throw new IOException(exMsg("Error reading int")
+ .kv("byteBuffer", byteBuffer.toString())
+ .kv("offset", offset)
+ .kv("bufferSize", bufferSize).toString(), e);
+ }
+ }
+
+ /**
+ * Read a long from the buffer at the given offset. The offset is in bytes.
+ */
+ long readLong(int offset) throws IOException {
+ if (!hasData(offset, Long.BYTES)) {
+ throw new IOException(exMsg("Buffer cannot satify long read")
+ .kv("offset", offset)
+ .kv("bufferSize", bufferSize).toString());
+ }
+ try {
+ return byteBuffer.getLong(offset);
+ } catch (Exception e) {
+ throw new IOException(exMsg("Error reading long")
+ .kv("byteBuffer", byteBuffer.toString())
+ .kv("offset", offset)
+ .kv("bufferSize", bufferSize).toString(), e);
+ }
+ }
+
+ /**
+ * Read a bytebuf of size from the buffer at the given offset.
+ * If there are not enough bytes in the buffer to satify the read, some of the bytes are read
+ * into the byte buffer and the number of bytes read is returned.
+ */
+ int readByteBuf(ByteBuf buffer, int offset, int size) throws IOException {
+ int originalLimit = byteBuffer.limit();
+ byteBuffer.position(offset);
+ int bytesToRead = Math.min(size, byteBuffer.capacity() - offset);
+ byteBuffer.limit(offset + bytesToRead);
+ try {
+ buffer.writeBytes(byteBuffer);
+ } catch (Exception e) {
+ throw new IOException(exMsg("Error reading buffer")
+ .kv("byteBuffer", byteBuffer.toString())
+ .kv("offset", offset).kv("size", size)
+ .kv("bufferSize", bufferSize).toString(), e);
+ } finally {
+ byteBuffer.limit(originalLimit);
+ }
+ return bytesToRead;
+ }
+
+ /**
+ * The data pointer object for the native buffer. This can be used
+ * by JNI method which take a char* or void*.
+ */
+ long pointer() {
+ return pointer;
+ }
+
+ long pointer(long offset, long expectedWrite) {
+ if (offset == 0) {
+ return pointer;
+ } else {
+ if (offset + expectedWrite > byteBuffer.capacity()) {
+ throw new IllegalArgumentException(
+ exMsg("Buffer overflow").kv("offset", offset).kv("expectedWrite", expectedWrite)
+ .kv("capacity", byteBuffer.capacity()).toString());
+ }
+
+ return pointer + offset;
+ }
+ }
+ /**
+ * @return the number of bytes which have been written to this buffer.
+ */
+ int position() {
+ return byteBuffer.position();
+ }
+
+ /**
+ * @return the size of the buffer (i.e. the max number of bytes writable, or the max offset readable)
+ */
+ int size() {
+ return bufferSize;
+ }
+
+ /**
+ * Pad the buffer to the next alignment position.
+ * @return the position of the next alignment. This should be used as the size argument to make aligned writes.
+ */
+ int padToAlignment() {
+ int bufferPos = byteBuffer.position();
+ int nextAlignment = nextAlignment(bufferPos);
+ byteBuffer.put(PADDING, 0, nextAlignment - bufferPos);
+ return nextAlignment;
+ }
+
+ /**
+ * Clear the bytes written. This doesn't actually destroy the data, but moves the position back to the start of
+ * the buffer.
+ */
+ void reset() {
+ byteBuffer.clear();
+ }
+
+ /**
+ * Free the memory that backs this buffer.
+ */
+ void free() {
+ ReferenceCountUtil.safeRelease(buffer);
+ buffer = null;
+ byteBuffer = null;
+ }
+ private static byte[] generatePadding() {
+ byte[] padding = new byte[ALIGNMENT];
+ Arrays.fill(padding, (byte) PADDING_BYTE);
+ return padding;
+ }
+
+ static boolean isAligned(long size) {
+ return size >= 0 && ((ALIGNMENT - 1) & size) == 0;
+ }
+
+ static int nextAlignment(int pos) {
+ checkArgument(pos <= MAX_ALIGNMENT,
+ "position (0x%x) must be lower or equal to max alignment (0x%x)",
+ pos, MAX_ALIGNMENT);
+ checkArgument(pos >= 0, "position (0x%x) must be positive", pos);
+ return (pos + (ALIGNMENT - 1)) & ~(ALIGNMENT - 1);
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
new file mode 100644
index 0000000000..95031b700c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIO;
+
+/**
+ * BufferPool used to manage Buffers.
+ */
+public class BufferPool implements AutoCloseable {
+ private final ArrayBlockingQueue<Buffer> pool;
+
+ BufferPool(NativeIO nativeIO, int bufferSize, int maxPoolSize) throws IOException {
+ pool = new ArrayBlockingQueue<>(maxPoolSize);
+ for (int i = 0; i < maxPoolSize; i++) {
+ pool.add(new Buffer(nativeIO, bufferSize));
+ }
+ }
+
+ Buffer acquire() throws IOException {
+ try {
+ return pool.take();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException(ie);
+ }
+ }
+
+ void release(Buffer buffer) {
+ buffer.reset();
+ if (!pool.add(buffer)) {
+ buffer.free();
+ }
+ }
+
+ @Override
+ public void close() {
+ while (true) {
+ Buffer b = pool.poll();
+ if (b == null) {
+ break;
+ }
+
+ b.free();
+ }
+ }
+}
diff --git a/native-io/build.gradle b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java
similarity index 59%
copy from native-io/build.gradle
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java
index b680940c80..a714867782 100644
--- a/native-io/build.gradle
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/package-info.java
@@ -16,27 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-plugins {
- id 'java'
-}
-
-dependencies {
- compileOnly depLibs.lombok
- compileOnly depLibs.spotbugsAnnotations
- implementation depLibs.commonsLang3
- implementation depLibs.guava
- implementation depLibs.slf4j
- testImplementation depLibs.junit
-
- annotationProcessor depLibs.lombok
-}
-
-compileJava {
- options.headerOutputDirectory = file("${buildDir}/javahGenerated")
-}
-
-jar {
- from (tasks.getByPath(":native-io:src:main:native-io-jni:linkRelease").outputs.files.filter { f -> f.isFile()} ) {
- into "/lib"
- }
-}
+/**
+ * Support for bookie entry logs using Direct IO.
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferTest.java
new file mode 100644
index 0000000000..2a48157523
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferTest.java
@@ -0,0 +1,213 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.directentrylogger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+// CHECKSTYLE.OFF: IllegalImport
+import io.netty.util.internal.PlatformDependent;
+// CHECKSTYLE.ON: IllegalImport
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.common.util.nativeio.NativeIOImpl;
+import org.junit.Test;
+
+/**
+ * TestBuffer.
+ */
+public class BufferTest {
+
+ @Test
+ public void testIsAligned() throws Exception {
+ assertFalse(Buffer.isAligned(1234));
+ assertTrue(Buffer.isAligned(4096));
+ assertTrue(Buffer.isAligned(40960));
+ assertTrue(Buffer.isAligned(1 << 20));
+ assertFalse(Buffer.isAligned(-1));
+ assertFalse(Buffer.isAligned(Integer.MAX_VALUE));
+ assertFalse(Buffer.isAligned(Integer.MIN_VALUE));
+ }
+
+ @Test
+ public void testNextAlignment() throws Exception {
+ assertEquals(0, Buffer.nextAlignment(0));
+ assertEquals(4096, Buffer.nextAlignment(1));
+ assertEquals(4096, Buffer.nextAlignment(4096));
+ assertEquals(8192, Buffer.nextAlignment(4097));
+ assertEquals(0x7FFFF000, Buffer.nextAlignment(0x7FFFF000));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNegativePosition() throws Exception {
+ Buffer.nextAlignment(-1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMaxAlignment() throws Exception {
+ Buffer.nextAlignment(Integer.MAX_VALUE);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCreateUnaligned() throws Exception {
+ new Buffer(new NativeIOImpl(), 1234);
+ }
+
+ @Test
+ public void testWriteInt() throws Exception {
+ int bufferSize = 1 << 20;
+ Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
+ assertTrue(b.hasSpace(bufferSize));
+ assertEquals(0, b.position());
+ b.writeInt(0xdeadbeef);
+
+
+ assertEquals((byte) 0xde, PlatformDependent.getByte(b.pointer() + 0));
+ assertEquals((byte) 0xad, PlatformDependent.getByte(b.pointer() + 1));
+ assertEquals((byte) 0xbe, PlatformDependent.getByte(b.pointer() + 2));
+ assertEquals((byte) 0xef, PlatformDependent.getByte(b.pointer() + 3));
+
+ assertFalse(b.hasSpace(bufferSize));
+ assertEquals(Integer.BYTES, b.position());
+
+ for (int i = 0; i < 10000; i++) {
+ b.writeInt(i);
+ }
+ assertEquals(Integer.BYTES * 10001, b.position());
+ assertTrue(b.hasSpace(bufferSize - (Integer.BYTES * 10001)));
+ assertFalse(b.hasSpace(bufferSize - (Integer.BYTES * 10000)));
+
+ assertEquals(0xdeadbeef, b.readInt(0));
+ for (int i = 0; i < 10000; i++) {
+ assertEquals(i, b.readInt((i + 1) * Integer.BYTES));
+ }
+ b.reset();
+ assertTrue(b.hasSpace(bufferSize));
+ assertEquals(0, b.position());
+ }
+
+ @Test
+ public void testWriteBuffer() throws Exception {
+ ByteBuf bb = Unpooled.buffer(1021);
+ fillByteBuf(bb, 0xdeadbeef);
+ int bufferSize = 1 << 20;
+ Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
+ assertEquals(0, b.position());
+ b.writeByteBuf(bb);
+ assertEquals(1021, b.position());
+ assertEquals(0, bb.readableBytes());
+ bb.clear();
+ fillByteBuf(bb, 0xcafecafe);
+ b.writeByteBuf(bb);
+ assertEquals(0, bb.readableBytes());
+ assertEquals(2042, b.position());
+
+ bb = Unpooled.buffer(2042);
+ int ret = b.readByteBuf(bb, 0, 2042);
+ assertEquals(2042, ret);
+ for (int i = 0; i < 1020 / Integer.BYTES; i++) {
+ assertEquals(0xdeadbeef, bb.readInt());
+ }
+ assertEquals((byte) 0xde, bb.readByte());
+ for (int i = 0; i < 1020 / Integer.BYTES; i++) {
+ assertEquals(0xcafecafe, bb.readInt());
+ }
+ }
+
+ @Test
+ public void testPartialRead() throws Exception {
+ ByteBuf bb = Unpooled.buffer(5000);
+
+ Buffer b = new Buffer(new NativeIOImpl(), 4096);
+ for (int i = 0; i < 4096 / Integer.BYTES; i++) {
+ b.writeInt(0xdeadbeef);
+ }
+
+ int ret = b.readByteBuf(bb, 0, 5000);
+ assertEquals(4096, ret);
+ }
+
+ @Test(expected = IOException.class)
+ public void testReadIntAtBoundary() throws Exception {
+ Buffer b = new Buffer(new NativeIOImpl(), 4096);
+
+ for (int i = 0; i < 4096 / Integer.BYTES; i++) {
+ b.writeInt(0xdeadbeef);
+ }
+ assertTrue(b.hasData(4092, Integer.BYTES));
+ assertFalse(b.hasData(4093, Integer.BYTES));
+ assertFalse(b.hasData(4096, Integer.BYTES));
+
+ b.readInt(4096 - 2);
+ }
+
+ @Test(expected = IOException.class)
+ public void testReadLongAtBoundary() throws Exception {
+ Buffer b = new Buffer(new NativeIOImpl(), 4096);
+
+ for (int i = 0; i < 4096 / Integer.BYTES; i++) {
+ b.writeInt(0xdeadbeef);
+ }
+ assertTrue(b.hasData(4088, Long.BYTES));
+ assertFalse(b.hasData(4089, Long.BYTES));
+ assertFalse(b.hasData(4096, Long.BYTES));
+
+ b.readInt(4096 - 2);
+ }
+
+ @Test
+ public void testPadToAlignment() throws Exception {
+ Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
+
+ for (int i = 0; i < 1025; i++) {
+ b.writeInt(0xdededede);
+ }
+ int writtenLength = b.padToAlignment();
+
+ assertEquals(8192, writtenLength);
+ assertEquals(0xdededede, b.readInt(1024 * Integer.BYTES));
+ for (int i = 1025 * Integer.BYTES; i < writtenLength; i += Integer.BYTES) {
+ assertEquals(0xf0f0f0f0, b.readInt(i));
+ }
+ assertEquals(0, b.readInt(writtenLength));
+ }
+
+ @Test
+ public void testFree() throws Exception {
+ Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
+ b.free(); // success if process doesn't explode
+ b.free();
+ }
+
+ static void fillByteBuf(ByteBuf bb, int value) {
+ while (bb.writableBytes() >= Integer.BYTES) {
+ bb.writeInt(value);
+ }
+ for (int i = 0; i < Integer.BYTES && bb.writableBytes() > 0; i++) {
+ byte b = (byte) (value >> (Integer.BYTES - i - 1) * 8);
+ bb.writeByte(b);
+ }
+ }
+}
diff --git a/build.gradle b/build.gradle
index 4fb440c447..f6ff2a4719 100644
--- a/build.gradle
+++ b/build.gradle
@@ -85,6 +85,7 @@ allprojects {
apply from: "$rootDir/dependencies.gradle"
if (it.path != ':circe-checksum:src:main:circe'
&& it.path != ':cpu-affinity:src:main:affinity'
+ && it.path != ':native-io:src:main:native-io-jni'
&& it.name != 'src'
&& it.name != 'main') {
apply plugin: 'java'
diff --git a/native-io/build.gradle b/native-io/build.gradle
index b680940c80..3d16ec8b8d 100644
--- a/native-io/build.gradle
+++ b/native-io/build.gradle
@@ -24,7 +24,6 @@ dependencies {
compileOnly depLibs.lombok
compileOnly depLibs.spotbugsAnnotations
implementation depLibs.commonsLang3
- implementation depLibs.guava
implementation depLibs.slf4j
testImplementation depLibs.junit
diff --git a/native-io/pom.xml b/native-io/pom.xml
index 8ba378dd0b..693bfd9af8 100644
--- a/native-io/pom.xml
+++ b/native-io/pom.xml
@@ -33,10 +33,6 @@
</properties>
<dependencies>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
diff --git a/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java b/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java
index d3c83f118f..339dfeda1e 100644
--- a/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java
+++ b/native-io/src/main/java/org/apache/bookkeeper/common/util/nativeio/NativeUtils.java
@@ -18,8 +18,6 @@
*/
package org.apache.bookkeeper.common.util.nativeio;
-import static com.google.common.base.Preconditions.checkArgument;
-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
@@ -29,6 +27,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
+import lombok.NonNull;
import lombok.experimental.UtilityClass;
/**
@@ -80,4 +79,9 @@ class NativeUtils {
System.load(temp.getAbsolutePath());
}
-}
+
+ public static void checkArgument(boolean expression, @NonNull Object errorMessage) {
+ if (!expression) {
+ throw new IllegalArgumentException(String.valueOf(errorMessage));
+ }
+ }}
diff --git a/settings.gradle b/settings.gradle
index a9114e1280..7c40cc85ee 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -60,6 +60,8 @@ include(':bookkeeper-benchmark',
'dev:release',
'microbenchmarks',
'metadata-drivers:etcd',
+ 'native-io',
+ 'native-io:src:main:native-io-jni',
'shaded:distributedlog-core-shaded',
'stats:bookkeeper-stats-api',
'stats:bookkeeper-stats-providers:prometheus-metrics-provider',