You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/17 09:09:07 UTC

[iotdb] branch tiered_storage updated (e3755ead2f -> 43b3c40aee)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from e3755ead2f fix
     new b7c436101a fix bug when cast postion in cache and tsfile
     new 43b3c40aee tmp save 0517

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/iotdb/os/cache/OSFileCacheValue.java    |  26 +++--
 .../org/apache/iotdb/os/cache/OSFileChannel.java   | 126 +++++++++++++--------
 2 files changed, 95 insertions(+), 57 deletions(-)


[iotdb] 01/02: fix bug when cast postion in cache and tsfile

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b7c436101a9f2fe78a0c2117a9b3983d1d150a27
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed May 17 15:47:00 2023 +0800

    fix bug when cast postion in cache and tsfile
---
 .../src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java     | 4 ++++
 .../src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java        | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index ff8c985f6f..1eed2f6e72 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -84,6 +84,10 @@ public class OSFileCacheValue {
     return endPositionInTsFile;
   }
 
+  public long getEndPositionInCacheFile() {
+    return startPositionInCacheFile + getLength();
+  }
+
   /**
    * Convert position in the TsFile to the corresponding position in the cache file. Return -1 when
    * the position is outside the cache file range
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index 0380e540f4..b5cee34dcc 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -115,7 +115,7 @@ public class OSFileChannel implements Closeable {
       long readStartPosition = cacheFile.convertTsFilePos2CachePos(startPos);
       long readEndPosition = cacheFile.convertTsFilePos2CachePos(endPos);
       if (readEndPosition < 0) {
-        readEndPosition = cacheFile.getEndPositionInTsFile();
+        readEndPosition = cacheFile.getEndPositionInCacheFile();
       }
       int readSize = (int) (readEndPosition - readStartPosition);
       cacheFileChannel.position(readStartPosition);


[iotdb] 02/02: tmp save 0517

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 43b3c40aeee0eaa935a0a37b85fe64ae6821dad7
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed May 17 17:08:55 2023 +0800

    tmp save 0517
---
 .../apache/iotdb/os/cache/OSFileCacheValue.java    |  22 ++--
 .../org/apache/iotdb/os/cache/OSFileChannel.java   | 126 +++++++++++++--------
 2 files changed, 91 insertions(+), 57 deletions(-)

diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index 1eed2f6e72..ab9e471890 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -33,9 +33,9 @@ public class OSFileCacheValue {
   /** cache key size */
   private int metaSize;
   /** start position in the remote TsFile */
-  private long startPositionInTsFile;
+  private long startPositionInOSFile;
   /** end position in the remote TsFile */
-  private long endPositionInTsFile;
+  private long endPositionInOSFile;
 
   private boolean shouldDelete;
   private int readCnt;
@@ -45,13 +45,13 @@ public class OSFileCacheValue {
       long startPositionInCacheFile,
       int metaSize,
       int dataSize,
-      long startPositionInTsFile) {
+      long startPositionInOSFile) {
     this.cacheFile = cacheFile;
     this.startPositionInCacheFile = startPositionInCacheFile;
     this.metaSize = metaSize;
     this.dataSize = dataSize;
-    this.startPositionInTsFile = startPositionInTsFile;
-    this.endPositionInTsFile = startPositionInTsFile + dataSize;
+    this.startPositionInOSFile = startPositionInOSFile;
+    this.endPositionInOSFile = startPositionInOSFile + dataSize;
   }
 
   public File getCacheFile() {
@@ -76,12 +76,12 @@ public class OSFileCacheValue {
     return metaSize + dataSize;
   }
 
-  public long getStartPositionInTsFile() {
-    return startPositionInTsFile;
+  public long getStartPositionInOSFile() {
+    return startPositionInOSFile;
   }
 
-  public long getEndPositionInTsFile() {
-    return endPositionInTsFile;
+  public long getEndPositionInOSFile() {
+    return endPositionInOSFile;
   }
 
   public long getEndPositionInCacheFile() {
@@ -93,10 +93,10 @@ public class OSFileCacheValue {
    * the position is outside the cache file range
    */
   public long convertTsFilePos2CachePos(long positionInTsFile) {
-    if (positionInTsFile < startPositionInTsFile || positionInTsFile >= endPositionInTsFile) {
+    if (positionInTsFile < startPositionInOSFile || positionInTsFile >= endPositionInOSFile) {
       return -1;
     }
-    return startPositionInCacheFile + metaSize + (positionInTsFile - startPositionInTsFile);
+    return startPositionInCacheFile + metaSize + (positionInTsFile - startPositionInOSFile);
   }
 
   /** Mark this value should be deleted, delete this value when no one is reading it. */
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index b5cee34dcc..b01e9c6a01 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -40,35 +40,26 @@ public class OSFileChannel implements Closeable {
   private static final OSFileCache cache = OSFileCache.getInstance();
   private final OSFile osFile;
   private long position = 0;
-  private OSFileCacheValue cacheFile;
-  private FileChannel cacheFileChannel;
+
+  private OSFileBlock currentOSFileBlock;
 
   public OSFileChannel(OSFile osFile) throws IOException {
     this.osFile = osFile;
-    openNextCacheFile();
   }
 
   public static InputStream newInputStream(OSFileChannel channel) {
     return new OSInputStream(channel);
   }
 
-  private boolean isPositionValid() {
-    return cacheFile.getStartPositionInTsFile() <= position
-        && position < cacheFile.getEndPositionInTsFile();
-  }
-
-  private void openNextCacheFile() throws IOException {
+  private void openNextCacheFile(int position) throws IOException {
     // close prev cache file
-    close();
+    closeCurrentOSFileBlock();
     // open next cache file
-    OSFileCacheKey key = locateCacheFileFromPosition();
-    while (!cacheFile.tryReadLock()) {
-      cacheFile = cache.get(key);
-    }
-    cacheFileChannel = FileChannel.open(cacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
+    OSFileCacheKey key = locateCacheFileFromPosition(position);
+    currentOSFileBlock = new OSFileBlock(key);
   }
 
-  private OSFileCacheKey locateCacheFileFromPosition() throws IOException {
+  private OSFileCacheKey locateCacheFileFromPosition(int position) throws IOException {
     if (position >= size()) {
       throw new IOException("EOF");
     }
@@ -84,19 +75,26 @@ public class OSFileChannel implements Closeable {
     return position;
   }
 
-  public void position(long newPosition) {
+  public synchronized void position(long newPosition) {
     if (newPosition < 0) {
       throw new IllegalArgumentException();
     }
     position = newPosition;
   }
 
-  public int read(ByteBuffer dst) throws IOException {
-    return read(dst, position);
+  public synchronized int read(ByteBuffer dst) throws IOException {
+    int readSize = read(dst, position);
+    position(position + readSize);
+    return readSize;
   }
 
-  public int read(ByteBuffer dst, long position) throws IOException {
+  public synchronized int read(ByteBuffer dst, long position) throws IOException {
+    int currentPosition = (int) position;
     dst.mark();
+    int dstLimit = dst.limit();
+    // read each cache file
+    int totalReadBytes = 0;
+
     // determine the read range
     long startPos = position;
     long endPos = position + dst.remaining();
@@ -106,41 +104,77 @@ public class OSFileChannel implements Closeable {
     if (endPos > size()) {
       endPos = size();
     }
-    // read each cache file
-    int totalReadBytes = 0;
-    while (startPos < endPos) {
-      if (!isPositionValid()) {
-        openNextCacheFile();
-      }
-      long readStartPosition = cacheFile.convertTsFilePos2CachePos(startPos);
-      long readEndPosition = cacheFile.convertTsFilePos2CachePos(endPos);
-      if (readEndPosition < 0) {
-        readEndPosition = cacheFile.getEndPositionInCacheFile();
-      }
-      int readSize = (int) (readEndPosition - readStartPosition);
-      cacheFileChannel.position(readStartPosition);
-      long read = cacheFileChannel.read(new ByteBuffer[] {dst}, 0, readSize);
-      if (read != readSize) {
-        dst.reset();
-        throw new IOException(
-            String.format(
-                "Cache file %s may crash because cannot read enough information in the cash file.",
-                osFile));
+    try {
+      while (startPos < endPos) {
+        if (currentOSFileBlock == null || !currentOSFileBlock.canRead(startPos)) {
+          openNextCacheFile(currentPosition);
+        }
+        int readSize = currentOSFileBlock.read(dst, startPos, endPos);
+        totalReadBytes += readSize;
+        startPos += readSize;
+        currentPosition += readSize;
       }
-      totalReadBytes += read;
-      startPos += read;
+    } catch (IOException e) {
+      dst.reset();
+      throw e;
+    } finally {
+      dst.limit(dstLimit);
     }
-    this.position = position + totalReadBytes;
     return totalReadBytes;
   }
 
+  private void closeCurrentOSFileBlock() throws IOException {
+    if (currentOSFileBlock != null) {
+      currentOSFileBlock.close();
+    }
+  }
+
   @Override
   public void close() throws IOException {
-    if (cacheFile != null) {
+    closeCurrentOSFileBlock();
+  }
+
+  private static class OSFileBlock {
+    private OSFileCacheValue cacheValue;
+    private FileChannel fileChannel;
+
+    public OSFileBlock(OSFileCacheKey cacheKey) throws IOException {
+      do {
+        cacheValue = cache.get(cacheKey);
+      } while (!cacheValue.tryReadLock());
+      fileChannel = FileChannel.open(cacheValue.getCacheFile().toPath(), StandardOpenOption.READ);
+    }
+
+    public boolean canRead(long positionInOSFile) {
+      return cacheValue.getStartPositionInOSFile() <= positionInOSFile
+          && positionInOSFile < cacheValue.getEndPositionInOSFile();
+    }
+
+    public int read(ByteBuffer dst, long startPos, long endPos) throws IOException {
+      long readStartPosition = cacheValue.convertTsFilePos2CachePos(startPos);
+      long expectedReadLength = endPos - startPos;
+
+      int readSize =
+          (int)
+              Math.min(
+                  expectedReadLength, cacheValue.getEndPositionInCacheFile() - readStartPosition);
+
+      dst.limit(dst.position() + readSize);
+      long actualReadSize = fileChannel.read(dst, readStartPosition);
+      if (actualReadSize != readSize) {
+        throw new IOException(
+            String.format(
+                "Cache file %s may crash because cannot read enough information in the cash file.",
+                cacheValue.getCacheFile()));
+      }
+      return readSize;
+    }
+
+    public void close() throws IOException {
       try {
-        cacheFileChannel.close();
+        fileChannel.close();
       } finally {
-        cacheFile.readUnlock();
+        cacheValue.readUnlock();
       }
     }
   }