You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2023/10/23 09:28:29 UTC
[iotdb] 01/01: Pipe: introduce wal entry write cache for realtime log mode
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch refactor-wal-entry-cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 764f57f32cee6e3e36d16b95b2aa87023634487b
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Oct 23 17:28:19 2023 +0800
Pipe: introduce wal entry write cache for realtime log mode
---
.../storageengine/dataregion/wal/node/WALNode.java | 3 +-
.../dataregion/wal/utils/WALEntryHandler.java | 44 +++--
.../dataregion/wal/utils/WALEntryPosition.java | 48 +++---
.../dataregion/wal/utils/WALInsertNodeCache.java | 180 ++++++++++++++-------
4 files changed, 169 insertions(+), 106 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 0496ad44215..55a6f11cd5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -147,8 +147,7 @@ public class WALNode implements IWALNode {
private WALFlushListener log(WALEntry walEntry) {
buffer.write(walEntry);
// set handler for pipe
- walEntry.getWalFlushListener().getWalEntryHandler().setMemTableId(walEntry.getMemTableId());
- walEntry.getWalFlushListener().getWalEntryHandler().setWalNode(this);
+ walEntry.getWalFlushListener().getWalEntryHandler().setWalNode(this, walEntry.getMemTableId());
return walEntry.getWalFlushListener();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index d364ceb3b5e..3097e12e4fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -83,6 +83,12 @@ public class WALEntryHandler {
walNode.unpinMemTable(memTableId);
}
+ public InsertNode getInsertNodeViaCacheIfPossible() {
+ return value instanceof InsertNode
+ ? (InsertNode) value
+ : walEntryPosition.readByteBufferOrInsertNodeViaCacheDirectly().getRight();
+ }
+
/**
* Get this handler's value.
*
@@ -111,7 +117,7 @@ public class WALEntryHandler {
}
}
- final InsertNode node = isHardlink ? readFromHardlinkFile() : readFromOriginalWALFile();
+ final InsertNode node = isHardlink ? readFromHardlinkWALFile() : readFromOriginalWALFile();
if (node == null) {
throw new WALPipeException(
String.format("Fail to get the wal value of the position %s.", walEntryPosition));
@@ -119,10 +125,6 @@ public class WALEntryHandler {
return node;
}
- public InsertNode getInsertNodeViaCacheIfPossible() {
- return value instanceof InsertNode ? (InsertNode) value : null;
- }
-
public ByteBuffer getByteBuffer() throws WALPipeException {
// wait until the position is ready
while (!walEntryPosition.canRead()) {
@@ -146,15 +148,15 @@ public class WALEntryHandler {
private InsertNode readFromOriginalWALFile() throws WALPipeException {
try {
- return walEntryPosition.readInsertNodeViaCache();
+ return walEntryPosition.readInsertNodeViaCacheAfterCanRead();
} catch (Exception e) {
throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
}
}
- private InsertNode readFromHardlinkFile() throws WALPipeException {
+ private InsertNode readFromHardlinkWALFile() throws WALPipeException {
try {
- return walEntryPosition.readInsertNodeViaCache();
+ return walEntryPosition.readInsertNodeViaCacheAfterCanRead();
} catch (Exception e) {
throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
}
@@ -162,38 +164,34 @@ public class WALEntryHandler {
private ByteBuffer readByteBufferFromWALFile() throws WALPipeException {
try {
- return walEntryPosition.readByteBufferViaCache();
+ return walEntryPosition.readByteBufferViaCacheAfterCanRead();
} catch (Exception e) {
throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
}
}
- public long getMemTableId() {
- return memTableId;
- }
-
- public void setMemTableId(long memTableId) {
- this.memTableId = memTableId;
- }
-
- public void setWalNode(WALNode walNode) {
+ public void setWalNode(WALNode walNode, long memTableId) {
this.walNode = walNode;
- this.walEntryPosition.setWalNode(walNode);
- this.walEntryPosition.setWalInsertNodeCache(walNode.getRegionId(memTableId));
+ this.memTableId = memTableId;
+ walEntryPosition.setWalNode(walNode, memTableId);
}
- public WALEntryPosition getWalEntryPosition() {
- return walEntryPosition;
+ public long getMemTableId() {
+ return memTableId;
}
public void setEntryPosition(long walFileVersionId, long position) {
- this.walEntryPosition.setEntryPosition(walFileVersionId, position);
+ this.walEntryPosition.setEntryPosition(walFileVersionId, position, value);
this.value = null;
synchronized (this) {
this.notifyAll();
}
}
+ public WALEntryPosition getWalEntryPosition() {
+ return walEntryPosition;
+ }
+
public int getSize() {
return walEntryPosition.getSize();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 05af71c5acc..3979c49bffc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -20,7 +20,9 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.File;
import java.io.IOException;
@@ -54,12 +56,20 @@ public class WALEntryPosition {
this.size = size;
}
+ /**
+ * Try to read the wal entry directly from the cache. No need to check if the wal entry is ready
+ * for read.
+ */
+ public Pair<ByteBuffer, InsertNode> readByteBufferOrInsertNodeViaCacheDirectly() {
+ return cache.getByteBufferOrInsertNode(this);
+ }
+
/**
* Read the wal entry and parse it to the InsertNode. Use LRU cache to accelerate read.
*
* @throws IOException failing to read.
*/
- public InsertNode readInsertNodeViaCache() throws IOException {
+ public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException {
if (!canRead()) {
throw new IOException("This entry isn't ready for read.");
}
@@ -71,7 +81,7 @@ public class WALEntryPosition {
*
* @throws IOException failing to read.
*/
- public ByteBuffer readByteBufferViaCache() throws IOException {
+ public ByteBuffer readByteBufferViaCacheAfterCanRead() throws IOException {
if (!canRead()) {
throw new IOException("This entry isn't ready for read.");
}
@@ -122,6 +132,10 @@ public class WALEntryPosition {
}
}
+ public File getWalFile() {
+ return walFile;
+ }
+
/** Return true only when the tuple(file, position, size) is ready. */
public boolean canRead() {
return walFileVersionId >= 0;
@@ -135,36 +149,32 @@ public class WALEntryPosition {
return walFileVersionId < walNode.getCurrentWALFileVersion();
}
- public void setWalNode(WALNode walNode) {
+ public void setWalNode(WALNode walNode, long memTableId) {
this.walNode = walNode;
- this.identifier = walNode.getIdentifier();
- }
-
- public void setEntryPosition(long walFileVersionId, long position) {
- this.position = position;
- this.walFileVersionId = walFileVersionId;
- }
-
- public void setWalInsertNodeCache(int regionId) {
- cache = WALInsertNodeCache.getInstance(regionId);
+ identifier = walNode.getIdentifier();
+ cache = WALInsertNodeCache.getInstance(walNode.getRegionId(memTableId));
}
public String getIdentifier() {
return identifier;
}
- public long getWalFileVersionId() {
- return walFileVersionId;
- }
-
- public File getWalFile() {
- return walFile;
+ public void setEntryPosition(long walFileVersionId, long position, WALEntryValue value) {
+ this.position = position;
+ this.walFileVersionId = walFileVersionId;
+ if (cache != null && value instanceof InsertNode) {
+ cache.cacheInsertNodeIfNeeded(this, (InsertNode) value);
+ }
}
public long getPosition() {
return position;
}
+ public long getWalFileVersionId() {
+ return walFileVersionId;
+ }
+
public void setSize(int size) {
this.size = size;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 747c7ac1477..31f5694d0e4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -49,8 +49,8 @@ import java.util.concurrent.ConcurrentHashMap;
/** This cache is used by {@link WALEntryPosition}. */
public class WALInsertNodeCache {
- private static final Logger logger = LoggerFactory.getLogger(WALInsertNodeCache.class);
- private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final Logger LOGGER = LoggerFactory.getLogger(WALInsertNodeCache.class);
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
// LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> lruCache;
@@ -59,60 +59,53 @@ public class WALInsertNodeCache {
// ids of all pinned memTables
private final Set<Long> memTablesNeedSearch = ConcurrentHashMap.newKeySet();
+ private volatile boolean hasPipeRunning = false;
+
private WALInsertNodeCache() {
// TODO: try allocate memory 2 * config.getWalFileSizeThresholdInByte() for the cache
// If allocate memory failed, disable batch load
isBatchLoadEnabled = true;
lruCache =
Caffeine.newBuilder()
- .maximumWeight(2 * config.getWalFileSizeThresholdInByte())
+ .maximumWeight(2 * CONFIG.getWalFileSizeThresholdInByte())
.weigher(
(Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
(position, pair) -> position.getSize())
.build(new WALInsertNodeCacheLoader());
}
- @TestOnly
- public boolean isBatchLoadEnabled() {
- return isBatchLoadEnabled;
- }
-
- @TestOnly
- public void setIsBatchLoadEnabled(boolean isBatchLoadEnabled) {
- this.isBatchLoadEnabled = isBatchLoadEnabled;
- }
+ /////////////////////////// Getter & Setter ///////////////////////////
public InsertNode getInsertNode(WALEntryPosition position) {
- final Pair<ByteBuffer, InsertNode> pair =
- isBatchLoadEnabled
- ? lruCache.getAll(Collections.singleton(position)).get(position)
- : lruCache.get(position);
+ final Pair<ByteBuffer, InsertNode> pair = getByteBufferOrInsertNode(position);
- if (pair == null) {
- throw new IllegalStateException();
+ if (pair.getRight() != null) {
+ return pair.getRight();
}
- if (pair.getRight() == null) {
- try {
- // multi pipes may share the same wal entry, so we need to wrap the byte[] into
- // different ByteBuffer for each pipe
- pair.setRight(parse(ByteBuffer.wrap(pair.getLeft().array())));
- } catch (Exception e) {
- logger.error(
- "Parsing failed when recovering insertNode from wal, walFile:{}, position:{}, size:{}, exception:",
- position.getWalFile(),
- position.getPosition(),
- position.getSize(),
- e);
- throw e;
- }
+ if (pair.getLeft() == null) {
+ throw new IllegalStateException();
}
- return pair.getRight();
+ try {
+ // multi pipes may share the same wal entry, so we need to wrap the byte[] into
+ // different ByteBuffer for each pipe
+ final InsertNode insertNode = parse(ByteBuffer.wrap(pair.getLeft().array()));
+ pair.setRight(insertNode);
+ return insertNode;
+ } catch (Exception e) {
+ LOGGER.error(
+ "Parsing failed when recovering insertNode from wal, walFile:{}, position:{}, size:{}, exception:",
+ position.getWalFile(),
+ position.getPosition(),
+ position.getSize(),
+ e);
+ throw e;
+ }
}
private InsertNode parse(ByteBuffer buffer) {
- PlanNode node = WALEntry.deserializeForConsensus(buffer);
+ final PlanNode node = WALEntry.deserializeForConsensus(buffer);
if (node instanceof InsertNode) {
return (InsertNode) node;
} else {
@@ -121,6 +114,30 @@ public class WALInsertNodeCache {
}
public ByteBuffer getByteBuffer(WALEntryPosition position) {
+ Pair<ByteBuffer, InsertNode> pair = getByteBufferOrInsertNode(position);
+
+ if (pair.getLeft() != null) {
+ // multi pipes may share the same wal entry, so we need to wrap the byte[] into
+ // different ByteBuffer for each pipe
+ return ByteBuffer.wrap(pair.getLeft().array());
+ }
+
+ // forbid multi threads to invalidate and load the same entry
+ synchronized (this) {
+ lruCache.invalidate(position);
+ pair = getByteBufferOrInsertNode(position);
+ }
+
+ if (pair.getLeft() == null) {
+ throw new IllegalStateException();
+ }
+
+ return ByteBuffer.wrap(pair.getLeft().array());
+ }
+
+ public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNode(WALEntryPosition position) {
+ hasPipeRunning = true;
+
final Pair<ByteBuffer, InsertNode> pair =
isBatchLoadEnabled
? lruCache.getAll(Collections.singleton(position)).get(position)
@@ -130,15 +147,18 @@ public class WALInsertNodeCache {
throw new IllegalStateException();
}
- // multi pipes may share the same wal entry, so we need to wrap the byte[] into different
- // ByteBuffer for each pipe
- return ByteBuffer.wrap(pair.getLeft().array());
+ return pair;
}
- boolean contains(WALEntryPosition position) {
- return lruCache.getIfPresent(position) != null;
+ public void cacheInsertNodeIfNeeded(WALEntryPosition walEntryPosition, InsertNode insertNode) {
+ // reduce memory usage
+ if (hasPipeRunning) {
+ lruCache.put(walEntryPosition, new Pair<>(null, insertNode));
+ }
}
+ /////////////////////////// MemTable ///////////////////////////
+
public void addMemTable(long memTableId) {
memTablesNeedSearch.add(memTableId);
}
@@ -147,10 +167,7 @@ public class WALInsertNodeCache {
memTablesNeedSearch.remove(memTableId);
}
- public void clear() {
- lruCache.invalidateAll();
- memTablesNeedSearch.clear();
- }
+ /////////////////////////// Cache Loader ///////////////////////////
class WALInsertNodeCacheLoader
implements CacheLoader<WALEntryPosition, Pair<ByteBuffer, InsertNode>> {
@@ -164,20 +181,22 @@ public class WALInsertNodeCache {
/** Batch load all wal entries in the file when any one key is absent. */
@Override
public @NonNull Map<@NonNull WALEntryPosition, @NonNull Pair<ByteBuffer, InsertNode>> loadAll(
- @NonNull Iterable<? extends @NonNull WALEntryPosition> keys) {
- final Map<WALEntryPosition, Pair<ByteBuffer, InsertNode>> res = new HashMap<>();
+ @NonNull Iterable<? extends @NonNull WALEntryPosition> walEntryPositions) {
+ final Map<WALEntryPosition, Pair<ByteBuffer, InsertNode>> loadedEntries = new HashMap<>();
- for (WALEntryPosition pos : keys) {
- if (res.containsKey(pos) || !pos.canRead()) {
+ for (WALEntryPosition walEntryPosition : walEntryPositions) {
+ if (loadedEntries.containsKey(walEntryPosition) || !walEntryPosition.canRead()) {
continue;
}
- long walFileVersionId = pos.getWalFileVersionId();
+
+ final long walFileVersionId = walEntryPosition.getWalFileVersionId();
+
// load one when wal file is not sealed
- if (!pos.isInSealedFile()) {
+ if (!walEntryPosition.isInSealedFile()) {
try {
- res.put(pos, load(pos));
+ loadedEntries.put(walEntryPosition, load(walEntryPosition));
} catch (Exception e) {
- logger.info(
+ LOGGER.info(
"Fail to cache wal entries from the wal file with version id {}",
walFileVersionId,
e);
@@ -187,43 +206,80 @@ public class WALInsertNodeCache {
// batch load when wal file is sealed
long position = 0;
- try (FileChannel channel = pos.openReadFileChannel();
- WALByteBufReader walByteBufReader = new WALByteBufReader(pos.getWalFile(), channel)) {
+ try (final FileChannel channel = walEntryPosition.openReadFileChannel();
+ final WALByteBufReader walByteBufReader =
+ new WALByteBufReader(walEntryPosition.getWalFile(), channel)) {
while (walByteBufReader.hasNext()) {
// see WALInfoEntry#serialize, entry type + memtable id + plan node type
- ByteBuffer buffer = walByteBufReader.next();
- int size = buffer.capacity();
- WALEntryType type = WALEntryType.valueOf(buffer.get());
- long memTableId = buffer.getLong();
- if ((memTablesNeedSearch.contains(memTableId) || pos.getPosition() == position)
+ final ByteBuffer buffer = walByteBufReader.next();
+
+ final int size = buffer.capacity();
+ final WALEntryType type = WALEntryType.valueOf(buffer.get());
+ final long memTableId = buffer.getLong();
+
+ if ((memTablesNeedSearch.contains(memTableId)
+ || walEntryPosition.getPosition() == position)
&& type.needSearch()) {
buffer.clear();
- res.put(
- new WALEntryPosition(pos.getIdentifier(), walFileVersionId, position, size),
+ loadedEntries.put(
+ new WALEntryPosition(
+ walEntryPosition.getIdentifier(), walFileVersionId, position, size),
new Pair<>(buffer, null));
}
+
position += size;
}
} catch (IOException e) {
- logger.info(
+ LOGGER.info(
"Fail to cache wal entries from the wal file with version id {}",
walFileVersionId,
e);
}
}
- return res;
+
+ return loadedEntries;
}
}
+ /////////////////////////// Singleton ///////////////////////////
+
public static WALInsertNodeCache getInstance(Integer regionId) {
return InstanceHolder.getOrCreateInstance(regionId);
}
private static class InstanceHolder {
+
private static final Map<Integer, WALInsertNodeCache> INSTANCE_MAP = new ConcurrentHashMap<>();
public static WALInsertNodeCache getOrCreateInstance(Integer key) {
return INSTANCE_MAP.computeIfAbsent(key, k -> new WALInsertNodeCache());
}
+
+ private InstanceHolder() {
+ // forbidding instantiation
+ }
+ }
+
+ /////////////////////////// Test Only ///////////////////////////
+
+ @TestOnly
+ public boolean isBatchLoadEnabled() {
+ return isBatchLoadEnabled;
+ }
+
+ @TestOnly
+ public void setIsBatchLoadEnabled(boolean isBatchLoadEnabled) {
+ this.isBatchLoadEnabled = isBatchLoadEnabled;
+ }
+
+ @TestOnly
+ boolean contains(WALEntryPosition position) {
+ return lruCache.getIfPresent(position) != null;
+ }
+
+ @TestOnly
+ public void clear() {
+ lruCache.invalidateAll();
+ memTablesNeedSearch.clear();
}
}