You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/17 07:35:19 UTC
[iotdb] branch tiered_storage updated: fix some bugs
This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new 4265632b01 fix some bugs
4265632b01 is described below
commit 4265632b01850586edf22dc297c025ff1b44a03c
Author: HeimingZ <zh...@qq.com>
AuthorDate: Wed May 17 15:34:59 2023 +0800
fix some bugs
---
.../apache/iotdb/os/cache/CacheFileManager.java | 4 +-
.../apache/iotdb/os/cache/CacheRecoverTask.java | 3 +-
.../org/apache/iotdb/os/cache/OSFileCache.java | 3 +-
.../org/apache/iotdb/os/cache/OSFileCacheKey.java | 19 ++------
.../apache/iotdb/os/cache/OSFileCacheValue.java | 40 +++++++++++++--
.../{CacheFileChannel.java => OSFileChannel.java} | 57 ++++++++++++----------
.../{CacheInputStream.java => OSInputStream.java} | 6 +--
.../apache/iotdb/os/fileSystem/OSTsFileInput.java | 10 ++--
8 files changed, 86 insertions(+), 56 deletions(-)
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index cb88c0771c..7497f15a99 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -75,7 +75,9 @@ public class CacheFileManager {
ByteBuffer meta = key.serialize();
channel.write(meta);
channel.write(ByteBuffer.wrap(data));
- res = new OSFileCacheValue(tmpCacheFile, 0, meta.capacity(), data.length);
+ res =
+ new OSFileCacheValue(
+ tmpCacheFile, 0, meta.capacity(), data.length, key.getStartPosition());
} catch (IOException e) {
logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
tmpCacheFile.delete();
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
index bfebeaadf1..08f505bd89 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
@@ -70,7 +70,8 @@ public class CacheRecoverTask implements Runnable {
cacheFile.delete();
continue;
}
- OSFileCacheValue value = new OSFileCacheValue(cacheFile, 0, metaSize, dataSize);
+ OSFileCacheValue value =
+ new OSFileCacheValue(cacheFile, 0, metaSize, dataSize, key.getStartPosition());
cache.put(key, value);
}
// update max cache file id
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index 840b16cd12..91b60791a3 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -84,7 +84,8 @@ public class OSFileCache {
@Override
public @Nullable OSFileCacheValue load(@NonNull OSFileCacheKey key) throws Exception {
byte[] data =
- connector.getRemoteFile(key.getFile().toOSURI(), key.getStartPosition(), key.getLength());
+ connector.getRemoteFile(
+ key.getFile().toOSURI(), key.getStartPosition(), config.getCachePageSize());
return cacheFileManager.persist(key, data);
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
index 03e3c8d997..c71724a34c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
@@ -32,32 +32,27 @@ public class OSFileCacheKey implements Serializable {
private final OSFile file;
/** start position in the remote TsFile */
private final long startPosition;
- /** data length */
- private final int length;
- public OSFileCacheKey(OSFile file, long startPosition, int length) {
+ public OSFileCacheKey(OSFile file, long startPosition) {
this.file = file;
this.startPosition = startPosition;
- this.length = length;
}
public int serializeSize() {
- return Integer.BYTES + file.toString().getBytes().length + Long.BYTES + Integer.BYTES;
+ return Integer.BYTES + file.toString().getBytes().length + Long.BYTES;
}
public ByteBuffer serialize() {
ByteBuffer buffer = ByteBuffer.allocate(serializeSize());
ReadWriteIOUtils.write(file.toString(), buffer);
ReadWriteIOUtils.write(startPosition, buffer);
- ReadWriteIOUtils.write(length, buffer);
return buffer;
}
public static OSFileCacheKey deserialize(InputStream inputStream) throws IOException {
String filePath = ReadWriteIOUtils.readString(inputStream);
long startPosition = ReadWriteIOUtils.readLong(inputStream);
- int length = ReadWriteIOUtils.readInt(inputStream);
- return new OSFileCacheKey(new OSFile(filePath), startPosition, length);
+ return new OSFileCacheKey(new OSFile(filePath), startPosition);
}
public OSFile getFile() {
@@ -68,13 +63,9 @@ public class OSFileCacheKey implements Serializable {
return startPosition;
}
- public int getLength() {
- return length;
- }
-
@Override
public int hashCode() {
- return Objects.hash(file, startPosition, length);
+ return Objects.hash(file, startPosition);
}
@Override
@@ -86,6 +77,6 @@ public class OSFileCacheKey implements Serializable {
return false;
}
OSFileCacheKey that = (OSFileCacheKey) obj;
- return file.equals(that.file) && startPosition == that.startPosition && length == that.length;
+ return file.equals(that.file) && startPosition == that.startPosition;
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index 85295506c5..281fbd228a 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -27,28 +27,39 @@ public class OSFileCacheValue {
// 如果每个块用一个文件来存储,则该值一直为 0
// 如果使用一个大文件存储所有块,则该值为大文件中的起点
/** start position in the local cache file */
- private long startPosition;
+ private long startPositionInCacheFile;
/** cache data size */
private int dataSize;
/** cache key size */
private int metaSize;
+ /** start position in the remote TsFile */
+ private long startPositionInTsFile;
+ /** end position (exclusive) in the remote TsFile */
+ private long endPositionInTsFile;
private boolean shouldDelete;
private int readCnt;
- public OSFileCacheValue(File cacheFile, long startPosition, int metaSize, int dataSize) {
+ public OSFileCacheValue(
+ File cacheFile,
+ long startPositionInCacheFile,
+ int metaSize,
+ int dataSize,
+ long startPositionInTsFile) {
this.cacheFile = cacheFile;
- this.startPosition = startPosition;
+ this.startPositionInCacheFile = startPositionInCacheFile;
this.metaSize = metaSize;
this.dataSize = dataSize;
+ this.startPositionInTsFile = startPositionInTsFile;
+ this.endPositionInTsFile = startPositionInTsFile + dataSize;
}
public File getCacheFile() {
return cacheFile;
}
- public long getStartPosition() {
- return startPosition;
+ public long getStartPositionInCacheFile() {
+ return startPositionInCacheFile;
}
public int getMetaSize() {
@@ -65,6 +76,25 @@ public class OSFileCacheValue {
return metaSize + dataSize;
}
+ public long getStartPositionInTsFile() {
+ return startPositionInTsFile;
+ }
+
+ public long getEndPositionInTsFile() {
+ return endPositionInTsFile;
+ }
+
+ /**
+ * Convert position in the TsFile to the corresponding position in the cache file. Return -1 when
+ * the position is outside the cache file range
+ */
+ public long convertTsFilePos2CachePos(long positionInTsFile) {
+ if (positionInTsFile < startPositionInTsFile || positionInTsFile >= endPositionInTsFile) {
+ return -1;
+ }
+ return metaSize + (positionInTsFile - startPositionInTsFile);
+ }
+
/** Mark this value should be deleted, delete this value when no one is reading it. */
public synchronized void setShouldDelete() {
this.shouldDelete = true;
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
similarity index 70%
rename from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
rename to object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index f094479037..0380e540f4 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -33,28 +33,28 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
-public class CacheFileChannel implements Closeable {
+public class OSFileChannel implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
private static final ObjectStorageConfig config =
ObjectStorageDescriptor.getInstance().getConfig();
private static final OSFileCache cache = OSFileCache.getInstance();
private final OSFile osFile;
private long position = 0;
- private OSFileCacheValue currentCacheFile;
+ private OSFileCacheValue cacheFile;
private FileChannel cacheFileChannel;
- private long startPositionInTsFile = position;
- private long endPositionInTsFile = position + config.getCachePageSize();
- public CacheFileChannel(OSFile osFile) {
+ public OSFileChannel(OSFile osFile) throws IOException {
this.osFile = osFile;
+ openNextCacheFile();
}
- public static InputStream newInputStream(CacheFileChannel channel) {
- return new CacheInputStream(channel);
+ public static InputStream newInputStream(OSFileChannel channel) {
+ return new OSInputStream(channel);
}
private boolean isPositionValid() {
- return startPositionInTsFile <= position && position < endPositionInTsFile;
+ return cacheFile.getStartPositionInTsFile() <= position
+ && position < cacheFile.getEndPositionInTsFile();
}
private void openNextCacheFile() throws IOException {
@@ -62,18 +62,18 @@ public class CacheFileChannel implements Closeable {
close();
// open next cache file
OSFileCacheKey key = locateCacheFileFromPosition();
- while (!currentCacheFile.tryReadLock()) {
- currentCacheFile = cache.get(key);
+ while (!cacheFile.tryReadLock()) {
+ cacheFile = cache.get(key);
}
- cacheFileChannel =
- FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
- startPositionInTsFile = currentCacheFile.getStartPosition();
- endPositionInTsFile = startPositionInTsFile + currentCacheFile.getLength();
+ cacheFileChannel = FileChannel.open(cacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
}
- private OSFileCacheKey locateCacheFileFromPosition() {
+ private OSFileCacheKey locateCacheFileFromPosition() throws IOException {
+ if (position >= size()) {
+ throw new IOException("EOF");
+ }
long startPosition = position - position % config.getCachePageSize();
- return new OSFileCacheKey(osFile, startPosition, config.getCachePageSize());
+ return new OSFileCacheKey(osFile, startPosition);
}
public long size() {
@@ -96,6 +96,7 @@ public class CacheFileChannel implements Closeable {
}
public int read(ByteBuffer dst, long position) throws IOException {
+ dst.mark();
// determine the read range
long startPos = position;
long endPos = position + dst.remaining();
@@ -111,14 +112,16 @@ public class CacheFileChannel implements Closeable {
if (!isPositionValid()) {
openNextCacheFile();
}
- long readStartPosition = currentCacheFile.getMetaSize() + (startPos - startPositionInTsFile);
- long readEndPosition =
- currentCacheFile.getMetaSize()
- + (Math.min(endPos, endPositionInTsFile) - startPositionInTsFile);
+ long readStartPosition = cacheFile.convertTsFilePos2CachePos(startPos);
+ long readEndPosition = cacheFile.convertTsFilePos2CachePos(endPos);
+ if (readEndPosition < 0) {
+ readEndPosition = cacheFile.getEndPositionInTsFile();
+ }
int readSize = (int) (readEndPosition - readStartPosition);
- dst.limit(dst.position() + readSize);
- int read = cacheFileChannel.read(dst, readStartPosition);
+ cacheFileChannel.position(readStartPosition);
+ long read = cacheFileChannel.read(new ByteBuffer[] {dst}, 0, readSize);
if (read != readSize) {
+ dst.reset();
throw new IOException(
String.format(
"Cache file %s may crash because cannot read enough information in the cash file.",
@@ -133,10 +136,12 @@ public class CacheFileChannel implements Closeable {
@Override
public void close() throws IOException {
- try {
- cacheFileChannel.close();
- } finally {
- currentCacheFile.readUnlock();
+ if (cacheFile != null) {
+ try {
+ cacheFileChannel.close();
+ } finally {
+ cacheFile.readUnlock();
+ }
}
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
similarity index 93%
rename from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
rename to object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
index 7fcdec4bf8..a972c32a7c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSInputStream.java
@@ -22,10 +22,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-public class CacheInputStream extends InputStream {
- private final CacheFileChannel channel;
+public class OSInputStream extends InputStream {
+ private final OSFileChannel channel;
- public CacheInputStream(CacheFileChannel channel) {
+ public OSInputStream(OSFileChannel channel) {
super();
this.channel = channel;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
index 07a85e75fe..41d893f929 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.os.fileSystem;
-import org.apache.iotdb.os.cache.CacheFileChannel;
+import org.apache.iotdb.os.cache.OSFileChannel;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import java.io.IOException;
@@ -27,11 +27,11 @@ import java.nio.ByteBuffer;
public class OSTsFileInput implements TsFileInput {
private OSFile file;
- private CacheFileChannel channel;
+ private OSFileChannel channel;
- public OSTsFileInput(OSFile file) {
+ public OSTsFileInput(OSFile file) throws IOException {
this.file = file;
- this.channel = new CacheFileChannel(file);
+ this.channel = new OSFileChannel(file);
}
@Override
@@ -62,7 +62,7 @@ public class OSTsFileInput implements TsFileInput {
@Override
public InputStream wrapAsInputStream() throws IOException {
- return CacheFileChannel.newInputStream(channel);
+ return OSFileChannel.newInputStream(channel);
}
@Override