You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/17 03:11:53 UTC
[iotdb] branch tiered_storage updated: add cache recover
This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new a1babc8725 add cache recover
a1babc8725 is described below
commit a1babc8725664008c91b606b20da62889e33362e
Author: HeimingZ <zh...@qq.com>
AuthorDate: Wed May 17 11:11:35 2023 +0800
add cache recover
---
.../apache/iotdb/os/cache/CacheFileChannel.java | 28 +++----
.../apache/iotdb/os/cache/CacheFileManager.java | 39 +++++++---
.../apache/iotdb/os/cache/CacheRecoverTask.java | 88 ++++++++++++++++++++++
.../org/apache/iotdb/os/cache/OSFileCache.java | 11 ++-
.../org/apache/iotdb/os/cache/OSFileCacheKey.java | 34 ++++++---
.../apache/iotdb/os/cache/OSFileCacheValue.java | 46 ++++++-----
.../iotdb/os/utils/ObjectStorageConstant.java | 4 +-
7 files changed, 196 insertions(+), 54 deletions(-)
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
index 24d20b7651..f094479037 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
@@ -42,8 +42,8 @@ public class CacheFileChannel implements Closeable {
private long position = 0;
private OSFileCacheValue currentCacheFile;
private FileChannel cacheFileChannel;
- private long cacheFileStartPosition = position;
- private long cacheFileEndPosition = position + config.getCachePageSize();
+ private long startPositionInTsFile = position;
+ private long endPositionInTsFile = position + config.getCachePageSize();
public CacheFileChannel(OSFile osFile) {
this.osFile = osFile;
@@ -53,27 +53,27 @@ public class CacheFileChannel implements Closeable {
return new CacheInputStream(channel);
}
- private OSFileCacheKey getNextCacheFile() {
- long startPosition = position - position % config.getCachePageSize();
- return new OSFileCacheKey(osFile, startPosition, config.getCachePageSize());
+ private boolean isPositionValid() {
+ return startPositionInTsFile <= position && position < endPositionInTsFile;
}
private void openNextCacheFile() throws IOException {
// close prev cache file
close();
// open next cache file
- OSFileCacheKey key = getNextCacheFile();
- while (!currentCacheFile.readLock()) {
+ OSFileCacheKey key = locateCacheFileFromPosition();
+ while (!currentCacheFile.tryReadLock()) {
currentCacheFile = cache.get(key);
}
cacheFileChannel =
FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
- cacheFileStartPosition = currentCacheFile.getStartPosition();
- cacheFileEndPosition = cacheFileStartPosition + config.getCachePageSize();
+ startPositionInTsFile = currentCacheFile.getStartPosition();
+ endPositionInTsFile = startPositionInTsFile + currentCacheFile.getLength();
}
- private boolean isPositionValid(long position) {
- return cacheFileStartPosition <= position && position <= cacheFileEndPosition;
+ private OSFileCacheKey locateCacheFileFromPosition() {
+ long startPosition = position - position % config.getCachePageSize();
+ return new OSFileCacheKey(osFile, startPosition, config.getCachePageSize());
}
public long size() {
@@ -108,13 +108,13 @@ public class CacheFileChannel implements Closeable {
// read each cache file
int totalReadBytes = 0;
while (startPos < endPos) {
- if (!isPositionValid(startPos)) {
+ if (!isPositionValid()) {
openNextCacheFile();
}
- long readStartPosition = currentCacheFile.getMetaSize() + (startPos - cacheFileStartPosition);
+ long readStartPosition = currentCacheFile.getMetaSize() + (startPos - startPositionInTsFile);
long readEndPosition =
currentCacheFile.getMetaSize()
- + (Math.min(endPos, cacheFileEndPosition) - cacheFileStartPosition);
+ + (Math.min(endPos, endPositionInTsFile) - startPositionInTsFile);
int readSize = (int) (readEndPosition - readStartPosition);
dst.limit(dst.position() + readSize);
int read = cacheFileChannel.read(dst, readStartPosition);
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index dcde92da09..cb88c0771c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -31,22 +31,28 @@ import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.iotdb.os.utils.ObjectStorageConstant.CACHE_FILE_SUFFIX;
+import static org.apache.iotdb.os.utils.ObjectStorageConstant.TMP_CACHE_FILE_SUFFIX;
+
/** This class manages all write operations to the cache files */
public class CacheFileManager {
private static final Logger logger = LoggerFactory.getLogger(CacheFileManager.class);
- private static final String CACHE_FILE_SUFFIX = ".cache";
- private static final String TMP_CACHE_FILE_SUFFIX = ".cache.tmp";
private static final ObjectStorageConfig config =
ObjectStorageDescriptor.getInstance().getConfig();
- private final String[] cacheDirs;
- private final AtomicLong logFileId = new AtomicLong(0);
+ private final String[] cacheDirs = config.getCacheDirs();
+ private final AtomicLong cacheFileId = new AtomicLong(0);
- public CacheFileManager(String[] cacheDirs) {
- this.cacheDirs = cacheDirs;
+ private CacheFileManager() {
+ for (String cacheDir : cacheDirs) {
+ File cacheDirFile = new File(cacheDir);
+ if (!cacheDirFile.exists()) {
+ cacheDirFile.mkdirs();
+ }
+ }
}
private long getNextCacheFileId() {
- return logFileId.getAndIncrement();
+ return cacheFileId.getAndIncrement();
}
private File getTmpCacheFile(long id) {
@@ -66,14 +72,29 @@ public class CacheFileManager {
File tmpCacheFile = getTmpCacheFile(cacheFileId);
try (FileChannel channel =
FileChannel.open(tmpCacheFile.toPath(), StandardOpenOption.CREATE_NEW)) {
- ByteBuffer meta = key.serializeToByteBuffer();
+ ByteBuffer meta = key.serialize();
channel.write(meta);
channel.write(ByteBuffer.wrap(data));
- res = new OSFileCacheValue(tmpCacheFile, meta.capacity(), data.length, meta.capacity());
+ res = new OSFileCacheValue(tmpCacheFile, 0, meta.capacity(), data.length);
} catch (IOException e) {
logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
tmpCacheFile.delete();
}
return tmpCacheFile.renameTo(getCacheFile(cacheFileId)) ? res : null;
}
+
+ /** This method is used by the recover procedure */
+ void setCacheFileId(long startId) {
+ cacheFileId.set(startId);
+ }
+
+ public static CacheFileManager getInstance() {
+ return InstanceHolder.INSTANCE;
+ }
+
+ private static class InstanceHolder {
+ private InstanceHolder() {}
+
+ private static final CacheFileManager INSTANCE = new CacheFileManager();
+ }
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
new file mode 100644
index 0000000000..bfebeaadf1
--- /dev/null
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.os.cache;
+
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+
+import static org.apache.iotdb.os.utils.ObjectStorageConstant.CACHE_FILE_SUFFIX;
+
+/** Rebuilds the in-memory cache index from the cache files a previous run left on disk. */
+public class CacheRecoverTask implements Runnable {
+  private static final Logger logger = LoggerFactory.getLogger(CacheRecoverTask.class);
+  private static final ObjectStorageConfig config =
+      ObjectStorageDescriptor.getInstance().getConfig();
+  private static final OSFileCache cache = OSFileCache.getInstance();
+
+  /**
+   * Scans each configured cache dir: deletes files without the cache suffix or with a wrong data
+   * size, puts the rest back into the cache, then advances the cache file id past the largest seen.
+   */
+  @Override
+  public void run() {
+    long maxCacheFileId = -1;
+    for (String cacheDir : config.getCacheDirs()) {
+      File[] cacheFiles = new File(cacheDir).listFiles(); // null when not a listable directory
+      if (cacheFiles == null) {
+        continue;
+      }
+      for (File cacheFile : cacheFiles) {
+        try {
+          if (cacheFile.isDirectory()) {
+            continue;
+          }
+          String cacheFileName = cacheFile.getName();
+          if (!cacheFileName.endsWith(CACHE_FILE_SUFFIX)) {
+            cacheFile.delete();
+            continue;
+          }
+          // track the id first, so a file that fails below still reserves its id
+          String cacheFileIdStr = cacheFileName.substring(0, cacheFileName.indexOf('.'));
+          maxCacheFileId = Math.max(maxCacheFileId, Long.parseLong(cacheFileIdStr));
+          // read meta; the stream is closed before the file may be deleted
+          OSFileCacheKey key;
+          try (InputStream in = Files.newInputStream(cacheFile.toPath())) {
+            key = OSFileCacheKey.deserialize(in);
+          }
+          int metaSize = key.serializeSize();
+          int dataSize = (int) cacheFile.length() - metaSize;
+          if (dataSize != config.getCachePageSize()) {
+            logger.debug(
+                "Cache file {}'s data size doesn't match the cache page size, so delete it.",
+                cacheFile);
+            cacheFile.delete();
+            continue;
+          }
+          cache.put(key, new OSFileCacheValue(cacheFile, 0, metaSize, dataSize));
+        } catch (Exception e) {
+          logger.error("Failed to recover cache file {}", cacheFile.getName(), e);
+        }
+      }
+    }
+    // new cache files must not reuse an id that is already on disk
+    CacheFileManager.getInstance().setCacheFileId(maxCacheFileId + 1);
+  }
+}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index a7a6f6eea9..840b16cd12 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -55,15 +55,13 @@ public class OSFileCache {
*/
private final LoadingCache<OSFileCacheKey, OSFileCacheValue> remotePos2LocalCacheFile;
/** manage all io operations to the cache files */
- private final CacheFileManager cacheFileManager = new CacheFileManager(config.getCacheDirs());
+ private final CacheFileManager cacheFileManager = CacheFileManager.getInstance();
private OSFileCache() {
remotePos2LocalCacheFile =
Caffeine.newBuilder()
.maximumWeight(config.getCacheMaxDiskUsage())
- .weigher(
- (Weigher<OSFileCacheKey, OSFileCacheValue>)
- (key, value) -> value.getOccupiedLength())
+ .weigher((Weigher<OSFileCacheKey, OSFileCacheValue>) (key, value) -> value.getLength())
.removalListener(
(key, value, cause) -> {
if (value != null) {
@@ -77,6 +75,11 @@ public class OSFileCache {
return remotePos2LocalCacheFile.get(key);
}
+ /** This method is used by the recover procedure */
+ void put(OSFileCacheKey key, OSFileCacheValue value) {
+ remotePos2LocalCacheFile.put(key, value);
+ }
+
class OSFileCacheLoader implements CacheLoader<OSFileCacheKey, OSFileCacheValue> {
@Override
public @Nullable OSFileCacheValue load(@NonNull OSFileCacheKey key) throws Exception {
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
index ff15787263..03e3c8d997 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
@@ -19,15 +19,21 @@
package org.apache.iotdb.os.cache;
import org.apache.iotdb.os.fileSystem.OSFile;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.IOException;
+import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Objects;
public class OSFileCacheKey implements Serializable {
- private OSFile file;
- private long startPosition;
- private int length;
+ /** remote TsFile */
+ private final OSFile file;
+ /** start position in the remote TsFile */
+ private final long startPosition;
+ /** data length */
+ private final int length;
public OSFileCacheKey(OSFile file, long startPosition, int length) {
this.file = file;
@@ -35,15 +41,25 @@ public class OSFileCacheKey implements Serializable {
this.length = length;
}
- public ByteBuffer serializeToByteBuffer() {
- byte[] pathBytes = file.toString().getBytes();
- ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + pathBytes.length + Integer.BYTES);
- buffer.putInt(pathBytes.length);
- buffer.put(pathBytes);
- buffer.putInt(length);
+ public int serializeSize() {
+ return Integer.BYTES + file.toString().getBytes().length + Long.BYTES + Integer.BYTES;
+ }
+
+ public ByteBuffer serialize() {
+ ByteBuffer buffer = ByteBuffer.allocate(serializeSize());
+ ReadWriteIOUtils.write(file.toString(), buffer);
+ ReadWriteIOUtils.write(startPosition, buffer);
+ ReadWriteIOUtils.write(length, buffer);
return buffer;
}
+ public static OSFileCacheKey deserialize(InputStream inputStream) throws IOException {
+ String filePath = ReadWriteIOUtils.readString(inputStream);
+ long startPosition = ReadWriteIOUtils.readLong(inputStream);
+ int length = ReadWriteIOUtils.readInt(inputStream);
+ return new OSFileCacheKey(new OSFile(filePath), startPosition, length);
+ }
+
public OSFile getFile() {
return file;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index 0a28d2d5b6..85295506c5 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -22,22 +22,25 @@ package org.apache.iotdb.os.cache;
import java.io.File;
public class OSFileCacheValue {
+ /** local cache file */
private File cacheFile;
// 如果每个块用一个文件来存储,则该值一直为 0
// 如果使用一个大文件存储所有块,则该值为大文件中的起点
+ /** start position in the local cache file */
private long startPosition;
- // 如果每个块用一个文件来存储,则该值一直为该文件的大小
- // 如果使用一个大文件存储所有块,则该值为该块的实际长度
- private int length;
+ /** cache data size */
+ private int dataSize;
+ /** cache key size */
private int metaSize;
+
private boolean shouldDelete;
private int readCnt;
- public OSFileCacheValue(File cacheFile, long startPosition, int length, int metaSize) {
+ public OSFileCacheValue(File cacheFile, long startPosition, int metaSize, int dataSize) {
this.cacheFile = cacheFile;
this.startPosition = startPosition;
- this.length = length;
this.metaSize = metaSize;
+ this.dataSize = dataSize;
}
public File getCacheFile() {
@@ -48,20 +51,21 @@ public class OSFileCacheValue {
return startPosition;
}
- public int getLength() {
- return length;
- }
-
public int getMetaSize() {
return metaSize;
}
- public int getOccupiedLength() {
- // 如果使用多个文件,则返回该文件的大小
- // 如果使用一个文件,则返回每个槽的大小
- return length;
+ public int getDataSize() {
+ return dataSize;
}
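+ // If each block is stored in its own file, this value is always the size of that file.
+ // If one large file stores all blocks, this value is the actual length of that block.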
+ public int getLength() {
+ return metaSize + dataSize;
+ }
+
+ /** Marks this value for deletion; it is deleted once no one is reading it. */
public synchronized void setShouldDelete() {
this.shouldDelete = true;
if (readCnt == 0) {
@@ -69,15 +73,23 @@ public class OSFileCacheValue {
}
}
- public synchronized boolean readLock() {
- if (cacheFile.exists()) {
+ /**
+ * Try to get the read lock, return false when this cache value should be deleted or has been
+ * deleted.
+ */
+ public synchronized boolean tryReadLock() {
+ if (shouldDelete || !cacheFile.exists()) {
+ return false;
+ } else {
this.readCnt++;
return true;
- } else {
- return false;
}
}
+ /**
+ * Release the read lock, delete the cache value when no one else is reading it and this cache
+ * value should be deleted.
+ */
public synchronized void readUnlock() {
this.readCnt--;
if (shouldDelete && readCnt == 0) {
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java b/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java
index 553a1badd8..226b13bfd5 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java
@@ -19,5 +19,7 @@
package org.apache.iotdb.os.utils;
public class ObjectStorageConstant {
- public static String FILE_SEPARATOR = "/";
+ public static final String FILE_SEPARATOR = "/";
+ public static final String CACHE_FILE_SUFFIX = ".cache";
+ public static final String TMP_CACHE_FILE_SUFFIX = ".cache.tmp";
}