You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/07/21 03:17:50 UTC

[incubator-iotdb] 01/01: read file with mmap

This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch mmap_test
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 4ff46740926df265af5ed39e63ce4c11d7fda0ec
Author: suyue <23...@qq.com>
AuthorDate: Sun Jul 21 11:17:23 2019 +0800

    read file with mmap
---
 .../tsfile/read/reader/DefaultTsFileInput.java     | 85 ++++++++++++++++++++++
 1 file changed, 85 insertions(+)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
index 53c75f3..b3860a9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/DefaultTsFileInput.java
@@ -20,18 +20,29 @@ package org.apache.iotdb.tsfile.read.reader;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 
 public class DefaultTsFileInput implements TsFileInput {
 
   FileChannel channel;
+  MappedByteBuffer mappedByteBuffer;
+  boolean enableMMap = true;
 
   public DefaultTsFileInput(Path file) throws IOException {
     channel = FileChannel.open(file, StandardOpenOption.READ);
+    long size = channel.size(); // read once so the guard and the mapping agree on the length
+    if (enableMMap && size < Integer.MAX_VALUE) { // larger files stay on the plain channel path
+      mappedByteBuffer = channel.map(MapMode.READ_ONLY, 0, size);
+    }
   }
 
   @Override
@@ -41,22 +52,42 @@ public class DefaultTsFileInput implements TsFileInput {
 
   @Override
   public long position() throws IOException {
+    if (mappedByteBuffer != null) { // a mapping exists: its cursor is the position reported
+      return mappedByteBuffer.position();
+    }
     return channel.position();
   }
 
   @Override
   public TsFileInput position(long newPosition) throws IOException {
+    if (mappedByteBuffer != null) { // clamp first: an offset above int range must fail, not wrap
+      mappedByteBuffer.position((int) Math.min(newPosition, Integer.MAX_VALUE));
+    }
     channel.position(newPosition);
     return this;
   }
 
   @Override
   public int read(ByteBuffer dst) throws IOException {
+    if (mappedByteBuffer != null) { // mapped: serve from the mapping only and advance its cursor
+      int n = read(dst, mappedByteBuffer.position());
+      mappedByteBuffer.position(mappedByteBuffer.position() + Math.max(n, 0)); // n is -1 at EOF
+      return n;
+    }
     return channel.read(dst);
   }
 
   @Override
   public int read(ByteBuffer dst, long position) throws IOException {
+    if (mappedByteBuffer != null) {
+      if (position >= mappedByteBuffer.limit()) {
+        return -1; // at or past end of file, as FileChannel.read(dst, position) reports
+      }
+      ByteBuffer src = (ByteBuffer) mappedByteBuffer.duplicate().position((int) position);
+      int n = Math.min(dst.remaining(), src.remaining()); // respect dst's limit and the file end
+      dst.put((ByteBuffer) src.limit(src.position() + n));
+      return n;
+    }
     return channel.read(dst, position);
   }
 
@@ -83,10 +114,64 @@ public class DefaultTsFileInput implements TsFileInput {
   @Override
   public void close() throws IOException {
     channel.close();
+    MappedByteBuffer mapped = mappedByteBuffer;
+    mappedByteBuffer = null; // later calls must reach the closed channel, not a cleaned buffer
+    clean(mapped); // clean() returns immediately for null, so no guard is needed here
   }
 
   @Override
   public int readInt() throws IOException {
     throw new UnsupportedOperationException();
   }
+
+
+  /** Reflectively runs cleaner().clean() on the root of a non-null, direct, non-empty buffer. */
+  public static void clean(MappedByteBuffer mappedByteBuffer) {
+    ByteBuffer candidate = mappedByteBuffer;
+    if (candidate != null && candidate.isDirect() && candidate.capacity() != 0) {
+      invoke(invoke(viewed(candidate), "cleaner"), "clean");
+    }
+  }
+
+  private static Object invoke(final Object target, final String methodName,
+      final Class<?>... args) { // args: parameter types, used only to look the method up
+    return AccessController.doPrivileged(new PrivilegedAction<Object>() {
+      public Object run() {
+        try {
+          Method method = method(target, methodName, args);
+          method.setAccessible(true); // lift Java access checks before the call
+          return method.invoke(target); // always invoked with no arguments
+        } catch (Exception e) {
+          throw new IllegalStateException(e); // reflective failures surface unchecked, cause kept
+        }
+      }
+    });
+  }
+
+  private static Method method(Object target, String methodName, Class<?>[] args)
+      throws NoSuchMethodException {
+    try {
+      return target.getClass().getMethod(methodName, args); // public lookup, inherited included
+    } catch (NoSuchMethodException e) {
+      return target.getClass().getDeclaredMethod(methodName, args); // fallback: declared methods
+    }
+  }
+
+  /** Follows attachment()/viewedBuffer() links reflectively up to the buffer that has none. */
+  private static ByteBuffer viewed(ByteBuffer buffer) {
+    // Use "attachment" when the class exposes it, otherwise fall back to "viewedBuffer".
+    String accessor = "viewedBuffer";
+    for (Method candidate : buffer.getClass().getMethods()) {
+      if ("attachment".equals(candidate.getName())) {
+        accessor = "attachment";
+        break;
+      }
+    }
+    ByteBuffer parent = (ByteBuffer) invoke(buffer, accessor);
+    if (parent != null) {
+      return viewed(parent);
+    }
+    return buffer;
+  }
+
 }