You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/23 15:00:38 UTC

[activemq-artemis] branch master updated: ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new f51c799  ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers
     new 5bee113  This closes #2844
f51c799 is described below

commit f51c799ac036f948bb59cd084bdd6f4f5fd51e27
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Mon Sep 3 13:51:23 2018 +0200

    ARTEMIS-1811 NIO Seq File should use RandomAccessFile with heap buffers
    
    It use RandomAccessFile to allow using heap buffers without additional
    copies and/or leaks of direct buffers, as performed by FileChannel JDK
    implementation (see https://bugs.openjdk.java.net/browse/JDK-8147468)
---
 .../artemis/core/io/nio/NIOSequentialFile.java     |  93 ++++++++++++--
 .../NIONonBufferedSequentialFileFactoryTest.java   | 133 +++++++++++++++++++++
 2 files changed, 217 insertions(+), 9 deletions(-)

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 4202a21..e5857ba 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.io.nio;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -42,8 +42,20 @@ import org.apache.activemq.artemis.utils.Env;
 
 public class NIOSequentialFile extends AbstractSequentialFile {
 
+   /* This value has been tuned just to reduce the memory footprint
+      of read/write of the whole file size: given that this value
+      is > 8192, RandomAccessFile JNI code will use malloc/free instead
+      of using a copy on the stack, but it has been proven to NOT be
+      a bottleneck.
+
+      Instead of reading the whole content in a single operation, this will read in smaller chunks.
+    */
+   private static final int CHUNK_SIZE = 2 * 1024 * 1024;
+
    private FileChannel channel;
 
+   private RandomAccessFile rfile;
+
    private final int maxIO;
 
    public NIOSequentialFile(final SequentialFileFactory factory,
@@ -82,7 +94,9 @@ public class NIOSequentialFile extends AbstractSequentialFile {
    @Override
    public void open(final int maxIO, final boolean useExecutor) throws IOException {
       try {
-         channel = FileChannel.open(getFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
+         rfile = new RandomAccessFile(getFile(), "rw");
+
+         channel = rfile.getChannel();
 
          fileSize = channel.size();
       } catch (ClosedChannelException e) {
@@ -139,18 +153,27 @@ public class NIOSequentialFile extends AbstractSequentialFile {
       super.close();
 
       try {
-         if (channel != null) {
-            if (waitSync && factory.isDatasync())
-               channel.force(false);
-            channel.close();
+         try {
+            if (channel != null) {
+               if (waitSync && factory.isDatasync())
+                  channel.force(false);
+               channel.close();
+            }
+         } finally {
+            if (rfile != null) {
+               rfile.close();
+            }
          }
       } catch (ClosedChannelException e) {
          throw e;
       } catch (IOException e) {
          factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
          throw e;
+      } finally {
+         channel = null;
+         rfile = null;
       }
-      channel = null;
+
 
       notifyAll();
    }
@@ -160,6 +183,37 @@ public class NIOSequentialFile extends AbstractSequentialFile {
       return read(bytes, null);
    }
 
+
+   private static int readRafInChunks(RandomAccessFile raf, byte[] b, int off, int len) throws IOException {
+      int remaining = len;
+      int offset = off;
+      while (remaining > 0) {
+         final int chunkSize = Math.min(CHUNK_SIZE, remaining);
+         final int read = raf.read(b, offset, chunkSize);
+         assert read != 0;
+         if (read == -1) {
+            if (len == remaining) {
+               return -1;
+            }
+            break;
+         }
+         offset += read;
+         remaining -= read;
+      }
+      return len - remaining;
+   }
+
+   private static void writeRafInChunks(RandomAccessFile raf, byte[] b, int off, int len) throws IOException {
+      int remaining = len;
+      int offset = off;
+      while (remaining > 0) {
+         final int chunkSize = Math.min(CHUNK_SIZE, remaining);
+         raf.write(b, offset, chunkSize);
+         offset += chunkSize;
+         remaining -= chunkSize;
+      }
+   }
+
    @Override
    public synchronized int read(final ByteBuffer bytes,
                                 final IOCallback callback) throws IOException, ActiveMQIllegalStateException {
@@ -167,7 +221,19 @@ public class NIOSequentialFile extends AbstractSequentialFile {
          if (channel == null) {
             throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel");
          }
-         int bytesRead = channel.read(bytes);
+         final int bytesRead;
+         if (bytes.hasArray()) {
+            if (bytes.remaining() > CHUNK_SIZE) {
+               bytesRead = readRafInChunks(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+            } else {
+               bytesRead = rfile.read(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+            }
+            if (bytesRead > 0) {
+               bytes.position(bytes.position() + bytesRead);
+            }
+         } else {
+            bytesRead = channel.read(bytes);
+         }
 
          if (callback != null) {
             callback.done();
@@ -310,7 +376,16 @@ public class NIOSequentialFile extends AbstractSequentialFile {
                                 final IOCallback callback,
                                 boolean releaseBuffer) throws IOException {
       try {
-         channel.write(bytes);
+         if (bytes.hasArray()) {
+            if (bytes.remaining() > CHUNK_SIZE) {
+               writeRafInChunks(rfile, bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+            } else {
+               rfile.write(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+            }
+            bytes.position(bytes.limit());
+         } else {
+            channel.write(bytes);
+         }
 
          if (sync) {
             sync();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java
index f793a5c..a6f1e09 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIONonBufferedSequentialFileFactoryTest.java
@@ -17,10 +17,15 @@
 package org.apache.activemq.artemis.tests.integration.journal;
 
 import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
+import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
+import org.junit.Assert;
+import org.junit.Test;
 
 public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
 
@@ -29,4 +34,132 @@ public class NIONonBufferedSequentialFileFactoryTest extends SequentialFileFacto
       return new NIOSequentialFileFactory(new File(folder), false, 1);
    }
 
+   @Test
+   public void writeHeapBufferNotFromBeginningAndReadWithDirectBuffer() throws Exception {
+      writeHeapBufferNotFromBeginningAndRead(false);
+   }
+
+   @Test
+   public void writeHeapBufferNotFromBeginningAndReadWithHeapBuffer() throws Exception {
+      writeHeapBufferNotFromBeginningAndRead(true);
+   }
+
+   private void writeHeapBufferNotFromBeginningAndRead(boolean useHeapByteBufferToRead) throws Exception {
+      final SequentialFile file = factory.createSequentialFile("write.amq");
+      file.open();
+      Assert.assertEquals(0, file.size());
+      Assert.assertEquals(0, file.position());
+      try {
+         final String data = "writeDirectArray";
+         final byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
+         file.position(factory.calculateBlockSize(bytes.length));
+         file.writeDirect(ByteBuffer.wrap(bytes), false);
+         final ByteBuffer readBuffer;
+         if (!useHeapByteBufferToRead) {
+            readBuffer = factory.newBuffer(bytes.length);
+         } else {
+            readBuffer = ByteBuffer.allocate(bytes.length);
+         }
+         try {
+            file.position(factory.calculateBlockSize(bytes.length));
+            Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
+            Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
+         } finally {
+            if (!useHeapByteBufferToRead) {
+               factory.releaseBuffer(readBuffer);
+            }
+         }
+      } finally {
+         file.close();
+         file.delete();
+      }
+   }
+
+   @Test
+   public void writeHeapBufferAndReadWithDirectBuffer() throws Exception {
+      writeHeapBufferAndRead(false);
+   }
+
+   @Test
+   public void writeHeapBufferAndReadWithHeapBuffer() throws Exception {
+      writeHeapBufferAndRead(true);
+   }
+
+   private void writeHeapBufferAndRead(boolean useHeapByteBufferToRead) throws Exception {
+      final SequentialFile file = factory.createSequentialFile("write.amq");
+      file.open();
+      Assert.assertEquals(0, file.size());
+      Assert.assertEquals(0, file.position());
+      try {
+         final String data = "writeDirectArray";
+         final byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
+         file.writeDirect(ByteBuffer.wrap(bytes), false);
+         final ByteBuffer readBuffer;
+         if (!useHeapByteBufferToRead) {
+            readBuffer = factory.newBuffer(bytes.length);
+         } else {
+            readBuffer = ByteBuffer.allocate(bytes.length);
+         }
+         try {
+            file.position(0);
+            Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
+            Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
+         } finally {
+            if (!useHeapByteBufferToRead) {
+               factory.releaseBuffer(readBuffer);
+            }
+         }
+      } finally {
+         file.close();
+         file.delete();
+      }
+   }
+
+   @Test
+   public void writeHeapAndDirectBufferAndReadWithDirectBuffer() throws Exception {
+      writeHeapAndDirectBufferAndRead(false);
+   }
+
+   @Test
+   public void writeHeapAndDirectBufferAndReadWithHeapBuffer() throws Exception {
+      writeHeapAndDirectBufferAndRead(true);
+   }
+
+   private void writeHeapAndDirectBufferAndRead(boolean useHeapByteBufferToRead) throws Exception {
+      final SequentialFile file = factory.createSequentialFile("write.amq");
+      file.open();
+      Assert.assertEquals(0, file.size());
+      Assert.assertEquals(0, file.position());
+      try {
+         final String data = "writeDirectArray";
+         final byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
+         file.writeDirect(ByteBuffer.wrap(bytes), false);
+         final ByteBuffer byteBuffer = factory.newBuffer(bytes.length);
+         byteBuffer.put(bytes);
+         byteBuffer.flip();
+         file.writeDirect(byteBuffer, false);
+         final ByteBuffer readBuffer;
+         if (!useHeapByteBufferToRead) {
+            readBuffer = factory.newBuffer(bytes.length);
+         } else {
+            readBuffer = ByteBuffer.allocate(bytes.length);
+         }
+         try {
+            file.position(0);
+            Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
+            Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
+            readBuffer.flip();
+            Assert.assertEquals(factory.calculateBlockSize(bytes.length), file.read(readBuffer));
+            Assert.assertEquals(data, StandardCharsets.UTF_8.decode(readBuffer).toString());
+         } finally {
+            if (!useHeapByteBufferToRead) {
+               factory.releaseBuffer(readBuffer);
+            }
+         }
+      } finally {
+         file.close();
+         file.delete();
+      }
+   }
+
 }