You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/16 09:39:15 UTC
[iotdb] branch tiered_storage updated: add cache read
This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new fc3e620bbb add cache read
fc3e620bbb is described below
commit fc3e620bbb7076ca9f3fc22485e88fc95964e5f4
Author: HeimingZ <zh...@qq.com>
AuthorDate: Tue May 16 17:38:55 2023 +0800
add cache read
---
.../apache/iotdb/hadoop/fileSystem/HDFSInput.java | 32 ------
.../apache/iotdb/os/cache/CacheFileChannel.java | 123 ++++++++++++++++++++-
.../apache/iotdb/os/cache/CacheFileManager.java | 8 +-
.../apache/iotdb/os/cache/CacheInputStream.java | 72 ++++++++++--
.../org/apache/iotdb/os/cache/IOSFileCache.java | 31 ------
.../org/apache/iotdb/os/cache/OSFileCache.java | 31 ++----
.../apache/iotdb/os/cache/OSFileCacheValue.java | 33 +++++-
.../apache/iotdb/os/fileSystem/OSTsFileInput.java | 63 +++--------
.../iotdb/tsfile/read/reader/LocalTsFileInput.java | 48 --------
.../iotdb/tsfile/read/reader/TsFileInput.java | 39 +++----
10 files changed, 261 insertions(+), 219 deletions(-)
diff --git a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
index 242087bb37..5c0940d495 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.hadoop.fileSystem;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
@@ -30,7 +29,6 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
public class HDFSInput implements TsFileInput {
@@ -100,21 +98,6 @@ public class HDFSInput implements TsFileInput {
return res;
}
- @Override
- public int read() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int read(byte[] b, int off, int len) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public FileChannel wrapAsFileChannel() {
- throw new UnsupportedOperationException();
- }
-
@Override
public InputStream wrapAsInputStream() {
return fsDataInputStream;
@@ -125,21 +108,6 @@ public class HDFSInput implements TsFileInput {
fsDataInputStream.close();
}
- @Override
- public int readInt() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public synchronized String readVarIntString(long position) throws IOException {
- long srcPosition = fsDataInputStream.getPos();
-
- fsDataInputStream.seek(position);
- String res = ReadWriteIOUtils.readVarIntString(fsDataInputStream);
- fsDataInputStream.seek(srcPosition);
- return res;
- }
-
@Override
public String getFilePath() {
return path.toString();
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
index 8b7bfc9124..24d20b7651 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
@@ -18,4 +18,125 @@
*/
package org.apache.iotdb.os.cache;
-public class CacheFileChannel {}
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.fileSystem.OSFile;
+import org.apache.iotdb.os.fileSystem.OSTsFileInput;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+public class CacheFileChannel implements Closeable {
+ private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
+ private static final ObjectStorageConfig config =
+ ObjectStorageDescriptor.getInstance().getConfig();
+ private static final OSFileCache cache = OSFileCache.getInstance();
+ private final OSFile osFile;
+ private long position = 0;
+ private OSFileCacheValue currentCacheFile;
+ private FileChannel cacheFileChannel;
+ private long cacheFileStartPosition = position;
+ private long cacheFileEndPosition = position + config.getCachePageSize();
+
+ public CacheFileChannel(OSFile osFile) {
+ this.osFile = osFile;
+ }
+
+ public static InputStream newInputStream(CacheFileChannel channel) {
+ return new CacheInputStream(channel);
+ }
+
+ private OSFileCacheKey getNextCacheFile() {
+ long startPosition = position - position % config.getCachePageSize();
+ return new OSFileCacheKey(osFile, startPosition, config.getCachePageSize());
+ }
+
+ private void openNextCacheFile() throws IOException {
+ // close prev cache file
+ close();
+ // open next cache file
+ OSFileCacheKey key = getNextCacheFile();
+ while (!currentCacheFile.readLock()) {
+ currentCacheFile = cache.get(key);
+ }
+ cacheFileChannel =
+ FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
+ cacheFileStartPosition = currentCacheFile.getStartPosition();
+ cacheFileEndPosition = cacheFileStartPosition + config.getCachePageSize();
+ }
+
+ private boolean isPositionValid(long position) {
+ return cacheFileStartPosition <= position && position <= cacheFileEndPosition;
+ }
+
+ public long size() {
+ return osFile.length();
+ }
+
+ public long position() {
+ return position;
+ }
+
+ public void position(long newPosition) {
+ if (newPosition < 0) {
+ throw new IllegalArgumentException();
+ }
+ position = newPosition;
+ }
+
+ public int read(ByteBuffer dst) throws IOException {
+ return read(dst, position);
+ }
+
+ public int read(ByteBuffer dst, long position) throws IOException {
+ // determine the read range
+ long startPos = position;
+ long endPos = position + dst.remaining();
+ if (startPos >= size()) {
+ return -1;
+ }
+ if (endPos > size()) {
+ endPos = size();
+ }
+ // read each cache file
+ int totalReadBytes = 0;
+ while (startPos < endPos) {
+ if (!isPositionValid(startPos)) {
+ openNextCacheFile();
+ }
+ long readStartPosition = currentCacheFile.getMetaSize() + (startPos - cacheFileStartPosition);
+ long readEndPosition =
+ currentCacheFile.getMetaSize()
+ + (Math.min(endPos, cacheFileEndPosition) - cacheFileStartPosition);
+ int readSize = (int) (readEndPosition - readStartPosition);
+ dst.limit(dst.position() + readSize);
+ int read = cacheFileChannel.read(dst, readStartPosition);
+ if (read != readSize) {
+ throw new IOException(
+ String.format(
+ "Cache file %s may crash because cannot read enough information in the cash file.",
+ osFile));
+ }
+ totalReadBytes += read;
+ startPos += read;
+ }
+ this.position = position + totalReadBytes;
+ return totalReadBytes;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ cacheFileChannel.close();
+ } finally {
+ currentCacheFile.readUnlock();
+ }
+ }
+}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index 6dd208ac17..dcde92da09 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -31,15 +31,15 @@ import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;
-/** This class manages all io operations to the cache files */
+/** This class manages all write operations to the cache files */
public class CacheFileManager {
private static final Logger logger = LoggerFactory.getLogger(CacheFileManager.class);
private static final String CACHE_FILE_SUFFIX = ".cache";
private static final String TMP_CACHE_FILE_SUFFIX = ".cache.tmp";
private static final ObjectStorageConfig config =
ObjectStorageDescriptor.getInstance().getConfig();
- private String[] cacheDirs;
- private AtomicLong logFileId = new AtomicLong(0);
+ private final String[] cacheDirs;
+ private final AtomicLong logFileId = new AtomicLong(0);
public CacheFileManager(String[] cacheDirs) {
this.cacheDirs = cacheDirs;
@@ -69,7 +69,7 @@ public class CacheFileManager {
ByteBuffer meta = key.serializeToByteBuffer();
channel.write(meta);
channel.write(ByteBuffer.wrap(data));
- res = new OSFileCacheValue(tmpCacheFile, meta.capacity(), data.length);
+ res = new OSFileCacheValue(tmpCacheFile, meta.capacity(), data.length, meta.capacity());
} catch (IOException e) {
logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
tmpCacheFile.delete();
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
index 2df16266ba..7fcdec4bf8 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
@@ -18,26 +18,76 @@
*/
package org.apache.iotdb.os.cache;
-import org.apache.iotdb.os.fileSystem.OSTsFileInput;
-
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
public class CacheInputStream extends InputStream {
- private OSTsFileInput osTsFileInput;
- private long position; // 15
- private long size; // 100
- private InputStream currentInputStream; // 第二个文件,从 5 开始
+ private final CacheFileChannel channel;
- public CacheInputStream(OSTsFileInput osTsFileInput, long position, long size) {
- this.osTsFileInput = osTsFileInput;
- this.position = position;
- this.size = size;
+ public CacheInputStream(CacheFileChannel channel) {
+ super();
+ this.channel = channel;
}
@Override
public int read() throws IOException {
+ byte[] b1 = new byte[1];
+ int n = read(b1);
+ if (n == 1) return b1[0] & 0xff;
+ return -1;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(b);
+ buffer.position(off);
+ buffer.limit(off + len);
+ return channel.read(buffer);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (n <= 0) {
+ return 0;
+ }
+ if (n > channel.size() - channel.position()) {
+ n = channel.size() - channel.position();
+ }
+ channel.position(channel.position() + n);
+ return n;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return (int) (channel.size() - channel.position());
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized void reset() {
+ throw new UnsupportedOperationException();
+ }
- return 0;
+ @Override
+ public boolean markSupported() {
+ throw new UnsupportedOperationException();
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/IOSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/IOSFileCache.java
deleted file mode 100644
index d780083ff6..0000000000
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/IOSFileCache.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.os.cache;
-
-import org.apache.iotdb.os.fileSystem.OSURI;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.FileChannel;
-
-public interface IOSFileCache {
- InputStream getAsInputSteam(OSURI file, long startPosition) throws IOException;
-
- FileChannel getLocalCacheFileChannel(OSURI file, long startPosition) throws IOException;
-}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index 1525f38283..a7a6f6eea9 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.os.cache;
import org.apache.iotdb.os.conf.ObjectStorageConfig;
import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
-import org.apache.iotdb.os.fileSystem.OSURI;
import org.apache.iotdb.os.io.ObjectStorageConnector;
import org.apache.iotdb.os.io.aws.S3ObjectStorageConnector;
@@ -34,11 +33,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.FileChannel;
-
-public class OSFileCache implements IOSFileCache {
+public class OSFileCache {
private static final Logger logger = LoggerFactory.getLogger(OSFileCache.class);
private static final ObjectStorageConfig config =
ObjectStorageDescriptor.getInstance().getConfig();
@@ -58,9 +53,9 @@ public class OSFileCache implements IOSFileCache {
* persistent LRU cache for remote TsFile, value is loaded successfully when it has been stored on
* the disk
*/
- private LoadingCache<OSFileCacheKey, OSFileCacheValue> remotePos2LocalCacheFile;
+ private final LoadingCache<OSFileCacheKey, OSFileCacheValue> remotePos2LocalCacheFile;
/** manage all io operations to the cache files */
- private CacheFileManager cacheFileManager = new CacheFileManager(config.getCacheDirs());
+ private final CacheFileManager cacheFileManager = new CacheFileManager(config.getCacheDirs());
private OSFileCache() {
remotePos2LocalCacheFile =
@@ -70,28 +65,16 @@ public class OSFileCache implements IOSFileCache {
(Weigher<OSFileCacheKey, OSFileCacheValue>)
(key, value) -> value.getOccupiedLength())
.removalListener(
- (key, value, cuase) -> {
+ (key, value, cause) -> {
if (value != null) {
- value.clear();
+ value.setShouldDelete();
}
})
.build(new OSFileCacheLoader());
}
- @Override
- public InputStream getAsInputSteam(OSURI file, long startPosition) throws IOException {
- return null;
- }
-
- @Override
- public FileChannel getLocalCacheFileChannel(OSURI file, long startPosition) {
- // 根据 fileName 和 startPosition 计算出对应的本地文件路径,并返回对应的 FileChannel
- // 如果是使用一个 CacheFile, 则寻找到对应的位置,可能需要封装一个自己的 FileChannel 防止读多
- return null;
- }
-
- void put(OSFileCacheKey key, OSFileCacheValue value) {
- remotePos2LocalCacheFile.put(key, value);
+ public OSFileCacheValue get(OSFileCacheKey key) {
+ return remotePos2LocalCacheFile.get(key);
}
class OSFileCacheLoader implements CacheLoader<OSFileCacheKey, OSFileCacheValue> {
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index e9afbacacd..0a28d2d5b6 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -29,11 +29,15 @@ public class OSFileCacheValue {
// 如果每个块用一个文件来存储,则该值一直为该文件的大小
// 如果使用一个大文件存储所有块,则该值为该块的实际长度
private int length;
+ private int metaSize;
+ private boolean shouldDelete;
+ private int readCnt;
- public OSFileCacheValue(File cacheFile, long startPosition, int length) {
+ public OSFileCacheValue(File cacheFile, long startPosition, int length, int metaSize) {
this.cacheFile = cacheFile;
this.startPosition = startPosition;
this.length = length;
+ this.metaSize = metaSize;
}
public File getCacheFile() {
@@ -48,13 +52,36 @@ public class OSFileCacheValue {
return length;
}
+ public int getMetaSize() {
+ return metaSize;
+ }
+
public int getOccupiedLength() {
// 如果使用多个文件,则返回该文件的大小
// 如果使用一个文件,则返回每个槽的大小
return length;
}
- public void clear() {
- cacheFile.delete();
+ public synchronized void setShouldDelete() {
+ this.shouldDelete = true;
+ if (readCnt == 0) {
+ cacheFile.delete();
+ }
+ }
+
+ public synchronized boolean readLock() {
+ if (cacheFile.exists()) {
+ this.readCnt++;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public synchronized void readUnlock() {
+ this.readCnt--;
+ if (shouldDelete && readCnt == 0) {
+ cacheFile.delete();
+ }
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
index c42bc10465..07a85e75fe 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
@@ -18,89 +18,60 @@
*/
package org.apache.iotdb.os.fileSystem;
-import org.apache.iotdb.os.cache.OSFileCache;
+import org.apache.iotdb.os.cache.CacheFileChannel;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
public class OSTsFileInput implements TsFileInput {
- private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
-
private OSFile file;
- private OSFileCache cache = OSFileCache.getInstance();
+ private CacheFileChannel channel;
+
+ public OSTsFileInput(OSFile file) {
+ this.file = file;
+ this.channel = new CacheFileChannel(file);
+ }
@Override
public long size() throws IOException {
- return 0;
+ return channel.size();
}
@Override
public long position() throws IOException {
- return 0;
+ return channel.position();
}
@Override
public TsFileInput position(long newPosition) throws IOException {
- return null;
+ channel.position(newPosition);
+ return this;
}
@Override
public int read(ByteBuffer dst) throws IOException {
-
- return 0;
+ return channel.read(dst);
}
@Override
public int read(ByteBuffer dst, long position) throws IOException {
- return 0;
- }
-
- @Override
- public int read() throws IOException {
- return 0;
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- return 0;
- }
-
- @Override
- public FileChannel wrapAsFileChannel() throws IOException {
- throw new UnsupportedEncodingException();
+ return channel.read(dst, position);
}
@Override
public InputStream wrapAsInputStream() throws IOException {
- return null;
- }
-
- @Override
- public void close() throws IOException {}
-
- @Override
- public int readInt() throws IOException {
- return 0;
+ return CacheFileChannel.newInputStream(channel);
}
@Override
- public String readVarIntString(long offset) throws IOException {
- return null;
+ public void close() throws IOException {
+ channel.close();
}
@Override
public String getFilePath() {
- return null;
- }
-
- public InputStream getNextInputStream(long position) throws IOException {
- return cache.getAsInputSteam(file.toOSURI(), position);
+ return file.getPath();
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
index cba0c326dd..afb1520d01 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
@@ -18,8 +18,6 @@
*/
package org.apache.iotdb.tsfile.read.reader;
-import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,21 +101,6 @@ public class LocalTsFileInput implements TsFileInput {
}
}
- @Override
- public int read() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int read(byte[] b, int off, int len) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public FileChannel wrapAsFileChannel() {
- return channel;
- }
-
@Override
public InputStream wrapAsInputStream() {
return Channels.newInputStream(channel);
@@ -133,37 +116,6 @@ public class LocalTsFileInput implements TsFileInput {
}
}
- @Override
- public int readInt() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String readVarIntString(long offset) throws IOException {
- try {
- ByteBuffer byteBuffer = ByteBuffer.allocate(5);
- channel.read(byteBuffer, offset);
- byteBuffer.flip();
- int strLength = ReadWriteForEncodingUtils.readVarInt(byteBuffer);
- if (strLength < 0) {
- return null;
- } else if (strLength == 0) {
- return "";
- }
- ByteBuffer strBuffer = ByteBuffer.allocate(strLength);
- int varIntLength = ReadWriteForEncodingUtils.varIntSize(strLength);
- byte[] bytes = new byte[strLength];
- channel.read(strBuffer, offset + varIntLength);
- strBuffer.flip();
- strBuffer.get(bytes, 0, strLength);
- return new String(bytes, 0, strLength);
- } catch (ClosedByInterruptException e) {
- logger.warn(
- "Current thread is interrupted by another thread when it is blocked in an I/O operation upon a channel.");
- return null;
- }
- }
-
@Override
public String getFilePath() {
return filePath;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java
index 8572f20000..b2e9fc0c47 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java
@@ -18,13 +18,14 @@
*/
package org.apache.iotdb.tsfile.read.reader;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
public interface TsFileInput {
@@ -97,20 +98,6 @@ public interface TsFileInput {
*/
int read(ByteBuffer dst, long position) throws IOException;
- /** read a byte from the Input. */
- int read() throws IOException;
-
- /**
- * read an array of byte from the Input.
- *
- * @param b -array of byte
- * @param off -offset of the Input
- * @param len -length
- */
- int read(byte[] b, int off, int len) throws IOException;
-
- FileChannel wrapAsFileChannel() throws IOException;
-
InputStream wrapAsInputStream() throws IOException;
/**
@@ -122,11 +109,25 @@ public interface TsFileInput {
*/
void close() throws IOException;
- /** read 4 bytes from the Input and convert it to a integer. */
- int readInt() throws IOException;
-
/** read a string from the Input at the given position */
- String readVarIntString(long offset) throws IOException;
+ default String readVarIntString(long offset) throws IOException {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(5);
+ read(byteBuffer, offset);
+ byteBuffer.flip();
+ int strLength = ReadWriteForEncodingUtils.readVarInt(byteBuffer);
+ if (strLength < 0) {
+ return null;
+ } else if (strLength == 0) {
+ return "";
+ }
+ ByteBuffer strBuffer = ByteBuffer.allocate(strLength);
+ int varIntLength = ReadWriteForEncodingUtils.varIntSize(strLength);
+ byte[] bytes = new byte[strLength];
+ read(strBuffer, offset + varIntLength);
+ strBuffer.flip();
+ strBuffer.get(bytes, 0, strLength);
+ return new String(bytes, 0, strLength);
+ }
String getFilePath();
}