You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2019/04/24 13:46:35 UTC

[GitHub] [activemq-artemis] franz1981 commented on a change in pull request #2633: ARTEMIS-2317 Avoid long TTSP caused by Page::read using mmap read

franz1981 commented on a change in pull request #2633: ARTEMIS-2317 Avoid long TTSP caused by Page::read using mmap read
URL: https://github.com/apache/activemq-artemis/pull/2633#discussion_r278133162
 
 

 ##########
 File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
 ##########
 @@ -120,105 +110,121 @@ public void setLiveCache(LivePageCache pageCache) {
          throw ActiveMQMessageBundle.BUNDLE.invalidPageIO();
       }
 
-      final List<PagedMessage> messages = new ArrayList<>();
-
       size.lazySet((int) file.size());
 
-      if (this.canBeMapped) {
-         readFromMapped(storage, messages);
-         // if the file is open to be written
-         // it needs to updated the position
-         file.position(file.size());
-      } else {
-         readFromSequentialFile(storage, messages);
-      }
+      final List<PagedMessage> messages = readFromSequentialFile(storage);
 
       numberOfMessages.lazySet(messages.size());
 
       return messages;
    }
 
-   private void readFromSequentialFile(StorageManager storage, List<PagedMessage> messages) throws Exception {
-      final int fileSize = (int) file.size();
-      //doesn't need to be a direct buffer: that case is covered using the MMAP read
-      final ByteBuffer buffer = this.fileFactory.newBuffer(fileSize);
-      try {
-         file.position(0);
-         file.read(buffer);
-         buffer.rewind();
-         assert (buffer.limit() == fileSize) : "buffer doesn't contains the whole file";
-         ChannelBufferWrapper activeMQBuffer = wrapBuffer(fileSize, buffer);
-         read(storage, activeMQBuffer, messages);
-      } finally {
-         this.fileFactory.releaseBuffer(buffer);
-      }
+   private static void decodeInto(ByteBuffer fileBuffer, int encodedSize, PagedMessageImpl msg) {
+      final ActiveMQBuffer wrappedBuffer = ActiveMQBuffers.wrappedBuffer(fileBuffer);
+      wrappedBuffer.writerIndex(encodedSize);
+      msg.decode(wrappedBuffer);
    }
 
-   private ChannelBufferWrapper wrapBuffer(int fileSize, ByteBuffer buffer) {
-      ChannelBufferWrapper activeMQBuffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
-      return activeMQBuffer;
+   private ByteBuffer allocateAndReadIntoFileBuffer(SequentialFile file,
+                                                    ByteBuffer fileBuffer,
+                                                    int requiredBytes) throws Exception {
+      final ByteBuffer newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, Env.osPageSize()));
+      newFileBuffer.put(fileBuffer);
+      fileFactory.releaseBuffer(fileBuffer);
+      fileBuffer = newFileBuffer;
+      //move the limit to allow reading as much as possible from the file
+      fileBuffer.limit(fileBuffer.capacity());
+      file.read(fileBuffer);
+      fileBuffer.position(0);
+      return fileBuffer;
    }
 
-   private static MappedByteBuffer mapFileForRead(File file, int fileSize) {
-      try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
-         return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
-      } catch (Exception e) {
-         throw new IllegalStateException(e);
+   private static void compactAndReadIntoFileBuffer(SequentialFile file, ByteBuffer fileBuffer) throws Exception {
+      if (fileBuffer.remaining() > 0) {
+         fileBuffer.compact();
       }
+      fileBuffer.limit(fileBuffer.capacity());
+      file.read(fileBuffer);
+      fileBuffer.position(0);
    }
 
-   private int readFromMapped(StorageManager storage, List<PagedMessage> messages) throws IOException {
-      file.position(0);
-      //use a readonly mapped view of the file
-      final int mappedSize = size.get();
-      final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), mappedSize);
-      ChannelBufferWrapper activeMQBuffer = wrapBuffer(mappedSize, mappedByteBuffer);
-      try {
-         return read(storage, activeMQBuffer, messages);
-      } finally {
-         //unmap the file after read it to avoid GC to take care of it
-         PlatformDependent.freeDirectBuffer(mappedByteBuffer);
+   private ByteBuffer readIntoFileBufferIfNecessary(SequentialFile file,
+                                                    ByteBuffer fileBuffer,
+                                                    int requiredBytes) throws Exception {
+      final int remaining = fileBuffer.remaining();
+      if (remaining < requiredBytes) {
+         final int capacity = fileBuffer.capacity();
+         if (capacity >= requiredBytes) {
+            final int bytesToBeRead = requiredBytes - remaining;
+            final int availableCapacity = capacity - fileBuffer.limit();
+            if (availableCapacity >= bytesToBeRead) {
+               final int originalPosition = fileBuffer.position();
+               fileBuffer.limit(capacity);
+               file.read(fileBuffer);
+               fileBuffer.position(originalPosition);
+            } else {
+               compactAndReadIntoFileBuffer(file, fileBuffer);
+            }
+         } else {
+            fileBuffer = allocateAndReadIntoFileBuffer(file, fileBuffer, requiredBytes);
+         }
       }
+      return fileBuffer;
    }
 
-   private int read(StorageManager storage, ActiveMQBuffer fileBuffer, List<PagedMessage> messages) {
-      int readMessages = 0;
-      while (fileBuffer.readable()) {
-         final int position = fileBuffer.readerIndex();
-
-         byte byteRead = fileBuffer.readByte();
-
-         if (byteRead == Page.START_BYTE) {
-            if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity()) {
-               int messageSize = fileBuffer.readInt();
-               int oldPos = fileBuffer.readerIndex();
-               if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) {
-                  PagedMessage msg = new PagedMessageImpl(storageManager);
-                  msg.decode(fileBuffer);
-                  byte b = fileBuffer.readByte();
-                  if (b != Page.END_BYTE) {
-                     // Sanity Check: This would only happen if there is a bug on decode or any internal code, as
-                     // this
-                     // constraint was already checked
-                     throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
-                  }
+   private List<PagedMessage> readFromSequentialFile(StorageManager storage) throws Exception {
+      final List<PagedMessage> messages = new ArrayList<>();
+      final int fileSize = (int) file.size();
+      file.position(0);
+      final int minimumMessagePersistentSize = DataConstants.SIZE_INT + 2;
+      int position = 0;
+      ByteBuffer fileBuffer = null;
+      try {
+         while ((fileSize - position) >= minimumMessagePersistentSize) {
 
 Review comment:
   But I'm checking START_BYTE as before, the logic on the single message hasnìt changed at all.
   In the new logic I'm moving a window of ByteBuffer on the file regions in order to be able to reduce I/O and buffer copies/allocations as much as possible.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services