You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/17 07:17:17 UTC
[iotdb] 01/01: change some logic in OSFileChannel
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch tiered_storage_0517
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9a08773bf5d080a90fffe20e5c575f21963dded1
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed May 17 15:16:40 2023 +0800
change some logic in OSFileChannel
---
.../apache/iotdb/os/cache/CacheFileManager.java | 4 +-
.../apache/iotdb/os/cache/CacheInputStream.java | 4 +-
.../apache/iotdb/os/cache/CacheRecoverTask.java | 3 +-
.../apache/iotdb/os/cache/OSFileCacheValue.java | 50 +++++++++++++++----
.../{CacheFileChannel.java => OSFileChannel.java} | 56 ++++++++++------------
.../apache/iotdb/os/fileSystem/OSTsFileInput.java | 8 ++--
6 files changed, 77 insertions(+), 48 deletions(-)
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index cb88c0771c..5c74076fce 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -75,7 +75,9 @@ public class CacheFileManager {
ByteBuffer meta = key.serialize();
channel.write(meta);
channel.write(ByteBuffer.wrap(data));
- res = new OSFileCacheValue(tmpCacheFile, 0, meta.capacity(), data.length);
+ res =
+ new OSFileCacheValue(
+ tmpCacheFile, 0, key.getStartPosition(), meta.capacity(), data.length);
} catch (IOException e) {
logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
tmpCacheFile.delete();
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
index 7fcdec4bf8..2701a6d6cd 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
@@ -23,9 +23,9 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
public class CacheInputStream extends InputStream {
- private final CacheFileChannel channel;
+ private final OSFileChannel channel;
- public CacheInputStream(CacheFileChannel channel) {
+ public CacheInputStream(OSFileChannel channel) {
super();
this.channel = channel;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
index bfebeaadf1..c64a002bd5 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
@@ -70,7 +70,8 @@ public class CacheRecoverTask implements Runnable {
cacheFile.delete();
continue;
}
- OSFileCacheValue value = new OSFileCacheValue(cacheFile, 0, metaSize, dataSize);
+ OSFileCacheValue value =
+ new OSFileCacheValue(cacheFile, 0, key.getStartPosition(), metaSize, dataSize);
cache.put(key, value);
}
// update max cache file id
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index 85295506c5..65696cd973 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -19,15 +19,25 @@
package org.apache.iotdb.os.cache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
public class OSFileCacheValue {
+ private static final Logger logger = LoggerFactory.getLogger(OSFileCacheValue.class);
/** local cache file */
private File cacheFile;
// 如果每个块用一个文件来存储,则该值一直为 0
// 如果使用一个大文件存储所有块,则该值为大文件中的起点
/** start position in the local cache file */
- private long startPosition;
+ private long startPositionInCacheFile;
+
+ private long startPositionOfOSFile;
/** cache data size */
private int dataSize;
/** cache key size */
@@ -36,9 +46,13 @@ public class OSFileCacheValue {
private boolean shouldDelete;
private int readCnt;
- public OSFileCacheValue(File cacheFile, long startPosition, int metaSize, int dataSize) {
+ private FileChannel cacheFileChannel;
+
+ public OSFileCacheValue(
+ File cacheFile, long startPosition, long startPositionOfOSFile, int metaSize, int dataSize) {
this.cacheFile = cacheFile;
- this.startPosition = startPosition;
+ this.startPositionInCacheFile = startPosition;
+ this.startPositionOfOSFile = startPositionOfOSFile;
this.metaSize = metaSize;
this.dataSize = dataSize;
}
@@ -47,8 +61,8 @@ public class OSFileCacheValue {
return cacheFile;
}
- public long getStartPosition() {
- return startPosition;
+ public long getStartPositionInCacheFile() {
+ return startPositionInCacheFile;
}
public int getMetaSize() {
@@ -65,6 +79,15 @@ public class OSFileCacheValue {
return metaSize + dataSize;
}
+ public boolean containsPosition(long position) {
+ return startPositionInCacheFile <= position && position < startPositionInCacheFile + dataSize;
+ }
+
+ public int read(ByteBuffer dst, long startPosition) throws IOException {
+ long startPosInCacheFile = metaSize + (startPosition - this.startPositionOfOSFile);
+ return cacheFileChannel.read(dst, startPosInCacheFile);
+ }
+
/** Mark this value should be deleted, delete this value when no one is reading it. */
public synchronized void setShouldDelete() {
this.shouldDelete = true;
@@ -77,11 +100,14 @@ public class OSFileCacheValue {
* Try to get the read lock, return false when this cache value should be deleted or has been
* deleted.
*/
- public synchronized boolean tryReadLock() {
+ public synchronized boolean tryReadLock() throws IOException {
if (shouldDelete || !cacheFile.exists()) {
return false;
} else {
this.readCnt++;
+ if (!cacheFileChannel.isOpen()) {
+ cacheFileChannel = FileChannel.open(cacheFile.toPath(), StandardOpenOption.READ);
+ }
return true;
}
}
@@ -90,10 +116,18 @@ public class OSFileCacheValue {
* Release the read lock, delete the cache value when no one else is reading it and this cache
* value should be deleted.
*/
- public synchronized void readUnlock() {
+ public synchronized void readUnlock() throws IOException {
this.readCnt--;
+ // delete the cache file only when no reference is used
if (shouldDelete && readCnt == 0) {
- cacheFile.delete();
+ boolean success = cacheFile.delete();
+ if (!success) {
+ logger.error("[OSFileCache] cannot delete cache file {}", cacheFile);
+ }
+ }
+ // close the file channel if no reference is used
+ if (readCnt == 0) {
+ cacheFileChannel.close();
}
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
similarity index 67%
rename from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
rename to object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
index f094479037..5bc9bbe4eb 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileChannel.java
@@ -30,45 +30,37 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-public class CacheFileChannel implements Closeable {
+public class OSFileChannel implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
private static final ObjectStorageConfig config =
ObjectStorageDescriptor.getInstance().getConfig();
private static final OSFileCache cache = OSFileCache.getInstance();
private final OSFile osFile;
private long position = 0;
- private OSFileCacheValue currentCacheFile;
- private FileChannel cacheFileChannel;
- private long startPositionInTsFile = position;
- private long endPositionInTsFile = position + config.getCachePageSize();
+ private OSFileCacheValue currentOSFileBlock;
- public CacheFileChannel(OSFile osFile) {
+ public OSFileChannel(OSFile osFile) {
this.osFile = osFile;
}
- public static InputStream newInputStream(CacheFileChannel channel) {
+ public static InputStream newInputStream(OSFileChannel channel) {
return new CacheInputStream(channel);
}
- private boolean isPositionValid() {
- return startPositionInTsFile <= position && position < endPositionInTsFile;
+ private boolean canReadFromCurrentCacheBlock(long startPos) {
+ return currentOSFileBlock != null && currentOSFileBlock.containsPosition(startPos);
}
private void openNextCacheFile() throws IOException {
// close prev cache file
- close();
+ releaseCurrentOSFileBlock();
// open next cache file
OSFileCacheKey key = locateCacheFileFromPosition();
- while (!currentCacheFile.tryReadLock()) {
- currentCacheFile = cache.get(key);
+ // 用 while 是为了防止从 cache 中拿出来之后,对应的 value 又被挤出去,导致对应的文件被删除?
+ while (currentOSFileBlock == null || !currentOSFileBlock.tryReadLock()) {
+ currentOSFileBlock = cache.get(key);
}
- cacheFileChannel =
- FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
- startPositionInTsFile = currentCacheFile.getStartPosition();
- endPositionInTsFile = startPositionInTsFile + currentCacheFile.getLength();
}
private OSFileCacheKey locateCacheFileFromPosition() {
@@ -108,17 +100,15 @@ public class CacheFileChannel implements Closeable {
// read each cache file
int totalReadBytes = 0;
while (startPos < endPos) {
- if (!isPositionValid()) {
+ if (!canReadFromCurrentCacheBlock(startPos)) {
openNextCacheFile();
}
- long readStartPosition = currentCacheFile.getMetaSize() + (startPos - startPositionInTsFile);
- long readEndPosition =
- currentCacheFile.getMetaSize()
- + (Math.min(endPos, endPositionInTsFile) - startPositionInTsFile);
- int readSize = (int) (readEndPosition - readStartPosition);
- dst.limit(dst.position() + readSize);
- int read = cacheFileChannel.read(dst, readStartPosition);
- if (read != readSize) {
+
+ int maxReadSize = (int) Math.min(endPos - startPos, currentOSFileBlock.getDataSize());
+ dst.limit(dst.position() + maxReadSize);
+
+ int read = currentOSFileBlock.read(dst, startPos);
+ if (read != maxReadSize) {
throw new IOException(
String.format(
"Cache file %s may crash because cannot read enough information in the cash file.",
@@ -131,12 +121,14 @@ public class CacheFileChannel implements Closeable {
return totalReadBytes;
}
+ public void releaseCurrentOSFileBlock() throws IOException {
+ if (currentOSFileBlock != null) {
+ currentOSFileBlock.readUnlock();
+ }
+ }
+
@Override
public void close() throws IOException {
- try {
- cacheFileChannel.close();
- } finally {
- currentCacheFile.readUnlock();
- }
+ releaseCurrentOSFileBlock();
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
index 07a85e75fe..0dcd600285 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.os.fileSystem;
-import org.apache.iotdb.os.cache.CacheFileChannel;
+import org.apache.iotdb.os.cache.OSFileChannel;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import java.io.IOException;
@@ -27,11 +27,11 @@ import java.nio.ByteBuffer;
public class OSTsFileInput implements TsFileInput {
private OSFile file;
- private CacheFileChannel channel;
+ private OSFileChannel channel;
public OSTsFileInput(OSFile file) {
this.file = file;
- this.channel = new CacheFileChannel(file);
+ this.channel = new OSFileChannel(file);
}
@Override
@@ -62,7 +62,7 @@ public class OSTsFileInput implements TsFileInput {
@Override
public InputStream wrapAsInputStream() throws IOException {
- return CacheFileChannel.newInputStream(channel);
+ return OSFileChannel.newInputStream(channel);
}
@Override