You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/17 07:35:19 UTC

[iotdb] branch tiered_storage updated: fix some bugs

This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tiered_storage by this push:
     new 4265632b01 fix some bugs
4265632b01 is described below

commit 4265632b01850586edf22dc297c025ff1b44a03c
Author: HeimingZ <zh...@qq.com>
AuthorDate: Wed May 17 15:34:59 2023 +0800

    fix some bugs
---
 .../apache/iotdb/os/cache/CacheFileManager.java    |  4 +-
 .../apache/iotdb/os/cache/CacheRecoverTask.java    |  3 +-
 .../org/apache/iotdb/os/cache/OSFileCache.java     |  3 +-
 .../org/apache/iotdb/os/cache/OSFileCacheKey.java  | 19 ++------
 .../apache/iotdb/os/cache/OSFileCacheValue.java    | 40 +++++++++++++--
 .../{CacheFileChannel.java => OSFileChannel.java}  | 57 ++++++++++++----------
 .../{CacheInputStream.java => OSInputStream.java}  |  6 +--
 .../apache/iotdb/os/fileSystem/OSTsFileInput.java  | 10 ++--
 8 files changed, 86 insertions(+), 56 deletions(-)

diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index cb88c0771c..7497f15a99 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -75,7 +75,9 @@ public class CacheFileManager {
       ByteBuffer meta = key.serialize();
       channel.write(meta);
       channel.write(ByteBuffer.wrap(data));
-      res = new OSFileCacheValue(tmpCacheFile, 0, meta.capacity(), data.length);
+      res =
+          new OSFileCacheValue(
+              tmpCacheFile, 0, meta.capacity(), data.length, key.getStartPosition());
     } catch (IOException e) {
       logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
       tmpCacheFile.delete();
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
index bfebeaadf1..08f505bd89 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
@@ -70,7 +70,8 @@ public class CacheRecoverTask implements Runnable {
               cacheFile.delete();
               continue;
             }
-            OSFileCacheValue value = new OSFileCacheValue(cacheFile, 0, metaSize, dataSize);
+            OSFileCacheValue value =
+                new OSFileCacheValue(cacheFile, 0, metaSize, dataSize, key.getStartPosition());
             cache.put(key, value);
           }
           // update max cache file id
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index 840b16cd12..91b60791a3 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -84,7 +84,8 @@ public class OSFileCache {
     @Override
     public @Nullable OSFileCacheValue load(@NonNull OSFileCacheKey key) throws Exception {
       byte[] data =
-          connector.getRemoteFile(key.getFile().toOSURI(), key.getStartPosition(), key.getLength());
+          connector.getRemoteFile(
+              key.getFile().toOSURI(), key.getStartPosition(), config.getCachePageSize());
       return cacheFileManager.persist(key, data);
     }
   }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
index 03e3c8d997..c71724a34c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
@@ -32,32 +32,27 @@ public class OSFileCacheKey implements Serializable {
   private final OSFile file;
   /** start position in the remote TsFile */
   private final long startPosition;
-  /** data length */
-  private final int length;
 
-  public OSFileCacheKey(OSFile file, long startPosition, int length) {
+  public OSFileCacheKey(OSFile file, long startPosition) {
     this.file = file;
     this.startPosition = startPosition;
-    this.length = length;
   }
 
   public int serializeSize() {
-    return Integer.BYTES + file.toString().getBytes().length + Long.BYTES + Integer.BYTES;
+    return Integer.BYTES + file.toString().getBytes().length + Long.BYTES;
   }
 
   public ByteBuffer serialize() {
     ByteBuffer buffer = ByteBuffer.allocate(serializeSize());
     ReadWriteIOUtils.write(file.toString(), buffer);
     ReadWriteIOUtils.write(startPosition, buffer);
-    ReadWriteIOUtils.write(length, buffer);
     return buffer;
   }
 
   public static OSFileCacheKey deserialize(InputStream inputStream) throws IOException {
     String filePath = ReadWriteIOUtils.readString(inputStream);
     long startPosition = ReadWriteIOUtils.readLong(inputStream);
-    int length = ReadWriteIOUtils.readInt(inputStream);
-    return new OSFileCacheKey(new OSFile(filePath), startPosition, length);
+    return new OSFileCacheKey(new OSFile(filePath), startPosition);
   }
 
   public OSFile getFile() {
@@ -68,13 +63,9 @@ public class OSFileCacheKey implements Serializable {
     return startPosition;
   }
 
-  public int getLength() {
-    return length;
-  }
-
   @Override
   public int hashCode() {
-    return Objects.hash(file, startPosition, length);
+    return Objects.hash(file, startPosition);
   }
 
   @Override
@@ -86,6 +77,6 @@ public class OSFileCacheKey implements Serializable {
       return false;
     }
     OSFileCacheKey that = (OSFileCacheKey) obj;
-    return file.equals(that.file) && startPosition == that.startPosition && length == that.length;
+    return file.equals(that.file) && startPosition == that.startPosition;
   }
 }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index 85295506c5..281fbd228a 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -27,28 +27,39 @@ public class OSFileCacheValue {
   // 如果每个块用一个文件来存储,则该值一直为 0
   // 如果使用一个大文件存储所有块,则该值为大文件中的起点
   /** start position in the local cache file */
-  private long startPosition;
+  private long startPositionInCacheFile;
   /** cache data size */
   private int dataSize;
   /** cache key size */
   private int metaSize;
+  /** start position in the remote TsFile */
+  private long startPositionInTsFile;
+  /** start position in the remote TsFile */
+  private long endPositionInTsFile;
 
   private boolean shouldDelete;
   private int readCnt;
 
-  public OSFileCacheValue(File cacheFile, long startPosition, int metaSize, int dataSize) {
+  public OSFileCacheValue(
+      File cacheFile,
+      long startPositionInCacheFile,
+      int metaSize,
+      int dataSize,
+      long startPositionInTsFile) {
     this.cacheFile = cacheFile;
-    this.startPosition = startPosition;
+    this.startPositionInCacheFile = startPositionInCacheFile;
     this.metaSize = metaSize;
     this.dataSize = dataSize;
+    this.startPositionInTsFile = startPositionInTsFile;
+    this.endPositionInTsFile = startPositionInTsFile + dataSize;
   }
 
   public File getCacheFile() {
     return cacheFile;
   }
 
-  public long getStartPosition() {
-    return startPosition;
+  public long getStartPositionInCacheFile() {
+    return startPositionInCacheFile;
   }
 
   public int getMetaSize() {
@@ -65,6 +76,25 @@ public class OSFileCacheValue {
     return metaSize + dataSize;
   }
 
+  public long getStartPositionInTsFile() {
+    return startPositionInTsFile;
+  }
+
+  public long getEndPositionInTsFile() {
+    return endPositionInTsFile;
+  }
+
+  /**
+   * Convert position in the TsFile to the corresponding position in the cache file. Return -1 when
+   * the position is outside the cache file range
+   */
+  public long convertTsFilePos2CachePos(long positionInTsFile) {
+    if (positionInTsFile < startPositionInTsFile || positionInTsFile >= endPositionInTsFile) {
+      return -1;
+    }
+    return metaSize + (positionInTsFile - startPositionInTsFile);
+  }
+
   /** Mark this value should be deleted, delete this value when no one is reading it. */
   public synchronized void setShouldDelete() {
     this.shouldDelete = true;
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
similarity index 70%
rename from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
rename to object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index f094479037..0380e540f4 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -33,28 +33,28 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 
-public class CacheFileChannel implements Closeable {
+public class OSFileChannel implements Closeable {
   private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
   private static final ObjectStorageConfig config =
       ObjectStorageDescriptor.getInstance().getConfig();
   private static final OSFileCache cache = OSFileCache.getInstance();
   private final OSFile osFile;
   private long position = 0;
-  private OSFileCacheValue currentCacheFile;
+  private OSFileCacheValue cacheFile;
   private FileChannel cacheFileChannel;
-  private long startPositionInTsFile = position;
-  private long endPositionInTsFile = position + config.getCachePageSize();
 
-  public CacheFileChannel(OSFile osFile) {
+  public OSFileChannel(OSFile osFile) throws IOException {
     this.osFile = osFile;
+    openNextCacheFile();
   }
 
-  public static InputStream newInputStream(CacheFileChannel channel) {
-    return new CacheInputStream(channel);
+  public static InputStream newInputStream(OSFileChannel channel) {
+    return new OSInputStream(channel);
   }
 
   private boolean isPositionValid() {
-    return startPositionInTsFile <= position && position < endPositionInTsFile;
+    return cacheFile.getStartPositionInTsFile() <= position
+        && position < cacheFile.getEndPositionInTsFile();
   }
 
   private void openNextCacheFile() throws IOException {
@@ -62,18 +62,18 @@ public class CacheFileChannel implements Closeable {
     close();
     // open next cache file
     OSFileCacheKey key = locateCacheFileFromPosition();
-    while (!currentCacheFile.tryReadLock()) {
-      currentCacheFile = cache.get(key);
+    while (!cacheFile.tryReadLock()) {
+      cacheFile = cache.get(key);
     }
-    cacheFileChannel =
-        FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
-    startPositionInTsFile = currentCacheFile.getStartPosition();
-    endPositionInTsFile = startPositionInTsFile + currentCacheFile.getLength();
+    cacheFileChannel = FileChannel.open(cacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
   }
 
-  private OSFileCacheKey locateCacheFileFromPosition() {
+  private OSFileCacheKey locateCacheFileFromPosition() throws IOException {
+    if (position >= size()) {
+      throw new IOException("EOF");
+    }
     long startPosition = position - position % config.getCachePageSize();
-    return new OSFileCacheKey(osFile, startPosition, config.getCachePageSize());
+    return new OSFileCacheKey(osFile, startPosition);
   }
 
   public long size() {
@@ -96,6 +96,7 @@ public class CacheFileChannel implements Closeable {
   }
 
   public int read(ByteBuffer dst, long position) throws IOException {
+    dst.mark();
     // determiner the ead range
     long startPos = position;
     long endPos = position + dst.remaining();
@@ -111,14 +112,16 @@ public class CacheFileChannel implements Closeable {
       if (!isPositionValid()) {
         openNextCacheFile();
       }
-      long readStartPosition = currentCacheFile.getMetaSize() + (startPos - startPositionInTsFile);
-      long readEndPosition =
-          currentCacheFile.getMetaSize()
-              + (Math.min(endPos, endPositionInTsFile) - startPositionInTsFile);
+      long readStartPosition = cacheFile.convertTsFilePos2CachePos(startPos);
+      long readEndPosition = cacheFile.convertTsFilePos2CachePos(endPos);
+      if (readEndPosition < 0) {
+        readEndPosition = cacheFile.getEndPositionInTsFile();
+      }
       int readSize = (int) (readEndPosition - readStartPosition);
-      dst.limit(dst.position() + readSize);
-      int read = cacheFileChannel.read(dst, readStartPosition);
+      cacheFileChannel.position(readStartPosition);
+      long read = cacheFileChannel.read(new ByteBuffer[] {dst}, 0, readSize);
       if (read != readSize) {
+        dst.reset();
         throw new IOException(
             String.format(
                 "Cache file %s may crash because cannot read enough information in the cash file.",
@@ -133,10 +136,12 @@ public class CacheFileChannel implements Closeable {
 
   @Override
   public void close() throws IOException {
-    try {
-      cacheFileChannel.close();
-    } finally {
-      currentCacheFile.readUnlock();
+    if (cacheFile != null) {
+      try {
+        cacheFileChannel.close();
+      } finally {
+        cacheFile.readUnlock();
+      }
     }
   }
 }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
similarity index 93%
rename from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
rename to object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
index 7fcdec4bf8..a972c32a7c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
@@ -22,10 +22,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
-public class CacheInputStream extends InputStream {
-  private final CacheFileChannel channel;
+public class OSInputStream extends InputStream {
+  private final OSFileChannel channel;
 
-  public CacheInputStream(CacheFileChannel channel) {
+  public OSInputStream(OSFileChannel channel) {
     super();
     this.channel = channel;
   }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
index 07a85e75fe..41d893f929 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.os.fileSystem;
 
-import org.apache.iotdb.os.cache.CacheFileChannel;
+import org.apache.iotdb.os.cache.OSFileChannel;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
 import java.io.IOException;
@@ -27,11 +27,11 @@ import java.nio.ByteBuffer;
 
 public class OSTsFileInput implements TsFileInput {
   private OSFile file;
-  private CacheFileChannel channel;
+  private OSFileChannel channel;
 
-  public OSTsFileInput(OSFile file) {
+  public OSTsFileInput(OSFile file) throws IOException {
     this.file = file;
-    this.channel = new CacheFileChannel(file);
+    this.channel = new OSFileChannel(file);
   }
 
   @Override
@@ -62,7 +62,7 @@ public class OSTsFileInput implements TsFileInput {
 
   @Override
   public InputStream wrapAsInputStream() throws IOException {
-    return CacheFileChannel.newInputStream(channel);
+    return OSFileChannel.newInputStream(channel);
   }
 
   @Override