You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/17 03:11:53 UTC

[iotdb] branch tiered_storage updated: add cache recover

This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tiered_storage by this push:
     new a1babc8725 add cache recover
a1babc8725 is described below

commit a1babc8725664008c91b606b20da62889e33362e
Author: HeimingZ <zh...@qq.com>
AuthorDate: Wed May 17 11:11:35 2023 +0800

    add cache recover
---
 .../apache/iotdb/os/cache/CacheFileChannel.java    | 28 +++----
 .../apache/iotdb/os/cache/CacheFileManager.java    | 39 +++++++---
 .../apache/iotdb/os/cache/CacheRecoverTask.java    | 88 ++++++++++++++++++++++
 .../org/apache/iotdb/os/cache/OSFileCache.java     | 11 ++-
 .../org/apache/iotdb/os/cache/OSFileCacheKey.java  | 34 ++++++---
 .../apache/iotdb/os/cache/OSFileCacheValue.java    | 46 ++++++-----
 .../iotdb/os/utils/ObjectStorageConstant.java      |  4 +-
 7 files changed, 196 insertions(+), 54 deletions(-)

diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
index 24d20b7651..f094479037 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
@@ -42,8 +42,8 @@ public class CacheFileChannel implements Closeable {
   private long position = 0;
   private OSFileCacheValue currentCacheFile;
   private FileChannel cacheFileChannel;
-  private long cacheFileStartPosition = position;
-  private long cacheFileEndPosition = position + config.getCachePageSize();
+  private long startPositionInTsFile = position;
+  private long endPositionInTsFile = position + config.getCachePageSize();
 
   public CacheFileChannel(OSFile osFile) {
     this.osFile = osFile;
@@ -53,27 +53,27 @@ public class CacheFileChannel implements Closeable {
     return new CacheInputStream(channel);
   }
 
-  private OSFileCacheKey getNextCacheFile() {
-    long startPosition = position - position % config.getCachePageSize();
-    return new OSFileCacheKey(osFile, startPosition, config.getCachePageSize());
+  private boolean isPositionValid() {
+    return startPositionInTsFile <= position && position < endPositionInTsFile;
   }
 
   private void openNextCacheFile() throws IOException {
     // close prev cache file
     close();
     // open next cache file
-    OSFileCacheKey key = getNextCacheFile();
-    while (!currentCacheFile.readLock()) {
+    OSFileCacheKey key = locateCacheFileFromPosition();
+    while (!currentCacheFile.tryReadLock()) {
       currentCacheFile = cache.get(key);
     }
     cacheFileChannel =
         FileChannel.open(currentCacheFile.getCacheFile().toPath(), StandardOpenOption.READ);
-    cacheFileStartPosition = currentCacheFile.getStartPosition();
-    cacheFileEndPosition = cacheFileStartPosition + config.getCachePageSize();
+    startPositionInTsFile = currentCacheFile.getStartPosition();
+    endPositionInTsFile = startPositionInTsFile + currentCacheFile.getLength();
   }
 
-  private boolean isPositionValid(long position) {
-    return cacheFileStartPosition <= position && position <= cacheFileEndPosition;
+  private OSFileCacheKey locateCacheFileFromPosition() {
+    long startPosition = position - position % config.getCachePageSize();
+    return new OSFileCacheKey(osFile, startPosition, config.getCachePageSize());
   }
 
   public long size() {
@@ -108,13 +108,13 @@ public class CacheFileChannel implements Closeable {
     // read each cache file
     int totalReadBytes = 0;
     while (startPos < endPos) {
-      if (!isPositionValid(startPos)) {
+      if (!isPositionValid()) {
         openNextCacheFile();
       }
-      long readStartPosition = currentCacheFile.getMetaSize() + (startPos - cacheFileStartPosition);
+      long readStartPosition = currentCacheFile.getMetaSize() + (startPos - startPositionInTsFile);
       long readEndPosition =
           currentCacheFile.getMetaSize()
-              + (Math.min(endPos, cacheFileEndPosition) - cacheFileStartPosition);
+              + (Math.min(endPos, endPositionInTsFile) - startPositionInTsFile);
       int readSize = (int) (readEndPosition - readStartPosition);
       dst.limit(dst.position() + readSize);
       int read = cacheFileChannel.read(dst, readStartPosition);
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
index dcde92da09..cb88c0771c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileManager.java
@@ -31,22 +31,28 @@ import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.iotdb.os.utils.ObjectStorageConstant.CACHE_FILE_SUFFIX;
+import static org.apache.iotdb.os.utils.ObjectStorageConstant.TMP_CACHE_FILE_SUFFIX;
+
 /** This class manages all write operations to the cache files */
 public class CacheFileManager {
   private static final Logger logger = LoggerFactory.getLogger(CacheFileManager.class);
-  private static final String CACHE_FILE_SUFFIX = ".cache";
-  private static final String TMP_CACHE_FILE_SUFFIX = ".cache.tmp";
   private static final ObjectStorageConfig config =
       ObjectStorageDescriptor.getInstance().getConfig();
-  private final String[] cacheDirs;
-  private final AtomicLong logFileId = new AtomicLong(0);
+  private final String[] cacheDirs = config.getCacheDirs();
+  private final AtomicLong cacheFileId = new AtomicLong(0);
 
-  public CacheFileManager(String[] cacheDirs) {
-    this.cacheDirs = cacheDirs;
+  private CacheFileManager() {
+    for (String cacheDir : cacheDirs) {
+      File cacheDirFile = new File(cacheDir);
+      if (!cacheDirFile.exists()) {
+        cacheDirFile.mkdirs();
+      }
+    }
   }
 
   private long getNextCacheFileId() {
-    return logFileId.getAndIncrement();
+    return cacheFileId.getAndIncrement();
   }
 
   private File getTmpCacheFile(long id) {
@@ -66,14 +72,29 @@ public class CacheFileManager {
     File tmpCacheFile = getTmpCacheFile(cacheFileId);
     try (FileChannel channel =
         FileChannel.open(tmpCacheFile.toPath(), StandardOpenOption.CREATE_NEW)) {
-      ByteBuffer meta = key.serializeToByteBuffer();
+      ByteBuffer meta = key.serialize();
       channel.write(meta);
       channel.write(ByteBuffer.wrap(data));
-      res = new OSFileCacheValue(tmpCacheFile, meta.capacity(), data.length, meta.capacity());
+      res = new OSFileCacheValue(tmpCacheFile, 0, meta.capacity(), data.length);
     } catch (IOException e) {
       logger.error("Fail to persist data to cache file {}", tmpCacheFile, e);
       tmpCacheFile.delete();
     }
     return tmpCacheFile.renameTo(getCacheFile(cacheFileId)) ? res : null;
   }
+
+  /** This method is used by the recover procedure */
+  void setCacheFileId(long startId) {
+    cacheFileId.set(startId);
+  }
+
+  public static CacheFileManager getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  private static class InstanceHolder {
+    private InstanceHolder() {}
+
+    private static final CacheFileManager INSTANCE = new CacheFileManager();
+  }
 }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
new file mode 100644
index 0000000000..bfebeaadf1
--- /dev/null
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheRecoverTask.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.os.cache;
+
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+
+import static org.apache.iotdb.os.utils.ObjectStorageConstant.CACHE_FILE_SUFFIX;
+
+/**
+ * Rebuilds the in-memory index of {@link OSFileCache} from the cache files left on disk.
+ *
+ * <p>Every regular file in the configured cache directories is inspected. Files without the
+ * cache suffix and files whose data size differs from the configured cache page size are
+ * deleted; the remaining files are put back into the cache under the key read from the file.
+ * Finally the id generator of {@link CacheFileManager} is moved past the largest id seen, so
+ * that newly written cache files never reuse the name of a file that is still on disk.
+ */
+public class CacheRecoverTask implements Runnable {
+  private static final Logger logger = LoggerFactory.getLogger(CacheRecoverTask.class);
+  private static final ObjectStorageConfig config =
+      ObjectStorageDescriptor.getInstance().getConfig();
+  private static final OSFileCache cache = OSFileCache.getInstance();
+
+  @Override
+  public void run() {
+    // -1 means "no cache file found"; the id generator then restarts from 0
+    long maxCacheFileId = -1;
+    for (String cacheDir : config.getCacheDirs()) {
+      // listFiles() returns null when the directory does not exist or cannot be read
+      File[] cacheFiles = new File(cacheDir).listFiles();
+      if (cacheFiles == null) {
+        continue;
+      }
+      for (File cacheFile : cacheFiles) {
+        try {
+          if (cacheFile.isDirectory()) {
+            continue;
+          }
+          String cacheFileName = cacheFile.getName();
+          if (!cacheFileName.endsWith(CACHE_FILE_SUFFIX)) {
+            // anything without the cache suffix (e.g. a tmp cache file) is not recoverable
+            cacheFile.delete();
+            continue;
+          }
+          // track the id before reading the file, so a file that fails to load but stays on
+          // disk still keeps its id reserved
+          String cacheFileIdStr = cacheFileName.substring(0, cacheFileName.indexOf('.'));
+          maxCacheFileId = Math.max(maxCacheFileId, Long.parseLong(cacheFileIdStr));
+          // read meta; the stream is closed before the file is possibly deleted below
+          OSFileCacheKey key;
+          try (InputStream in = Files.newInputStream(cacheFile.toPath())) {
+            key = OSFileCacheKey.deserialize(in);
+          }
+          int metaSize = key.serializeSize();
+          // subtract in long arithmetic, the file length must not be truncated to int first
+          long dataSize = cacheFile.length() - metaSize;
+          if (dataSize != config.getCachePageSize()) {
+            logger.debug(
+                "Cache file {}'s data size doesn't match the cache page size, so delete it.",
+                cacheFile);
+            cacheFile.delete();
+            continue;
+          }
+          // put it back to the cache
+          cache.put(key, new OSFileCacheValue(cacheFile, 0, metaSize, (int) dataSize));
+        } catch (Exception e) {
+          logger.error("Failed to recover cache file {}", cacheFile.getName(), e);
+        }
+      }
+    }
+    // let new cache files start after the largest recovered id to avoid name collisions
+    CacheFileManager.getInstance().setCacheFileId(maxCacheFileId + 1);
+  }
+}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index a7a6f6eea9..840b16cd12 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -55,15 +55,13 @@ public class OSFileCache {
    */
   private final LoadingCache<OSFileCacheKey, OSFileCacheValue> remotePos2LocalCacheFile;
   /** manage all io operations to the cache files */
-  private final CacheFileManager cacheFileManager = new CacheFileManager(config.getCacheDirs());
+  private final CacheFileManager cacheFileManager = CacheFileManager.getInstance();
 
   private OSFileCache() {
     remotePos2LocalCacheFile =
         Caffeine.newBuilder()
             .maximumWeight(config.getCacheMaxDiskUsage())
-            .weigher(
-                (Weigher<OSFileCacheKey, OSFileCacheValue>)
-                    (key, value) -> value.getOccupiedLength())
+            .weigher((Weigher<OSFileCacheKey, OSFileCacheValue>) (key, value) -> value.getLength())
             .removalListener(
                 (key, value, cause) -> {
                   if (value != null) {
@@ -77,6 +75,11 @@ public class OSFileCache {
     return remotePos2LocalCacheFile.get(key);
   }
 
+  /** This method is used by the recover procedure */
+  void put(OSFileCacheKey key, OSFileCacheValue value) {
+    remotePos2LocalCacheFile.put(key, value);
+  }
+
   class OSFileCacheLoader implements CacheLoader<OSFileCacheKey, OSFileCacheValue> {
     @Override
     public @Nullable OSFileCacheValue load(@NonNull OSFileCacheKey key) throws Exception {
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
index ff15787263..03e3c8d997 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
@@ -19,15 +19,21 @@
 package org.apache.iotdb.os.cache;
 
 import org.apache.iotdb.os.fileSystem.OSFile;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class OSFileCacheKey implements Serializable {
-  private OSFile file;
-  private long startPosition;
-  private int length;
+  /** remote TsFile */
+  private final OSFile file;
+  /** start position in the remote TsFile */
+  private final long startPosition;
+  /** data length */
+  private final int length;
 
   public OSFileCacheKey(OSFile file, long startPosition, int length) {
     this.file = file;
@@ -35,15 +41,32 @@ public class OSFileCacheKey implements Serializable {
     this.length = length;
   }
 
-  public ByteBuffer serializeToByteBuffer() {
-    byte[] pathBytes = file.toString().getBytes();
-    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + pathBytes.length + Integer.BYTES);
-    buffer.putInt(pathBytes.length);
-    buffer.put(pathBytes);
-    buffer.putInt(length);
+  public int serializeSize() {
+    return Integer.BYTES + file.toString().getBytes().length + Long.BYTES + Integer.BYTES;
+  }
+
+  /**
+   * Serializes this key (file path, start position, length) into a newly allocated buffer.
+   *
+   * @return the buffer, flipped so that it can be written to a channel right away
+   */
+  public ByteBuffer serialize() {
+    ByteBuffer buffer = ByteBuffer.allocate(serializeSize());
+    ReadWriteIOUtils.write(file.toString(), buffer);
+    ReadWriteIOUtils.write(startPosition, buffer);
+    ReadWriteIOUtils.write(length, buffer);
+    // without the flip position == limit and FileChannel.write(buffer) would write 0 bytes
+    buffer.flip();
     return buffer;
   }
 
+  public static OSFileCacheKey deserialize(InputStream inputStream) throws IOException {
+    String filePath = ReadWriteIOUtils.readString(inputStream);
+    long startPosition = ReadWriteIOUtils.readLong(inputStream);
+    int length = ReadWriteIOUtils.readInt(inputStream);
+    return new OSFileCacheKey(new OSFile(filePath), startPosition, length);
+  }
+
   public OSFile getFile() {
     return file;
   }
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
index 0a28d2d5b6..85295506c5 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheValue.java
@@ -22,22 +22,25 @@ package org.apache.iotdb.os.cache;
 import java.io.File;
 
 public class OSFileCacheValue {
+  /** local cache file */
   private File cacheFile;
   // 如果每个块用一个文件来存储,则该值一直为 0
   // 如果使用一个大文件存储所有块,则该值为大文件中的起点
+  /** start position in the local cache file */
   private long startPosition;
-  // 如果每个块用一个文件来存储,则该值一直为该文件的大小
-  // 如果使用一个大文件存储所有块,则该值为该块的实际长度
-  private int length;
+  /** cache data size */
+  private int dataSize;
+  /** cache key size */
   private int metaSize;
+
   private boolean shouldDelete;
   private int readCnt;
 
-  public OSFileCacheValue(File cacheFile, long startPosition, int length, int metaSize) {
+  public OSFileCacheValue(File cacheFile, long startPosition, int metaSize, int dataSize) {
     this.cacheFile = cacheFile;
     this.startPosition = startPosition;
-    this.length = length;
     this.metaSize = metaSize;
+    this.dataSize = dataSize;
   }
 
   public File getCacheFile() {
@@ -48,20 +51,21 @@ public class OSFileCacheValue {
     return startPosition;
   }
 
-  public int getLength() {
-    return length;
-  }
-
   public int getMetaSize() {
     return metaSize;
   }
 
-  public int getOccupiedLength() {
-    // 如果使用多个文件,则返回该文件的大小
-    // 如果使用一个文件,则返回每个槽的大小
-    return length;
+  public int getDataSize() {
+    return dataSize;
   }
 
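+  // If each block is stored in its own file, this value is always the size of that file.
+  // If one big file stores all blocks, this value is the actual length of that block.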
+  public int getLength() {
+    return metaSize + dataSize;
+  }
+
+  /** Mark this value should be deleted, delete this value when no one is reading it. */
   public synchronized void setShouldDelete() {
     this.shouldDelete = true;
     if (readCnt == 0) {
@@ -69,15 +73,23 @@ public class OSFileCacheValue {
     }
   }
 
-  public synchronized boolean readLock() {
-    if (cacheFile.exists()) {
+  /**
+   * Try to get the read lock, return false when this cache value should be deleted or has been
+   * deleted.
+   */
+  public synchronized boolean tryReadLock() {
+    if (shouldDelete || !cacheFile.exists()) {
+      return false;
+    } else {
       this.readCnt++;
       return true;
-    } else {
-      return false;
     }
   }
 
+  /**
+   * Release the read lock, delete the cache value when no one else is reading it and this cache
+   * value should be deleted.
+   */
   public synchronized void readUnlock() {
     this.readCnt--;
     if (shouldDelete && readCnt == 0) {
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java b/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java
index 553a1badd8..226b13bfd5 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java
@@ -19,5 +19,7 @@
 package org.apache.iotdb.os.utils;
 
 public class ObjectStorageConstant {
-  public static String FILE_SEPARATOR = "/";
+  public static final String FILE_SEPARATOR = "/";
+  public static final String CACHE_FILE_SUFFIX = ".cache";
+  public static final String TMP_CACHE_FILE_SUFFIX = ".cache.tmp";
 }