You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/17 07:17:16 UTC

[iotdb] branch tiered_storage_0517 created (now 9a08773bf5)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch tiered_storage_0517
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 9a08773bf5 change some logic in OSFileChannel

This branch includes the following new commits:

     new 9a08773bf5 change some logic in OSFileChannel

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: change some logic in OSFileChannel

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch tiered_storage_0517
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9a08773bf5d080a90fffe20e5c575f21963dded1
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed May 17 15:16:40 2023 +0800

    change some logic in OSFileChannel
---
 .../apache/iotdb/os/cache/CacheFileManager.java    |  4 +-
 .../apache/iotdb/os/cache/CacheInputStream.java    |  4 +-
 .../apache/iotdb/os/cache/CacheRecoverTask.java    |  3 +-
 .../apache/iotdb/os/cache/OSFileCacheValue.java    | 50 +++++++++++++++----
 .../{CacheFileChannel.java => OSFileChannel.java}  | 56 ++++++++++------------
 .../apache/iotdb/os/fileSystem/OSTsFileInput.java  |  8 ++--
 6 files changed, 77 insertions(+), 48 deletions(-)

diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index cb88c0771c..5c74076fce 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -75,7 +75,9 @@ public class CacheFileManager {
       ByteBuffer meta = key.serialize();
       channel.write(meta);
       channel.write(ByteBuffer.wrap(data));
-      res = new OSFileCacheValue(tmpCacheFile, 0, meta.capacity(), data.length);
+      res =
+          new OSFileCacheValue(
+              tmpCacheFile, 0, key.getStartPosition(), meta.capacity(), data.length);
     } catch (IOException e) {
       logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
       tmpCacheFile.delete();
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
index 7fcdec4bf8..2701a6d6cd 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
@@ -23,9 +23,9 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 public class CacheInputStream extends InputStream {
-  private final CacheFileChannel channel;
+  private final OSFileChannel channel;
 
-  public CacheInputStream(CacheFileChannel channel) {
+  public CacheInputStream(OSFileChannel channel) {
     super();
     this.channel = channel;
   }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
index bfebeaadf1..c64a002bd5 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
@@ -70,7 +70,8 @@ public class CacheRecoverTask implements Runnable {
               cacheFile.delete();
               continue;
             }
-            OSFileCacheValue value = new OSFileCacheValue(cacheFile, 0, metaSize, dataSize);
+            OSFileCacheValue value =
+                new OSFileCacheValue(cacheFile, 0, key.getStartPosition(), metaSize, dataSize);
             cache.put(key, value);
           }
           // update max cache file id
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index 85295506c5..65696cd973 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -19,15 +19,25 @@
 
 package org.apache.iotdb.os.cache;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
 
 public class OSFileCacheValue {
+  private static final Logger logger = LoggerFactory.getLogger(OSFileCacheValue.class);
   /** local cache file */
   private File cacheFile;
   // 如果每个块用一个文件来存储,则该值一直为 0
   // 如果使用一个大文件存储所有块,则该值为大文件中的起点
   /** start position in the local cache file */
-  private long startPosition;
+  private long startPositionInCacheFile;
+
+  private long startPositionOfOSFile;
   /** cache data size */
   private int dataSize;
   /** cache key size */
@@ -36,9 +46,13 @@ public class OSFileCacheValue {
   private boolean shouldDelete;
   private int readCnt;
 
-  public OSFileCacheValue(File cacheFile, long startPosition, int metaSize, int dataSize) {
+  private FileChannel cacheFileChannel;
+
+  public OSFileCacheValue(
+      File cacheFile, long startPosition, long startPositionOfOSFile, int metaSize, int dataSize) {
     this.cacheFile = cacheFile;
-    this.startPosition = startPosition;
+    this.startPositionInCacheFile = startPosition;
+    this.startPositionOfOSFile = startPositionOfOSFile;
     this.metaSize = metaSize;
     this.dataSize = dataSize;
   }
@@ -47,8 +61,8 @@ public class OSFileCacheValue {
     return cacheFile;
   }
 
-  public long getStartPosition() {
-    return startPosition;
+  public long getStartPositionInCacheFile() {
+    return startPositionInCacheFile;
   }
 
   public int getMetaSize() {
@@ -65,6 +79,15 @@ public class OSFileCacheValue {
     return metaSize + dataSize;
   }
 
+  public boolean containsPosition(long position) {
+    return startPositionInCacheFile <= position && position < startPositionInCacheFile + dataSize;
+  }
+
+  public int read(ByteBuffer dst, long startPosition) throws IOException {
+    long startPosInCacheFile = metaSize + (startPosition - this.startPositionOfOSFile);
+    return cacheFileChannel.read(dst, startPosInCacheFile);
+  }
+
   /** Mark this value should be deleted, delete this value when no one is reading it. */
   public synchronized void setShouldDelete() {
     this.shouldDelete = true;
@@ -77,11 +100,14 @@ public class OSFileCacheValue {
    * Try to get the read lock, return false when this cache value should be deleted or has been
    * deleted.
    */
-  public synchronized boolean tryReadLock() {
+  public synchronized boolean tryReadLock() throws IOException {
     if (shouldDelete || !cacheFile.exists()) {
       return false;
     } else {
       this.readCnt++;
+      if (!cacheFileChannel.isOpen()) {
+        cacheFileChannel = FileChannel.open(cacheFile.toPath(), StandardOpenOption.READ);
+      }
       return true;
     }
   }
@@ -90,10 +116,18 @@ public class OSFileCacheValue {
    * Release the read lock, delete the cache value when no one else is reading it and this cache
    * value should be deleted.
    */
-  public synchronized void readUnlock() {
+  public synchronized void readUnlock() throws IOException {
     this.readCnt--;
+    // delete the cache file only when no reference is used
     if (shouldDelete && readCnt == 0) {
-      cacheFile.delete();
+      boolean success = cacheFile.delete();
+      if (!success) {
+        logger.error("[OSFileCache] cannot delete cache file {}", cacheFile);
+      }
+    }
+    // close the file channel if no reference is used
+    if (readCnt == 0) {
+      cacheFileChannel.close();
     }
   }
 }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
similarity index 67%
rename from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
rename to object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index f094479037..5bc9bbe4eb 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -30,45 +30,37 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
 
-public class CacheFileChannel implements Closeable {
+public class OSFileChannel implements Closeable {
   private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
   private static final ObjectStorageConfig config =
       ObjectStorageDescriptor.getInstance().getConfig();
   private static final OSFileCache cache = OSFileCache.getInstance();
   private final OSFile osFile;
   private long position = 0;
-  private OSFileCacheValue currentCacheFile;
-  private FileChannel cacheFileChannel;
-  private long startPositionInTsFile = position;
-  private long endPositionInTsFile = position + config.getCachePageSize();
+  private OSFileCacheValue currentOSFileBlock;
 
-  public CacheFileChannel(OSFile osFile) {
+  public OSFileChannel(OSFile osFile) {
     this.osFile = osFile;
   }
 
-  public static InputStream newInputStream(CacheFileChannel channel) {
+  public static InputStream newInputStream(OSFileChannel channel) {
     return new CacheInputStream(channel);
   }
 
-  private boolean isPositionValid() {
-    return startPositionInTsFile <= position && position < endPositionInTsFile;
+  private boolean canReadFromCurrentCacheBlock(long startPos) {
+    return currentOSFileBlock != null && currentOSFileBlock.containsPosition(startPos);
   }
 
   private void openNextCacheFile() throws IOException {
     // close prev cache file
-    close();
+    releaseCurrentOSFileBlock();
     // open next cache file
     OSFileCacheKey key = locateCacheFileFromPosition();
-    while (!currentCacheFile.tryReadLock()) {
-      currentCacheFile = cache.get(key);
+    // 用 while 是为了防止从 cache 中拿出来之后,对应的 value 又被挤出去,导致对应的文件被删除?
+    while (currentOSFileBlock == null || !currentOSFileBlock.tryReadLock()) {
+      currentOSFileBlock = cache.get(key);
     }
-    cacheFileChannel =
-        FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
-    startPositionInTsFile = currentCacheFile.getStartPosition();
-    endPositionInTsFile = startPositionInTsFile + currentCacheFile.getLength();
   }
 
   private OSFileCacheKey locateCacheFileFromPosition() {
@@ -108,17 +100,15 @@ public class CacheFileChannel implements Closeable {
     // read each cache file
     int totalReadBytes = 0;
     while (startPos < endPos) {
-      if (!isPositionValid()) {
+      if (!canReadFromCurrentCacheBlock(startPos)) {
         openNextCacheFile();
       }
-      long readStartPosition = currentCacheFile.getMetaSize() + (startPos - startPositionInTsFile);
-      long readEndPosition =
-          currentCacheFile.getMetaSize()
-              + (Math.min(endPos, endPositionInTsFile) - startPositionInTsFile);
-      int readSize = (int) (readEndPosition - readStartPosition);
-      dst.limit(dst.position() + readSize);
-      int read = cacheFileChannel.read(dst, readStartPosition);
-      if (read != readSize) {
+
+      int maxReadSize = (int) Math.min(endPos - startPos, currentOSFileBlock.getDataSize());
+      dst.limit(dst.position() + maxReadSize);
+
+      int read = currentOSFileBlock.read(dst, startPos);
+      if (read != maxReadSize) {
         throw new IOException(
             String.format(
                 "Cache file %s may crash because cannot read enough information in the cash file.",
@@ -131,12 +121,14 @@ public class CacheFileChannel implements Closeable {
     return totalReadBytes;
   }
 
+  public void releaseCurrentOSFileBlock() throws IOException {
+    if (currentOSFileBlock != null) {
+      currentOSFileBlock.readUnlock();
+    }
+  }
+
   @Override
   public void close() throws IOException {
-    try {
-      cacheFileChannel.close();
-    } finally {
-      currentCacheFile.readUnlock();
-    }
+    releaseCurrentOSFileBlock();
   }
 }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
index 07a85e75fe..0dcd600285 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.os.fileSystem;
 
-import org.apache.iotdb.os.cache.CacheFileChannel;
+import org.apache.iotdb.os.cache.OSFileChannel;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
 import java.io.IOException;
@@ -27,11 +27,11 @@ import java.nio.ByteBuffer;
 
 public class OSTsFileInput implements TsFileInput {
   private OSFile file;
-  private CacheFileChannel channel;
+  private OSFileChannel channel;
 
   public OSTsFileInput(OSFile file) {
     this.file = file;
-    this.channel = new CacheFileChannel(file);
+    this.channel = new OSFileChannel(file);
   }
 
   @Override
@@ -62,7 +62,7 @@ public class OSTsFileInput implements TsFileInput {
 
   @Override
   public InputStream wrapAsInputStream() throws IOException {
-    return CacheFileChannel.newInputStream(channel);
+    return OSFileChannel.newInputStream(channel);
   }
 
   @Override