You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/17 07:17:17 UTC

[iotdb] 01/01: change some logic in OSFileChannel

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch tiered_storage_0517
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9a08773bf5d080a90fffe20e5c575f21963dded1
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed May 17 15:16:40 2023 +0800

    change some logic in OSFileChannel
---
 .../apache/iotdb/os/cache/CacheFileManager.java    |  4 +-
 .../apache/iotdb/os/cache/CacheInputStream.java    |  4 +-
 .../apache/iotdb/os/cache/CacheRecoverTask.java    |  3 +-
 .../apache/iotdb/os/cache/OSFileCacheValue.java    | 50 +++++++++++++++----
 .../{CacheFileChannel.java => OSFileChannel.java}  | 56 ++++++++++------------
 .../apache/iotdb/os/fileSystem/OSTsFileInput.java  |  8 ++--
 6 files changed, 77 insertions(+), 48 deletions(-)

diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index cb88c0771c..5c74076fce 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -75,7 +75,9 @@ public class CacheFileManager {
       ByteBuffer meta = key.serialize();
       channel.write(meta);
       channel.write(ByteBuffer.wrap(data));
-      res = new OSFileCacheValue(tmpCacheFile, 0, meta.capacity(), data.length);
+      res =
+          new OSFileCacheValue(
+              tmpCacheFile, 0, key.getStartPosition(), meta.capacity(), data.length);
     } catch (IOException e) {
       logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
       tmpCacheFile.delete();
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
index 7fcdec4bf8..2701a6d6cd 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
@@ -23,9 +23,9 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 public class CacheInputStream extends InputStream {
-  private final CacheFileChannel channel;
+  private final OSFileChannel channel;
 
-  public CacheInputStream(CacheFileChannel channel) {
+  public CacheInputStream(OSFileChannel channel) {
     super();
     this.channel = channel;
   }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
index bfebeaadf1..c64a002bd5 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
@@ -70,7 +70,8 @@ public class CacheRecoverTask implements Runnable {
               cacheFile.delete();
               continue;
             }
-            OSFileCacheValue value = new OSFileCacheValue(cacheFile, 0, metaSize, dataSize);
+            OSFileCacheValue value =
+                new OSFileCacheValue(cacheFile, 0, key.getStartPosition(), metaSize, dataSize);
             cache.put(key, value);
           }
           // update max cache file id
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index 85295506c5..65696cd973 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -19,15 +19,25 @@
 
 package org.apache.iotdb.os.cache;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
 
 public class OSFileCacheValue {
+  private static final Logger logger = LoggerFactory.getLogger(OSFileCacheValue.class);
   /** local cache file */
   private File cacheFile;
   // 如果每个块用一个文件来存储,则该值一直为 0
   // 如果使用一个大文件存储所有块,则该值为大文件中的起点
   /** start position in the local cache file */
-  private long startPosition;
+  private long startPositionInCacheFile;
+
+  private long startPositionOfOSFile;
   /** cache data size */
   private int dataSize;
   /** cache key size */
@@ -36,9 +46,13 @@ public class OSFileCacheValue {
   private boolean shouldDelete;
   private int readCnt;
 
-  public OSFileCacheValue(File cacheFile, long startPosition, int metaSize, int dataSize) {
+  private FileChannel cacheFileChannel;
+
+  public OSFileCacheValue(
+      File cacheFile, long startPosition, long startPositionOfOSFile, int metaSize, int dataSize) {
     this.cacheFile = cacheFile;
-    this.startPosition = startPosition;
+    this.startPositionInCacheFile = startPosition;
+    this.startPositionOfOSFile = startPositionOfOSFile;
     this.metaSize = metaSize;
     this.dataSize = dataSize;
   }
@@ -47,8 +61,8 @@ public class OSFileCacheValue {
     return cacheFile;
   }
 
-  public long getStartPosition() {
-    return startPosition;
+  public long getStartPositionInCacheFile() {
+    return startPositionInCacheFile;
   }
 
   public int getMetaSize() {
@@ -65,6 +79,15 @@ public class OSFileCacheValue {
     return metaSize + dataSize;
   }
 
+  public boolean containsPosition(long position) {
+    return startPositionInCacheFile <= position && position < startPositionInCacheFile + dataSize;
+  }
+
+  public int read(ByteBuffer dst, long startPosition) throws IOException {
+    long startPosInCacheFile = metaSize + (startPosition - this.startPositionOfOSFile);
+    return cacheFileChannel.read(dst, startPosInCacheFile);
+  }
+
   /** Mark this value should be deleted, delete this value when no one is reading it. */
   public synchronized void setShouldDelete() {
     this.shouldDelete = true;
@@ -77,11 +100,14 @@ public class OSFileCacheValue {
    * Try to get the read lock, return false when this cache value should be deleted or has been
    * deleted.
    */
-  public synchronized boolean tryReadLock() {
+  public synchronized boolean tryReadLock() throws IOException {
     if (shouldDelete || !cacheFile.exists()) {
       return false;
     } else {
       this.readCnt++;
+      if (!cacheFileChannel.isOpen()) {
+        cacheFileChannel = FileChannel.open(cacheFile.toPath(), StandardOpenOption.READ);
+      }
       return true;
     }
   }
@@ -90,10 +116,18 @@ public class OSFileCacheValue {
    * Release the read lock, delete the cache value when no one else is reading it and this cache
    * value should be deleted.
    */
-  public synchronized void readUnlock() {
+  public synchronized void readUnlock() throws IOException {
     this.readCnt--;
+    // delete the cache file only when no reference is used
     if (shouldDelete && readCnt == 0) {
-      cacheFile.delete();
+      boolean success = cacheFile.delete();
+      if (!success) {
+        logger.error("[OSFileCache] cannot delete cache file {}", cacheFile);
+      }
+    }
+    // close the file channel if no reference is used
+    if (readCnt == 0) {
+      cacheFileChannel.close();
     }
   }
 }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
similarity index 67%
rename from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
rename to object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index f094479037..5bc9bbe4eb 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -30,45 +30,37 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
 
-public class CacheFileChannel implements Closeable {
+public class OSFileChannel implements Closeable {
   private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
   private static final ObjectStorageConfig config =
       ObjectStorageDescriptor.getInstance().getConfig();
   private static final OSFileCache cache = OSFileCache.getInstance();
   private final OSFile osFile;
   private long position = 0;
-  private OSFileCacheValue currentCacheFile;
-  private FileChannel cacheFileChannel;
-  private long startPositionInTsFile = position;
-  private long endPositionInTsFile = position + config.getCachePageSize();
+  private OSFileCacheValue currentOSFileBlock;
 
-  public CacheFileChannel(OSFile osFile) {
+  public OSFileChannel(OSFile osFile) {
     this.osFile = osFile;
   }
 
-  public static InputStream newInputStream(CacheFileChannel channel) {
+  public static InputStream newInputStream(OSFileChannel channel) {
     return new CacheInputStream(channel);
   }
 
-  private boolean isPositionValid() {
-    return startPositionInTsFile <= position && position < endPositionInTsFile;
+  private boolean canReadFromCurrentCacheBlock(long startPos) {
+    return currentOSFileBlock != null && currentOSFileBlock.containsPosition(startPos);
   }
 
   private void openNextCacheFile() throws IOException {
     // close prev cache file
-    close();
+    releaseCurrentOSFileBlock();
     // open next cache file
     OSFileCacheKey key = locateCacheFileFromPosition();
-    while (!currentCacheFile.tryReadLock()) {
-      currentCacheFile = cache.get(key);
+    // Is the while loop here to handle the value being evicted right after cache.get(), which deletes its file?
+    while (currentOSFileBlock == null || !currentOSFileBlock.tryReadLock()) {
+      currentOSFileBlock = cache.get(key);
     }
-    cacheFileChannel =
-        FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
-    startPositionInTsFile = currentCacheFile.getStartPosition();
-    endPositionInTsFile = startPositionInTsFile + currentCacheFile.getLength();
   }
 
   private OSFileCacheKey locateCacheFileFromPosition() {
@@ -108,17 +100,15 @@ public class CacheFileChannel implements Closeable {
     // read each cache file
     int totalReadBytes = 0;
     while (startPos < endPos) {
-      if (!isPositionValid()) {
+      if (!canReadFromCurrentCacheBlock(startPos)) {
         openNextCacheFile();
       }
-      long readStartPosition = currentCacheFile.getMetaSize() + (startPos - startPositionInTsFile);
-      long readEndPosition =
-          currentCacheFile.getMetaSize()
-              + (Math.min(endPos, endPositionInTsFile) - startPositionInTsFile);
-      int readSize = (int) (readEndPosition - readStartPosition);
-      dst.limit(dst.position() + readSize);
-      int read = cacheFileChannel.read(dst, readStartPosition);
-      if (read != readSize) {
+
+      int maxReadSize = (int) Math.min(endPos - startPos, currentOSFileBlock.getDataSize());
+      dst.limit(dst.position() + maxReadSize);
+
+      int read = currentOSFileBlock.read(dst, startPos);
+      if (read != maxReadSize) {
         throw new IOException(
             String.format(
                 "Cache file %s may crash because cannot read enough information in the cash file.",
@@ -131,12 +121,14 @@ public class CacheFileChannel implements Closeable {
     return totalReadBytes;
   }
 
+  public void releaseCurrentOSFileBlock() throws IOException {
+    if (currentOSFileBlock != null) {
+      currentOSFileBlock.readUnlock();
+    }
+  }
+
   @Override
   public void close() throws IOException {
-    try {
-      cacheFileChannel.close();
-    } finally {
-      currentCacheFile.readUnlock();
-    }
+    releaseCurrentOSFileBlock();
   }
 }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
index 07a85e75fe..0dcd600285 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.os.fileSystem;
 
-import org.apache.iotdb.os.cache.CacheFileChannel;
+import org.apache.iotdb.os.cache.OSFileChannel;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
 import java.io.IOException;
@@ -27,11 +27,11 @@ import java.nio.ByteBuffer;
 
 public class OSTsFileInput implements TsFileInput {
   private OSFile file;
-  private CacheFileChannel channel;
+  private OSFileChannel channel;
 
   public OSTsFileInput(OSFile file) {
     this.file = file;
-    this.channel = new CacheFileChannel(file);
+    this.channel = new OSFileChannel(file);
   }
 
   @Override
@@ -62,7 +62,7 @@ public class OSTsFileInput implements TsFileInput {
 
   @Override
   public InputStream wrapAsInputStream() throws IOException {
-    return CacheFileChannel.newInputStream(channel);
+    return OSFileChannel.newInputStream(channel);
   }
 
   @Override