You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/23 15:00:38 UTC
[activemq-artemis] branch master updated: ARTEMIS-1811 NIO Seq File
should use RandomAccessFile with heap buffers
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new f51c799 ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers
new 5bee113 This closes #2844
f51c799 is described below
commit f51c799ac036f948bb59cd084bdd6f4f5fd51e27
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Mon Sep 3 13:51:23 2018 +0200
ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers
It uses RandomAccessFile to allow the use of heap buffers without the
additional copies and/or leaks of direct buffers performed by the JDK's
FileChannel implementation (see https://bugs.openjdk.java.net/browse/JDK-8147468)
---
.../artemis/core/io/nio/NIOSequentialFile.java | 93 ++++++++++++--
.../NIONonBufferedSequentialFileFactoryTest.java | 133 +++++++++++++++++++++
2 files changed, 217 insertions(+), 9 deletions(-)
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 4202a21..e5857ba 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.io.nio;
import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Executor;
@@ -42,8 +42,20 @@ import org.apache.activemq.artemis.utils.Env;
public class NIOSequentialFile extends AbstractSequentialFile {
+ /* This value has been tuned just to reduce the memory footprint
+ of read/write of the whole file size: given that this value
+ is > 8192, RandomAccessFile JNI code will use malloc/free instead
+ of using a copy on the stack, but it has been proven to NOT be
+ a bottleneck.
+
+ Instead of reading the whole content in a single operation, this will read in smaller chunks.
+ */
+ private static final int CHUNK_SIZE = 2 * 1024 * 1024;
+
private FileChannel channel;
+ private RandomAccessFile rfile;
+
private final int maxIO;
public NIOSequentialFile(final SequentialFileFactory factory,
@@ -82,7 +94,9 @@ public class NIOSequentialFile extends AbstractSequentialFile {
@Override
public void open(final int maxIO, final boolean useExecutor) throws IOException {
try {
- channel = FileChannel.open(getFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
+ rfile = new RandomAccessFile(getFile(), "rw");
+
+ channel = rfile.getChannel();
fileSize = channel.size();
} catch (ClosedChannelException e) {
@@ -139,18 +153,27 @@ public class NIOSequentialFile extends AbstractSequentialFile {
super.close();
try {
- if (channel != null) {
- if (waitSync && factory.isDatasync())
- channel.force(false);
- channel.close();
+ try {
+ if (channel != null) {
+ if (waitSync && factory.isDatasync())
+ channel.force(false);
+ channel.close();
+ }
+ } finally {
+ if (rfile != null) {
+ rfile.close();
+ }
}
} catch (ClosedChannelException e) {
throw e;
} catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
+ } finally {
+ channel = null;
+ rfile = null;
}
- channel = null;
+
notifyAll();
}
@@ -160,6 +183,37 @@ public class NIOSequentialFile extends AbstractSequentialFile {
return read(bytes, null);
}
+
+ private static int readRafInChunks(RandomAccessFile raf, byte[] b, int off, int len) throws IOException {
+ int remaining = len;
+ int offset = off;
+ while (remaining > 0) {
+ final int chunkSize = Math.min(CHUNK_SIZE, remaining);
+ final int read = raf.read(b, offset, chunkSize);
+ assert read != 0;
+ if (read == -1) {
+ if (len == remaining) {
+ return -1;
+ }
+ break;
+ }
+ offset += read;
+ remaining -= read;
+ }
+ return len - remaining;
+ }
+
+ private static void writeRafInChunks(RandomAccessFile raf, byte[] b, int off, int len) throws IOException {
+ int remaining = len;
+ int offset = off;
+ while (remaining > 0) {
+ final int chunkSize = Math.min(CHUNK_SIZE, remaining);
+ raf.write(b, offset, chunkSize);
+ offset += chunkSize;
+ remaining -= chunkSize;
+ }
+ }
+
@Override
public synchronized int read(final ByteBuffer bytes,
final IOCallback callback) throws IOException, ActiveMQIllegalStateException {
@@ -167,7 +221,19 @@ public class NIOSequentialFile extends AbstractSequentialFile {
if (channel == null) {
throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel");
}
- int bytesRead = channel.read(bytes);
+ final int bytesRead;
+ if (bytes.hasArray()) {
+ if (bytes.remaining() > CHUNK_SIZE) {
+ bytesRead = readRafInChunks(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+ } else {
+ bytesRead = rfile.read(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+ }
+ if (bytesRead > 0) {
+ bytes.position(bytes.position() + bytesRead);
+ }
+ } else {
+ bytesRead = channel.read(bytes);
+ }
if (callback != null) {
callback.done();
@@ -310,7 +376,16 @@ public class NIOSequentialFile extends AbstractSequentialFile {
final IOCallback callback,
boolean releaseBuffer) throws IOException {
try {
- channel.write(bytes);
+ if (bytes.hasArray()) {
+ if (bytes.remaining() > CHUNK_SIZE) {
+ writeRafInChunks(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+ } else {
+ rfile.write(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+ }
+ bytes.position(bytes.limit());
+ } else {
+ channel.write(bytes);
+ }
if (sync) {
sync();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java
index f793a5c..a6f1e09 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java
@@ -17,10 +17,15 @@
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
+import org.junit.Assert;
+import org.junit.Test;
public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
@@ -29,4 +34,132 @@ public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFacto
return new NIOSequentialFileFactory(new File(folder), false, 1);
}
+ @Test
+ public void writeHeapBufferNotFromBeginningAndReadWithDirectBuffer() throws Exception {
+ writeHeapBufferNotFromBeginningAndRead(false);
+ }
+
+ @Test
+ public void writeHeapBufferNotFromBeginningAndReadWithHeapBuffer() throws Exception {
+ writeHeapBufferNotFromBeginningAndRead(true);
+ }
+
+ private void writeHeapBufferNotFromBeginningAndRead(boolean useHeapByteBufferToRead) throws Exception {
+ final SequentialFile file = factory.createSequentialFile("write.amq");
+ file.open();
+ Assert.assertEquals(0, file.size());
+ Assert.assertEquals(0, file.position());
+ try {
+ final String data = "writeDirectArray";
+ final byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
+ file.position(factory.calculateBlockSize(bytes.length));
+ file.writeDirect(ByteBuffer.wrap(bytes), false);
+ final ByteBuffer readBuffer;
+ if (!useHeapByteBufferToRead) {
+ readBuffer = factory.newBuffer(bytes.length);
+ } else {
+ readBuffer = ByteBuffer.allocate(bytes.length);
+ }
+ try {
+ file.position(factory.calculateBlockSize(bytes.length));
+ Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
+ Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
+ } finally {
+ if (!useHeapByteBufferToRead) {
+ factory.releaseBuffer(readBuffer);
+ }
+ }
+ } finally {
+ file.close();
+ file.delete();
+ }
+ }
+
+ @Test
+ public void writeHeapBufferAndReadWithDirectBuffer() throws Exception {
+ writeHeapBufferAndRead(false);
+ }
+
+ @Test
+ public void writeHeapBufferAndReadWithHeapBuffer() throws Exception {
+ writeHeapBufferAndRead(true);
+ }
+
+ private void writeHeapBufferAndRead(boolean useHeapByteBufferToRead) throws Exception {
+ final SequentialFile file = factory.createSequentialFile("write.amq");
+ file.open();
+ Assert.assertEquals(0, file.size());
+ Assert.assertEquals(0, file.position());
+ try {
+ final String data = "writeDirectArray";
+ final byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
+ file.writeDirect(ByteBuffer.wrap(bytes), false);
+ final ByteBuffer readBuffer;
+ if (!useHeapByteBufferToRead) {
+ readBuffer = factory.newBuffer(bytes.length);
+ } else {
+ readBuffer = ByteBuffer.allocate(bytes.length);
+ }
+ try {
+ file.position(0);
+ Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
+ Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
+ } finally {
+ if (!useHeapByteBufferToRead) {
+ factory.releaseBuffer(readBuffer);
+ }
+ }
+ } finally {
+ file.close();
+ file.delete();
+ }
+ }
+
+ @Test
+ public void writeHeapAndDirectBufferAndReadWithDirectBuffer() throws Exception {
+ writeHeapAndDirectBufferAndRead(false);
+ }
+
+ @Test
+ public void writeHeapAndDirectBufferAndReadWithHeapBuffer() throws Exception {
+ writeHeapAndDirectBufferAndRead(true);
+ }
+
+ private void writeHeapAndDirectBufferAndRead(boolean useHeapByteBufferToRead) throws Exception {
+ final SequentialFile file = factory.createSequentialFile("write.amq");
+ file.open();
+ Assert.assertEquals(0, file.size());
+ Assert.assertEquals(0, file.position());
+ try {
+ final String data = "writeDirectArray";
+ final byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
+ file.writeDirect(ByteBuffer.wrap(bytes), false);
+ final ByteBuffer byteBuffer = factory.newBuffer(bytes.length);
+ byteBuffer.put(bytes);
+ byteBuffer.flip();
+ file.writeDirect(byteBuffer, false);
+ final ByteBuffer readBuffer;
+ if (!useHeapByteBufferToRead) {
+ readBuffer = factory.newBuffer(bytes.length);
+ } else {
+ readBuffer = ByteBuffer.allocate(bytes.length);
+ }
+ try {
+ file.position(0);
+ Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
+ Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
+ readBuffer.flip();
+ Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
+ Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
+ } finally {
+ if (!useHeapByteBufferToRead) {
+ factory.releaseBuffer(readBuffer);
+ }
+ }
+ } finally {
+ file.close();
+ file.delete();
+ }
+ }
+
}