You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/16 09:39:15 UTC

[iotdb] branch tiered_storage updated: add cache read

This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tiered_storage by this push:
     new fc3e620bbb add cache read
fc3e620bbb is described below

commit fc3e620bbb7076ca9f3fc22485e88fc95964e5f4
Author: HeimingZ <zh...@qq.com>
AuthorDate: Tue May 16 17:38:55 2023 +0800

    add cache read
---
 .../apache/iotdb/hadoop/fileSystem/HDFSInput.java  |  32 ------
 .../apache/iotdb/os/cache/CacheFileChannel.java    | 123 ++++++++++++++++++++-
 .../apache/iotdb/os/cache/CacheFileManager.java    |   8 +-
 .../apache/iotdb/os/cache/CacheInputStream.java    |  72 ++++++++++--
 .../org/apache/iotdb/os/cache/IOSFileCache.java    |  31 ------
 .../org/apache/iotdb/os/cache/OSFileCache.java     |  31 ++----
 .../apache/iotdb/os/cache/OSFileCacheValue.java    |  33 +++++-
 .../apache/iotdb/os/fileSystem/OSTsFileInput.java  |  63 +++--------
 .../iotdb/tsfile/read/reader/LocalTsFileInput.java |  48 --------
 .../iotdb/tsfile/read/reader/TsFileInput.java      |  39 +++----
 10 files changed, 261 insertions(+), 219 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
index 242087bb37..5c0940d495 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.hadoop.fileSystem;
 
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumFileSystem;
@@ -30,7 +29,6 @@ import org.apache.hadoop.fs.Path;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 
 public class HDFSInput implements TsFileInput {
 
@@ -100,21 +98,6 @@ public class HDFSInput implements TsFileInput {
     return res;
   }
 
-  @Override
-  public int read() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public FileChannel wrapAsFileChannel() {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public InputStream wrapAsInputStream() {
     return fsDataInputStream;
@@ -125,21 +108,6 @@ public class HDFSInput implements TsFileInput {
     fsDataInputStream.close();
   }
 
-  @Override
-  public int readInt() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public synchronized String readVarIntString(long position) throws IOException {
-    long srcPosition = fsDataInputStream.getPos();
-
-    fsDataInputStream.seek(position);
-    String res = ReadWriteIOUtils.readVarIntString(fsDataInputStream);
-    fsDataInputStream.seek(srcPosition);
-    return res;
-  }
-
   @Override
   public String getFilePath() {
     return path.toString();
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
index 8b7bfc9124..24d20b7651 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
@@ -18,4 +18,125 @@
  */
 package org.apache.iotdb.os.cache;
 
-public class CacheFileChannel {}
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.fileSystem.OSFile;
+import org.apache.iotdb.os.fileSystem.OSTsFileInput;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+public class CacheFileChannel implements Closeable {
+  private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
+  private static final ObjectStorageConfig config =
+      ObjectStorageDescriptor.getInstance().getConfig();
+  private static final OSFileCache cache = OSFileCache.getInstance();
+  private final OSFile osFile;
+  private long position = 0;
+  private OSFileCacheValue currentCacheFile;
+  private FileChannel cacheFileChannel;
+  private long cacheFileStartPosition = position;
+  private long cacheFileEndPosition = position + config.getCachePageSize();
+
+  public CacheFileChannel(OSFile osFile) {
+    this.osFile = osFile;
+  }
+
+  public static InputStream newInputStream(CacheFileChannel channel) {
+    return new CacheInputStream(channel);
+  }
+
+  private OSFileCacheKey getNextCacheFile() {
+    long startPosition = position - position % config.getCachePageSize();
+    return new OSFileCacheKey(osFile, startPosition, config.getCachePageSize());
+  }
+
+  private void openNextCacheFile() throws IOException {
+    // close prev cache file
+    close();
+    // open next cache file
+    OSFileCacheKey key = getNextCacheFile();
+    while (!currentCacheFile.readLock()) {
+      currentCacheFile = cache.get(key);
+    }
+    cacheFileChannel =
+        FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
+    cacheFileStartPosition = currentCacheFile.getStartPosition();
+    cacheFileEndPosition = cacheFileStartPosition + config.getCachePageSize();
+  }
+
+  private boolean isPositionValid(long position) {
+    return cacheFileStartPosition <= position && position <= cacheFileEndPosition;
+  }
+
+  public long size() {
+    return osFile.length();
+  }
+
+  public long position() {
+    return position;
+  }
+
+  public void position(long newPosition) {
+    if (newPosition < 0) {
+      throw new IllegalArgumentException();
+    }
+    position = newPosition;
+  }
+
+  public int read(ByteBuffer dst) throws IOException {
+    return read(dst, position);
+  }
+
+  public int read(ByteBuffer dst, long position) throws IOException {
+    // determine the read range
+    long startPos = position;
+    long endPos = position + dst.remaining();
+    if (startPos >= size()) {
+      return -1;
+    }
+    if (endPos > size()) {
+      endPos = size();
+    }
+    // read each cache file
+    int totalReadBytes = 0;
+    while (startPos < endPos) {
+      if (!isPositionValid(startPos)) {
+        openNextCacheFile();
+      }
+      long readStartPosition = currentCacheFile.getMetaSize() + (startPos - cacheFileStartPosition);
+      long readEndPosition =
+          currentCacheFile.getMetaSize()
+              + (Math.min(endPos, cacheFileEndPosition) - cacheFileStartPosition);
+      int readSize = (int) (readEndPosition - readStartPosition);
+      dst.limit(dst.position() + readSize);
+      int read = cacheFileChannel.read(dst, readStartPosition);
+      if (read != readSize) {
+        throw new IOException(
+            String.format(
+                "Cache file %s may crash because cannot read enough information in the cash file.",
+                osFile));
+      }
+      totalReadBytes += read;
+      startPos += read;
+    }
+    this.position = position + totalReadBytes;
+    return totalReadBytes;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      cacheFileChannel.close();
+    } finally {
+      currentCacheFile.readUnlock();
+    }
+  }
+}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index 6dd208ac17..dcde92da09 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -31,15 +31,15 @@ import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.concurrent.atomic.AtomicLong;
 
-/** This class manages all io operations to the cache files */
+/** This class manages all write operations to the cache files */
 public class CacheFileManager {
   private static final Logger logger = LoggerFactory.getLogger(CacheFileManager.class);
   private static final String CACHE_FILE_SUFFIX = ".cache";
   private static final String TMP_CACHE_FILE_SUFFIX = ".cache.tmp";
   private static final ObjectStorageConfig config =
       ObjectStorageDescriptor.getInstance().getConfig();
-  private String[] cacheDirs;
-  private AtomicLong logFileId = new AtomicLong(0);
+  private final String[] cacheDirs;
+  private final AtomicLong logFileId = new AtomicLong(0);
 
   public CacheFileManager(String[] cacheDirs) {
     this.cacheDirs = cacheDirs;
@@ -69,7 +69,7 @@ public class CacheFileManager {
       ByteBuffer meta = key.serializeToByteBuffer();
       channel.write(meta);
       channel.write(ByteBuffer.wrap(data));
-      res = new OSFileCacheValue(tmpCacheFile, meta.capacity(), data.length);
+      res = new OSFileCacheValue(tmpCacheFile, meta.capacity(), data.length, meta.capacity());
     } catch (IOException e) {
       logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
       tmpCacheFile.delete();
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
index 2df16266ba..7fcdec4bf8 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
@@ -18,26 +18,76 @@
  */
 package org.apache.iotdb.os.cache;
 
-import org.apache.iotdb.os.fileSystem.OSTsFileInput;
-
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 public class CacheInputStream extends InputStream {
-  private OSTsFileInput osTsFileInput;
-  private long position; // 15
-  private long size; // 100
-  private InputStream currentInputStream; // 第二个文件,从 5 开始
+  private final CacheFileChannel channel;
 
-  public CacheInputStream(OSTsFileInput osTsFileInput, long position, long size) {
-    this.osTsFileInput = osTsFileInput;
-    this.position = position;
-    this.size = size;
+  public CacheInputStream(CacheFileChannel channel) {
+    super();
+    this.channel = channel;
   }
 
   @Override
   public int read() throws IOException {
+    byte[] b1 = new byte[1];
+    int n = read(b1);
+    if (n == 1) return b1[0] & 0xff;
+    return -1;
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    ByteBuffer buffer = ByteBuffer.wrap(b);
+    buffer.position(off);
+    buffer.limit(off + len);
+    return channel.read(buffer);
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    if (n <= 0) {
+      return 0;
+    }
+    if (n > channel.size() - channel.position()) {
+      n = channel.size() - channel.position();
+    }
+    channel.position(channel.position() + n);
+    return n;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return (int) (channel.size() - channel.position());
+  }
+
+  @Override
+  public void close() throws IOException {
+    channel.close();
+  }
+
+  @Override
+  public synchronized void mark(int readlimit) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized void reset() {
+    throw new UnsupportedOperationException();
+  }
 
-    return 0;
+  @Override
+  public boolean markSupported() {
+    throw new UnsupportedOperationException();
   }
 }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/IOSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/IOSFileCache.java
deleted file mode 100644
index d780083ff6..0000000000
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/IOSFileCache.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.os.cache;
-
-import org.apache.iotdb.os.fileSystem.OSURI;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.FileChannel;
-
-public interface IOSFileCache {
-  InputStream getAsInputSteam(OSURI file, long startPosition) throws IOException;
-
-  FileChannel getLocalCacheFileChannel(OSURI file, long startPosition) throws IOException;
-}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index 1525f38283..a7a6f6eea9 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.os.cache;
 
 import org.apache.iotdb.os.conf.ObjectStorageConfig;
 import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
-import org.apache.iotdb.os.fileSystem.OSURI;
 import org.apache.iotdb.os.io.ObjectStorageConnector;
 import org.apache.iotdb.os.io.aws.S3ObjectStorageConnector;
 
@@ -34,11 +33,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.FileChannel;
-
-public class OSFileCache implements IOSFileCache {
+public class OSFileCache {
   private static final Logger logger = LoggerFactory.getLogger(OSFileCache.class);
   private static final ObjectStorageConfig config =
       ObjectStorageDescriptor.getInstance().getConfig();
@@ -58,9 +53,9 @@ public class OSFileCache implements IOSFileCache {
    * persistent LRU cache for remote TsFile, value is loaded successfully when it has been stored on
    * the disk
    */
-  private LoadingCache<OSFileCacheKey, OSFileCacheValue> remotePos2LocalCacheFile;
+  private final LoadingCache<OSFileCacheKey, OSFileCacheValue> remotePos2LocalCacheFile;
   /** manage all io operations to the cache files */
-  private CacheFileManager cacheFileManager = new CacheFileManager(config.getCacheDirs());
+  private final CacheFileManager cacheFileManager = new CacheFileManager(config.getCacheDirs());
 
   private OSFileCache() {
     remotePos2LocalCacheFile =
@@ -70,28 +65,16 @@ public class OSFileCache implements IOSFileCache {
                 (Weigher<OSFileCacheKey, OSFileCacheValue>)
                     (key, value) -> value.getOccupiedLength())
             .removalListener(
-                (key, value, cuase) -> {
+                (key, value, cause) -> {
                   if (value != null) {
-                    value.clear();
+                    value.setShouldDelete();
                   }
                 })
             .build(new OSFileCacheLoader());
   }
 
-  @Override
-  public InputStream getAsInputSteam(OSURI file, long startPosition) throws IOException {
-    return null;
-  }
-
-  @Override
-  public FileChannel getLocalCacheFileChannel(OSURI file, long startPosition) {
-    // 根据 fileName 和 startPosition 计算出对应的本地文件路径,并返回对应的 FileChannel
-    // 如果是使用一个 CacheFile, 则寻找到对应的位置,可能需要封装一个自己的 FileChannel 防止读多
-    return null;
-  }
-
-  void put(OSFileCacheKey key, OSFileCacheValue value) {
-    remotePos2LocalCacheFile.put(key, value);
+  public OSFileCacheValue get(OSFileCacheKey key) {
+    return remotePos2LocalCacheFile.get(key);
   }
 
   class OSFileCacheLoader implements CacheLoader<OSFileCacheKey, OSFileCacheValue> {
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index e9afbacacd..0a28d2d5b6 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -29,11 +29,15 @@ public class OSFileCacheValue {
   // 如果每个块用一个文件来存储,则该值一直为该文件的大小
   // 如果使用一个大文件存储所有块,则该值为该块的实际长度
   private int length;
+  private int metaSize;
+  private boolean shouldDelete;
+  private int readCnt;
 
-  public OSFileCacheValue(File cacheFile, long startPosition, int length) {
+  public OSFileCacheValue(File cacheFile, long startPosition, int length, int metaSize) {
     this.cacheFile = cacheFile;
     this.startPosition = startPosition;
     this.length = length;
+    this.metaSize = metaSize;
   }
 
   public File getCacheFile() {
@@ -48,13 +52,36 @@ public class OSFileCacheValue {
     return length;
   }
 
+  public int getMetaSize() {
+    return metaSize;
+  }
+
   public int getOccupiedLength() {
     // 如果使用多个文件,则返回该文件的大小
     // 如果使用一个文件,则返回每个槽的大小
     return length;
   }
 
-  public void clear() {
-    cacheFile.delete();
+  public synchronized void setShouldDelete() {
+    this.shouldDelete = true;
+    if (readCnt == 0) {
+      cacheFile.delete();
+    }
+  }
+
+  public synchronized boolean readLock() {
+    if (cacheFile.exists()) {
+      this.readCnt++;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public synchronized void readUnlock() {
+    this.readCnt--;
+    if (shouldDelete && readCnt == 0) {
+      cacheFile.delete();
+    }
   }
 }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
index c42bc10465..07a85e75fe 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
@@ -18,89 +18,60 @@
  */
 package org.apache.iotdb.os.fileSystem;
 
-import org.apache.iotdb.os.cache.OSFileCache;
+import org.apache.iotdb.os.cache.CacheFileChannel;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 
 public class OSTsFileInput implements TsFileInput {
-  private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
-
   private OSFile file;
-  private OSFileCache cache = OSFileCache.getInstance();
+  private CacheFileChannel channel;
+
+  public OSTsFileInput(OSFile file) {
+    this.file = file;
+    this.channel = new CacheFileChannel(file);
+  }
 
   @Override
   public long size() throws IOException {
-    return 0;
+    return channel.size();
   }
 
   @Override
   public long position() throws IOException {
-    return 0;
+    return channel.position();
   }
 
   @Override
   public TsFileInput position(long newPosition) throws IOException {
-    return null;
+    channel.position(newPosition);
+    return this;
   }
 
   @Override
   public int read(ByteBuffer dst) throws IOException {
-
-    return 0;
+    return channel.read(dst);
   }
 
   @Override
   public int read(ByteBuffer dst, long position) throws IOException {
-    return 0;
-  }
-
-  @Override
-  public int read() throws IOException {
-    return 0;
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return 0;
-  }
-
-  @Override
-  public FileChannel wrapAsFileChannel() throws IOException {
-    throw new UnsupportedEncodingException();
+    return channel.read(dst, position);
   }
 
   @Override
   public InputStream wrapAsInputStream() throws IOException {
-    return null;
-  }
-
-  @Override
-  public void close() throws IOException {}
-
-  @Override
-  public int readInt() throws IOException {
-    return 0;
+    return CacheFileChannel.newInputStream(channel);
   }
 
   @Override
-  public String readVarIntString(long offset) throws IOException {
-    return null;
+  public void close() throws IOException {
+    channel.close();
   }
 
   @Override
   public String getFilePath() {
-    return null;
-  }
-
-  public InputStream getNextInputStream(long position) throws IOException {
-    return cache.getAsInputSteam(file.toOSURI(), position);
+    return file.getPath();
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
index cba0c326dd..afb1520d01 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.tsfile.read.reader;
 
-import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,21 +101,6 @@ public class LocalTsFileInput implements TsFileInput {
     }
   }
 
-  @Override
-  public int read() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public FileChannel wrapAsFileChannel() {
-    return channel;
-  }
-
   @Override
   public InputStream wrapAsInputStream() {
     return Channels.newInputStream(channel);
@@ -133,37 +116,6 @@ public class LocalTsFileInput implements TsFileInput {
     }
   }
 
-  @Override
-  public int readInt() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String readVarIntString(long offset) throws IOException {
-    try {
-      ByteBuffer byteBuffer = ByteBuffer.allocate(5);
-      channel.read(byteBuffer, offset);
-      byteBuffer.flip();
-      int strLength = ReadWriteForEncodingUtils.readVarInt(byteBuffer);
-      if (strLength < 0) {
-        return null;
-      } else if (strLength == 0) {
-        return "";
-      }
-      ByteBuffer strBuffer = ByteBuffer.allocate(strLength);
-      int varIntLength = ReadWriteForEncodingUtils.varIntSize(strLength);
-      byte[] bytes = new byte[strLength];
-      channel.read(strBuffer, offset + varIntLength);
-      strBuffer.flip();
-      strBuffer.get(bytes, 0, strLength);
-      return new String(bytes, 0, strLength);
-    } catch (ClosedByInterruptException e) {
-      logger.warn(
-          "Current thread is interrupted by another thread when it is blocked in an I/O operation upon a channel.");
-      return null;
-    }
-  }
-
   @Override
   public String getFilePath() {
     return filePath;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java
index 8572f20000..b2e9fc0c47 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java
@@ -18,13 +18,14 @@
  */
 package org.apache.iotdb.tsfile.read.reader;
 
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 
 public interface TsFileInput {
@@ -97,20 +98,6 @@ public interface TsFileInput {
    */
   int read(ByteBuffer dst, long position) throws IOException;
 
-  /** read a byte from the Input. */
-  int read() throws IOException;
-
-  /**
-   * read an array of byte from the Input.
-   *
-   * @param b -array of byte
-   * @param off -offset of the Input
-   * @param len -length
-   */
-  int read(byte[] b, int off, int len) throws IOException;
-
-  FileChannel wrapAsFileChannel() throws IOException;
-
   InputStream wrapAsInputStream() throws IOException;
 
   /**
@@ -122,11 +109,25 @@ public interface TsFileInput {
    */
   void close() throws IOException;
 
-  /** read 4 bytes from the Input and convert it to a integer. */
-  int readInt() throws IOException;
-
   /** read a string from the Input at the given position */
-  String readVarIntString(long offset) throws IOException;
+  default String readVarIntString(long offset) throws IOException {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(5);
+    read(byteBuffer, offset);
+    byteBuffer.flip();
+    int strLength = ReadWriteForEncodingUtils.readVarInt(byteBuffer);
+    if (strLength < 0) {
+      return null;
+    } else if (strLength == 0) {
+      return "";
+    }
+    ByteBuffer strBuffer = ByteBuffer.allocate(strLength);
+    int varIntLength = ReadWriteForEncodingUtils.varIntSize(strLength);
+    byte[] bytes = new byte[strLength];
+    read(strBuffer, offset + varIntLength);
+    strBuffer.flip();
+    strBuffer.get(bytes, 0, strLength);
+    return new String(bytes, 0, strLength);
+  }
 
   String getFilePath();
 }