You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/10/23 09:28:29 UTC

[iotdb] 01/01: Pipe: introduce wal entry write cache for realtime log mode

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch refactor-wal-entry-cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 764f57f32cee6e3e36d16b95b2aa87023634487b
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Oct 23 17:28:19 2023 +0800

    Pipe: introduce wal entry write cache for realtime log mode
---
 .../storageengine/dataregion/wal/node/WALNode.java |   3 +-
 .../dataregion/wal/utils/WALEntryHandler.java      |  44 +++--
 .../dataregion/wal/utils/WALEntryPosition.java     |  48 +++---
 .../dataregion/wal/utils/WALInsertNodeCache.java   | 180 ++++++++++++++-------
 4 files changed, 169 insertions(+), 106 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 0496ad44215..55a6f11cd5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -147,8 +147,7 @@ public class WALNode implements IWALNode {
   private WALFlushListener log(WALEntry walEntry) {
     buffer.write(walEntry);
     // set handler for pipe
-    walEntry.getWalFlushListener().getWalEntryHandler().setMemTableId(walEntry.getMemTableId());
-    walEntry.getWalFlushListener().getWalEntryHandler().setWalNode(this);
+    walEntry.getWalFlushListener().getWalEntryHandler().setWalNode(this, walEntry.getMemTableId());
     return walEntry.getWalFlushListener();
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index d364ceb3b5e..3097e12e4fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -83,6 +83,12 @@ public class WALEntryHandler {
     walNode.unpinMemTable(memTableId);
   }
 
+  public InsertNode getInsertNodeViaCacheIfPossible() {
+    return value instanceof InsertNode
+        ? (InsertNode) value
+        : walEntryPosition.readByteBufferOrInsertNodeViaCacheDirectly().getRight();
+  }
+
   /**
    * Get this handler's value.
    *
@@ -111,7 +117,7 @@ public class WALEntryHandler {
       }
     }
 
-    final InsertNode node = isHardlink ? readFromHardlinkFile() : readFromOriginalWALFile();
+    final InsertNode node = isHardlink ? readFromHardlinkWALFile() : readFromOriginalWALFile();
     if (node == null) {
       throw new WALPipeException(
           String.format("Fail to get the wal value of the position %s.", walEntryPosition));
@@ -119,10 +125,6 @@ public class WALEntryHandler {
     return node;
   }
 
-  public InsertNode getInsertNodeViaCacheIfPossible() {
-    return value instanceof InsertNode ? (InsertNode) value : null;
-  }
-
   public ByteBuffer getByteBuffer() throws WALPipeException {
     // wait until the position is ready
     while (!walEntryPosition.canRead()) {
@@ -146,15 +148,15 @@ public class WALEntryHandler {
 
   private InsertNode readFromOriginalWALFile() throws WALPipeException {
     try {
-      return walEntryPosition.readInsertNodeViaCache();
+      return walEntryPosition.readInsertNodeViaCacheAfterCanRead();
     } catch (Exception e) {
       throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
     }
   }
 
-  private InsertNode readFromHardlinkFile() throws WALPipeException {
+  private InsertNode readFromHardlinkWALFile() throws WALPipeException {
     try {
-      return walEntryPosition.readInsertNodeViaCache();
+      return walEntryPosition.readInsertNodeViaCacheAfterCanRead();
     } catch (Exception e) {
       throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
     }
@@ -162,38 +164,34 @@ public class WALEntryHandler {
 
   private ByteBuffer readByteBufferFromWALFile() throws WALPipeException {
     try {
-      return walEntryPosition.readByteBufferViaCache();
+      return walEntryPosition.readByteBufferViaCacheAfterCanRead();
     } catch (Exception e) {
       throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
     }
   }
 
-  public long getMemTableId() {
-    return memTableId;
-  }
-
-  public void setMemTableId(long memTableId) {
-    this.memTableId = memTableId;
-  }
-
-  public void setWalNode(WALNode walNode) {
+  public void setWalNode(WALNode walNode, long memTableId) {
     this.walNode = walNode;
-    this.walEntryPosition.setWalNode(walNode);
-    this.walEntryPosition.setWalInsertNodeCache(walNode.getRegionId(memTableId));
+    this.memTableId = memTableId;
+    walEntryPosition.setWalNode(walNode, memTableId);
   }
 
-  public WALEntryPosition getWalEntryPosition() {
-    return walEntryPosition;
+  public long getMemTableId() {
+    return memTableId;
   }
 
   public void setEntryPosition(long walFileVersionId, long position) {
-    this.walEntryPosition.setEntryPosition(walFileVersionId, position);
+    this.walEntryPosition.setEntryPosition(walFileVersionId, position, value);
     this.value = null;
     synchronized (this) {
       this.notifyAll();
     }
   }
 
+  public WALEntryPosition getWalEntryPosition() {
+    return walEntryPosition;
+  }
+
   public int getSize() {
     return walEntryPosition.getSize();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 05af71c5acc..3979c49bffc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.File;
 import java.io.IOException;
@@ -54,12 +56,20 @@ public class WALEntryPosition {
     this.size = size;
   }
 
+  /**
+   * Try to read the wal entry directly from the cache. No need to check if the wal entry is ready
+   * for read.
+   */
+  public Pair<ByteBuffer, InsertNode> readByteBufferOrInsertNodeViaCacheDirectly() {
+    return cache.getByteBufferOrInsertNode(this);
+  }
+
   /**
    * Read the wal entry and parse it to the InsertNode. Use LRU cache to accelerate read.
    *
    * @throws IOException failing to read.
    */
-  public InsertNode readInsertNodeViaCache() throws IOException {
+  public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException {
     if (!canRead()) {
       throw new IOException("This entry isn't ready for read.");
     }
@@ -71,7 +81,7 @@ public class WALEntryPosition {
    *
    * @throws IOException failing to read.
    */
-  public ByteBuffer readByteBufferViaCache() throws IOException {
+  public ByteBuffer readByteBufferViaCacheAfterCanRead() throws IOException {
     if (!canRead()) {
       throw new IOException("This entry isn't ready for read.");
     }
@@ -122,6 +132,10 @@ public class WALEntryPosition {
     }
   }
 
+  public File getWalFile() {
+    return walFile;
+  }
+
   /** Return true only when the tuple(file, position, size) is ready. */
   public boolean canRead() {
     return walFileVersionId >= 0;
@@ -135,36 +149,32 @@ public class WALEntryPosition {
     return walFileVersionId < walNode.getCurrentWALFileVersion();
   }
 
-  public void setWalNode(WALNode walNode) {
+  public void setWalNode(WALNode walNode, long memTableId) {
     this.walNode = walNode;
-    this.identifier = walNode.getIdentifier();
-  }
-
-  public void setEntryPosition(long walFileVersionId, long position) {
-    this.position = position;
-    this.walFileVersionId = walFileVersionId;
-  }
-
-  public void setWalInsertNodeCache(int regionId) {
-    cache = WALInsertNodeCache.getInstance(regionId);
+    identifier = walNode.getIdentifier();
+    cache = WALInsertNodeCache.getInstance(walNode.getRegionId(memTableId));
   }
 
   public String getIdentifier() {
     return identifier;
   }
 
-  public long getWalFileVersionId() {
-    return walFileVersionId;
-  }
-
-  public File getWalFile() {
-    return walFile;
+  public void setEntryPosition(long walFileVersionId, long position, WALEntryValue value) {
+    this.position = position;
+    this.walFileVersionId = walFileVersionId;
+    if (cache != null && value instanceof InsertNode) {
+      cache.cacheInsertNodeIfNeeded(this, (InsertNode) value);
+    }
   }
 
   public long getPosition() {
     return position;
   }
 
+  public long getWalFileVersionId() {
+    return walFileVersionId;
+  }
+
   public void setSize(int size) {
     this.size = size;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 747c7ac1477..31f5694d0e4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -49,8 +49,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 /** This cache is used by {@link WALEntryPosition}. */
 public class WALInsertNodeCache {
-  private static final Logger logger = LoggerFactory.getLogger(WALInsertNodeCache.class);
-  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger LOGGER = LoggerFactory.getLogger(WALInsertNodeCache.class);
+  private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
   // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
   private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> lruCache;
@@ -59,60 +59,53 @@ public class WALInsertNodeCache {
   // ids of all pinned memTables
   private final Set<Long> memTablesNeedSearch = ConcurrentHashMap.newKeySet();
 
+  private volatile boolean hasPipeRunning = false;
+
   private WALInsertNodeCache() {
     // TODO: try allocate memory 2 * config.getWalFileSizeThresholdInByte() for the cache
     // If allocate memory failed, disable batch load
     isBatchLoadEnabled = true;
     lruCache =
         Caffeine.newBuilder()
-            .maximumWeight(2 * config.getWalFileSizeThresholdInByte())
+            .maximumWeight(2 * CONFIG.getWalFileSizeThresholdInByte())
             .weigher(
                 (Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
                     (position, pair) -> position.getSize())
             .build(new WALInsertNodeCacheLoader());
   }
 
-  @TestOnly
-  public boolean isBatchLoadEnabled() {
-    return isBatchLoadEnabled;
-  }
-
-  @TestOnly
-  public void setIsBatchLoadEnabled(boolean isBatchLoadEnabled) {
-    this.isBatchLoadEnabled = isBatchLoadEnabled;
-  }
+  /////////////////////////// Getter & Setter ///////////////////////////
 
   public InsertNode getInsertNode(WALEntryPosition position) {
-    final Pair<ByteBuffer, InsertNode> pair =
-        isBatchLoadEnabled
-            ? lruCache.getAll(Collections.singleton(position)).get(position)
-            : lruCache.get(position);
+    final Pair<ByteBuffer, InsertNode> pair = getByteBufferOrInsertNode(position);
 
-    if (pair == null) {
-      throw new IllegalStateException();
+    if (pair.getRight() != null) {
+      return pair.getRight();
     }
 
-    if (pair.getRight() == null) {
-      try {
-        // multi pipes may share the same wal entry, so we need to wrap the byte[] into
-        // different ByteBuffer for each pipe
-        pair.setRight(parse(ByteBuffer.wrap(pair.getLeft().array())));
-      } catch (Exception e) {
-        logger.error(
-            "Parsing failed when recovering insertNode from wal, walFile:{}, position:{}, size:{}, exception:",
-            position.getWalFile(),
-            position.getPosition(),
-            position.getSize(),
-            e);
-        throw e;
-      }
+    if (pair.getLeft() == null) {
+      throw new IllegalStateException();
     }
 
-    return pair.getRight();
+    try {
+      // multi pipes may share the same wal entry, so we need to wrap the byte[] into
+      // different ByteBuffer for each pipe
+      final InsertNode insertNode = parse(ByteBuffer.wrap(pair.getLeft().array()));
+      pair.setRight(insertNode);
+      return insertNode;
+    } catch (Exception e) {
+      LOGGER.error(
+          "Parsing failed when recovering insertNode from wal, walFile:{}, position:{}, size:{}, exception:",
+          position.getWalFile(),
+          position.getPosition(),
+          position.getSize(),
+          e);
+      throw e;
+    }
   }
 
   private InsertNode parse(ByteBuffer buffer) {
-    PlanNode node = WALEntry.deserializeForConsensus(buffer);
+    final PlanNode node = WALEntry.deserializeForConsensus(buffer);
     if (node instanceof InsertNode) {
       return (InsertNode) node;
     } else {
@@ -121,6 +114,30 @@ public class WALInsertNodeCache {
   }
 
   public ByteBuffer getByteBuffer(WALEntryPosition position) {
+    Pair<ByteBuffer, InsertNode> pair = getByteBufferOrInsertNode(position);
+
+    if (pair.getLeft() != null) {
+      // multi pipes may share the same wal entry, so we need to wrap the byte[] into
+      // different ByteBuffer for each pipe
+      return ByteBuffer.wrap(pair.getLeft().array());
+    }
+
+    // forbid multi threads to invalidate and load the same entry
+    synchronized (this) {
+      lruCache.invalidate(position);
+      pair = getByteBufferOrInsertNode(position);
+    }
+
+    if (pair.getLeft() == null) {
+      throw new IllegalStateException();
+    }
+
+    return ByteBuffer.wrap(pair.getLeft().array());
+  }
+
+  public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNode(WALEntryPosition position) {
+    hasPipeRunning = true;
+
     final Pair<ByteBuffer, InsertNode> pair =
         isBatchLoadEnabled
             ? lruCache.getAll(Collections.singleton(position)).get(position)
@@ -130,15 +147,18 @@ public class WALInsertNodeCache {
       throw new IllegalStateException();
     }
 
-    // multi pipes may share the same wal entry, so we need to wrap the byte[] into different
-    // ByteBuffer for each pipe
-    return ByteBuffer.wrap(pair.getLeft().array());
+    return pair;
   }
 
-  boolean contains(WALEntryPosition position) {
-    return lruCache.getIfPresent(position) != null;
+  public void cacheInsertNodeIfNeeded(WALEntryPosition walEntryPosition, InsertNode insertNode) {
+    // reduce memory usage
+    if (hasPipeRunning) {
+      lruCache.put(walEntryPosition, new Pair<>(null, insertNode));
+    }
   }
 
+  /////////////////////////// MemTable ///////////////////////////
+
   public void addMemTable(long memTableId) {
     memTablesNeedSearch.add(memTableId);
   }
@@ -147,10 +167,7 @@ public class WALInsertNodeCache {
     memTablesNeedSearch.remove(memTableId);
   }
 
-  public void clear() {
-    lruCache.invalidateAll();
-    memTablesNeedSearch.clear();
-  }
+  /////////////////////////// Cache Loader ///////////////////////////
 
   class WALInsertNodeCacheLoader
       implements CacheLoader<WALEntryPosition, Pair<ByteBuffer, InsertNode>> {
@@ -164,20 +181,22 @@ public class WALInsertNodeCache {
     /** Batch load all wal entries in the file when any one key is absent. */
     @Override
     public @NonNull Map<@NonNull WALEntryPosition, @NonNull Pair<ByteBuffer, InsertNode>> loadAll(
-        @NonNull Iterable<? extends @NonNull WALEntryPosition> keys) {
-      final Map<WALEntryPosition, Pair<ByteBuffer, InsertNode>> res = new HashMap<>();
+        @NonNull Iterable<? extends @NonNull WALEntryPosition> walEntryPositions) {
+      final Map<WALEntryPosition, Pair<ByteBuffer, InsertNode>> loadedEntries = new HashMap<>();
 
-      for (WALEntryPosition pos : keys) {
-        if (res.containsKey(pos) || !pos.canRead()) {
+      for (WALEntryPosition walEntryPosition : walEntryPositions) {
+        if (loadedEntries.containsKey(walEntryPosition) || !walEntryPosition.canRead()) {
           continue;
         }
-        long walFileVersionId = pos.getWalFileVersionId();
+
+        final long walFileVersionId = walEntryPosition.getWalFileVersionId();
+
         // load one when wal file is not sealed
-        if (!pos.isInSealedFile()) {
+        if (!walEntryPosition.isInSealedFile()) {
           try {
-            res.put(pos, load(pos));
+            loadedEntries.put(walEntryPosition, load(walEntryPosition));
           } catch (Exception e) {
-            logger.info(
+            LOGGER.info(
                 "Fail to cache wal entries from the wal file with version id {}",
                 walFileVersionId,
                 e);
@@ -187,43 +206,80 @@ public class WALInsertNodeCache {
 
         // batch load when wal file is sealed
         long position = 0;
-        try (FileChannel channel = pos.openReadFileChannel();
-            WALByteBufReader walByteBufReader = new WALByteBufReader(pos.getWalFile(), channel)) {
+        try (final FileChannel channel = walEntryPosition.openReadFileChannel();
+            final WALByteBufReader walByteBufReader =
+                new WALByteBufReader(walEntryPosition.getWalFile(), channel)) {
           while (walByteBufReader.hasNext()) {
             // see WALInfoEntry#serialize, entry type + memtable id + plan node type
-            ByteBuffer buffer = walByteBufReader.next();
-            int size = buffer.capacity();
-            WALEntryType type = WALEntryType.valueOf(buffer.get());
-            long memTableId = buffer.getLong();
-            if ((memTablesNeedSearch.contains(memTableId) || pos.getPosition() == position)
+            final ByteBuffer buffer = walByteBufReader.next();
+
+            final int size = buffer.capacity();
+            final WALEntryType type = WALEntryType.valueOf(buffer.get());
+            final long memTableId = buffer.getLong();
+
+            if ((memTablesNeedSearch.contains(memTableId)
+                    || walEntryPosition.getPosition() == position)
                 && type.needSearch()) {
               buffer.clear();
-              res.put(
-                  new WALEntryPosition(pos.getIdentifier(), walFileVersionId, position, size),
+              loadedEntries.put(
+                  new WALEntryPosition(
+                      walEntryPosition.getIdentifier(), walFileVersionId, position, size),
                   new Pair<>(buffer, null));
             }
+
             position += size;
           }
         } catch (IOException e) {
-          logger.info(
+          LOGGER.info(
               "Fail to cache wal entries from the wal file with version id {}",
               walFileVersionId,
               e);
         }
       }
-      return res;
+
+      return loadedEntries;
     }
   }
 
+  /////////////////////////// Singleton ///////////////////////////
+
   public static WALInsertNodeCache getInstance(Integer regionId) {
     return InstanceHolder.getOrCreateInstance(regionId);
   }
 
   private static class InstanceHolder {
+
     private static final Map<Integer, WALInsertNodeCache> INSTANCE_MAP = new ConcurrentHashMap<>();
 
     public static WALInsertNodeCache getOrCreateInstance(Integer key) {
       return INSTANCE_MAP.computeIfAbsent(key, k -> new WALInsertNodeCache());
     }
+
+    private InstanceHolder() {
+      // forbidding instantiation
+    }
+  }
+
+  /////////////////////////// Test Only ///////////////////////////
+
+  @TestOnly
+  public boolean isBatchLoadEnabled() {
+    return isBatchLoadEnabled;
+  }
+
+  @TestOnly
+  public void setIsBatchLoadEnabled(boolean isBatchLoadEnabled) {
+    this.isBatchLoadEnabled = isBatchLoadEnabled;
+  }
+
+  @TestOnly
+  boolean contains(WALEntryPosition position) {
+    return lruCache.getIfPresent(position) != null;
+  }
+
+  @TestOnly
+  public void clear() {
+    lruCache.invalidateAll();
+    memTablesNeedSearch.clear();
   }
 }