You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/15 13:03:35 UTC
[iotdb] branch master updated: [IOTDB-5823] wal pipe handler for the pipe module (#9708)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c9fe5c44637 [IOTDB-5823] wal pipe handler for the pipe module (#9708)
c9fe5c44637 is described below
commit c9fe5c44637b5084f984c7a14c5a4168341b989a
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon May 15 21:03:26 2023 +0800
[IOTDB-5823] wal pipe handler for the pipe module (#9708)
---
.../consensus/iot/wal/ConsensusReqReader.java | 3 +
.../consensus/iot/util/FakeConsensusReqReader.java | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 21 +-
.../iotdb/db/wal/buffer/AbstractWALBuffer.java | 26 ++-
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 32 +--
.../org/apache/iotdb/db/wal/buffer/WALEntry.java | 4 +-
.../iotdb/db/wal/checkpoint/CheckpointManager.java | 118 +++++++---
.../iotdb/db/wal/checkpoint/MemTableInfo.java | 28 ++-
.../MemTablePinException.java} | 17 +-
.../WALPipeException.java} | 17 +-
.../apache/iotdb/db/wal/io/WALByteBufReader.java | 6 +-
.../org/apache/iotdb/db/wal/node/WALFakeNode.java | 24 +-
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 57 ++++-
.../iotdb/db/wal/utils/WALEntryPosition.java | 157 +++++++++++++
.../apache/iotdb/db/wal/utils/WALFileUtils.java | 11 +
.../iotdb/db/wal/utils/WALInsertNodeCache.java | 170 ++++++++++++++
.../apache/iotdb/db/wal/utils/WALPipeHandler.java | 132 +++++++++++
.../db/wal/utils/listener/WALFlushListener.java | 13 +-
.../org/apache/iotdb/db/wal/node/WALNodeTest.java | 7 +-
.../iotdb/db/wal/node/WALPipeHandlerTest.java | 256 +++++++++++++++++++++
.../iotdb/db/wal/utils/WALInsertNodeCacheTest.java | 167 ++++++++++++++
21 files changed, 1176 insertions(+), 95 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java
index 0fedc0ed085..f7bc2cacad0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/wal/ConsensusReqReader.java
@@ -80,6 +80,9 @@ public interface ConsensusReqReader {
/** Get current search index */
long getCurrentSearchIndex();
+ /** Get current wal file version */
+ long getCurrentWALFileVersion();
+
/** Get total size of wal files */
long getTotalSize();
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
index 3a8c8e4e8c8..da58725f7b5 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
@@ -47,6 +47,11 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
return requestSets.getLocalRequestNumber();
}
+ @Override
+ public long getCurrentWALFileVersion() {
+ return 0;
+ }
+
@Override
public long getTotalSize() {
return 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c7e6c09d633..76fce18d052 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -552,6 +552,10 @@ public class IoTDBConfig {
/** Memory allocated proportion for time partition info */
private long allocateMemoryForTimePartitionInfo = allocateMemoryForStorageEngine * 50 / 1001;
+
+ /** Memory allocated proportion for wal pipe cache */
+ private long allocateMemoryForWALPipeCache = allocateMemoryForConsensus / 10;
+
/**
* If true, we will estimate each query's possible memory footprint before executing it and deny
* it if its estimated memory exceeds current free memory
@@ -1918,10 +1922,6 @@ public class IoTDBConfig {
return allocateMemoryForSchema;
}
- public long getAllocateMemoryForConsensus() {
- return allocateMemoryForConsensus;
- }
-
public void setAllocateMemoryForSchema(long allocateMemoryForSchema) {
this.allocateMemoryForSchema = allocateMemoryForSchema;
@@ -1930,8 +1930,13 @@ public class IoTDBConfig {
this.allocateMemoryForLastCache = allocateMemoryForSchema / 10;
}
+ public long getAllocateMemoryForConsensus() {
+ return allocateMemoryForConsensus;
+ }
+
public void setAllocateMemoryForConsensus(long allocateMemoryForConsensus) {
this.allocateMemoryForConsensus = allocateMemoryForConsensus;
+ this.allocateMemoryForWALPipeCache = allocateMemoryForConsensus / 10;
}
public long getAllocateMemoryForRead() {
@@ -2198,6 +2203,14 @@ public class IoTDBConfig {
this.allocateMemoryForTimePartitionInfo = allocateMemoryForTimePartitionInfo;
}
+ public long getAllocateMemoryForWALPipeCache() {
+ return allocateMemoryForWALPipeCache;
+ }
+
+ public void setAllocateMemoryForWALPipeCache(long allocateMemoryForWALPipeCache) {
+ this.allocateMemoryForWALPipeCache = allocateMemoryForWALPipeCache;
+ }
+
public boolean isEnableQueryMemoryEstimation() {
return enableQueryMemoryEstimation;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 437e34cd5b2..372692ab46d 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -83,22 +83,29 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
return currentWALFileWriter.size();
}
- /** Notice: only called by syncBufferThread and old log writer will be closed by this function. */
- protected void rollLogWriter(long searchIndex, WALFileStatus fileStatus) throws IOException {
+ /**
+ * Notice: only called by syncBufferThread and old log writer will be closed by this function.
+ *
+ * @return last wal file
+ */
+ protected File rollLogWriter(long searchIndex, WALFileStatus fileStatus) throws IOException {
// close file
- File currentFile = currentWALFileWriter.getLogFile();
- String currentName = currentFile.getName();
+ File lastFile = currentWALFileWriter.getLogFile();
+ String lastName = lastFile.getName();
currentWALFileWriter.close();
addDiskUsage(currentWALFileWriter.size());
addFileNum(1);
- if (WALFileUtils.parseStatusCode(currentName) != fileStatus) {
+ if (WALFileUtils.parseStatusCode(lastName) != fileStatus) {
String targetName =
WALFileUtils.getLogFileName(
- WALFileUtils.parseVersionId(currentName),
- WALFileUtils.parseStartSearchIndex(currentName),
+ WALFileUtils.parseVersionId(lastName),
+ WALFileUtils.parseStartSearchIndex(lastName),
fileStatus);
- if (!currentFile.renameTo(SystemFileFactory.INSTANCE.getFile(logDirectory, targetName))) {
- logger.error("Fail to rename file {} to {}", currentName, targetName);
+ File targetFile = SystemFileFactory.INSTANCE.getFile(logDirectory, targetName);
+ if (lastFile.renameTo(targetFile)) {
+ lastFile = targetFile;
+ } else {
+ logger.error("Fail to rename file {} to {}", lastName, targetName);
}
}
// roll file
@@ -111,6 +118,7 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
currentWALFileWriter = new WALWriter(nextLogFile);
currentWALFileVersion = nextFileVersion;
logger.debug("Open new wal file {} for wal node-{}'s buffer.", nextLogFile, identifier);
+ return lastFile;
}
public long getDiskUsage() {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index b1cbfcf963c..4ef2315fc61 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -40,7 +40,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -140,7 +140,7 @@ public class WALBuffer extends AbstractWALBuffer {
/** This info class traverses some extra info from serializeThread to syncBufferThread */
private static class SerializeInfo {
final WALMetaData metaData = new WALMetaData();
- final List<WALFlushListener> fsyncListeners = new LinkedList<>();
+ final List<WALFlushListener> fsyncListeners = new ArrayList<>();
WALFlushListener rollWALFileWriterListener = null;
}
@@ -222,19 +222,12 @@ public class WALBuffer extends AbstractWALBuffer {
return handleSignalEntry((WALSignalEntry) walEntry);
}
- boolean success = handleInfoEntry(walEntry);
- if (success) {
- info.fsyncListeners.add(walEntry.getWalFlushListener());
- }
+ handleInfoEntry(walEntry);
return false;
}
- /**
- * Handle a normal WALEntry.
- *
- * @return true if serialization is successful.
- */
- private boolean handleInfoEntry(WALEntry walEntry) {
+ /** Handle a normal WALEntry. */
+ private void handleInfoEntry(WALEntry walEntry) {
int size = byteBufferView.position();
try {
long start = System.nanoTime();
@@ -245,9 +238,9 @@ public class WALBuffer extends AbstractWALBuffer {
logger.error(
"Fail to serialize WALEntry to wal node-{}'s buffer, discard it.", identifier, e);
walEntry.getWalFlushListener().fail(e);
- return false;
+ return;
}
- // update search index
+ // parse search index
long searchIndex = DEFAULT_SEARCH_INDEX;
if (walEntry.getType().needSearch()) {
if (walEntry.getType() == WALEntryType.DELETE_DATA_NODE) {
@@ -260,9 +253,11 @@ public class WALBuffer extends AbstractWALBuffer {
currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
}
+ // update related info
totalSize += size;
info.metaData.add(size, searchIndex);
- return true;
+ walEntry.getWalFlushListener().getWalPipeHandler().setSize(size);
+ info.fsyncListeners.add(walEntry.getWalFlushListener());
}
/**
@@ -439,8 +434,11 @@ public class WALBuffer extends AbstractWALBuffer {
@Override
public void run() {
long start = System.nanoTime();
+ long walFileVersionId = currentWALFileVersion;
+ long position = currentWALFileWriter.size();
currentWALFileWriter.updateFileStatus(fileStatus);
+ // calculate buffer used ratio
double usedRatio = (double) syncingBuffer.position() / syncingBuffer.capacity();
WRITING_METRICS.recordWALBufferUsedRatio(usedRatio);
logger.debug(
@@ -497,6 +495,10 @@ public class WALBuffer extends AbstractWALBuffer {
if (forceSuccess) {
for (WALFlushListener fsyncListener : info.fsyncListeners) {
fsyncListener.succeed();
+ if (fsyncListener.getWalPipeHandler() != null) {
+ fsyncListener.getWalPipeHandler().setEntryPosition(walFileVersionId, position);
+ position += fsyncListener.getWalPipeHandler().getSize();
+ }
}
}
WRITING_METRICS.recordWALBufferEntriesCount(info.fsyncListeners.size());
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
index 64beef1b37b..04781a82b22 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java
@@ -70,14 +70,14 @@ public abstract class WALEntry implements SerializedSize {
} else {
throw new RuntimeException("Unknown WALEntry type");
}
- walFlushListener = new WALFlushListener(wait);
+ walFlushListener = new WALFlushListener(wait, value);
}
protected WALEntry(WALEntryType type, long memTableId, WALEntryValue value, boolean wait) {
this.type = type;
this.memTableId = memTableId;
this.value = value;
- this.walFlushListener = new WALFlushListener(wait);
+ this.walFlushListener = new WALFlushListener(wait, value);
}
public abstract void serialize(IWALByteBufferView buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
index e9bcbf81df1..4870e9588cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
@@ -23,9 +23,11 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.recorder.WritingMetricsManager;
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
import org.apache.iotdb.db.wal.io.CheckpointWriter;
import org.apache.iotdb.db.wal.io.ILogWriter;
import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
+import org.apache.iotdb.db.wal.utils.WALInsertNodeCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,6 +86,15 @@ public class CheckpointManager implements AutoCloseable {
logHeader();
}
+ private List<MemTableInfo> snapshotMemTableInfos() {
+ infoLock.lock();
+ try {
+ return new ArrayList<>(memTableId2Info.values());
+ } finally {
+ infoLock.unlock();
+ }
+ }
+
private void logHeader() {
infoLock.lock();
try {
@@ -108,9 +119,9 @@ public class CheckpointManager implements AutoCloseable {
*/
private void makeGlobalInfoCP() {
long start = System.nanoTime();
- Checkpoint checkpoint =
- new Checkpoint(
- CheckpointType.GLOBAL_MEMORY_TABLE_INFO, new ArrayList<>(memTableId2Info.values()));
+ List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
+ memTableInfos.removeIf(MemTableInfo::isFlushed);
+ Checkpoint checkpoint = new Checkpoint(CheckpointType.GLOBAL_MEMORY_TABLE_INFO, memTableInfos);
logByCachedByteBuffer(checkpoint);
WRITING_METRICS.recordMakeCheckpointCost(checkpoint.getType(), System.nanoTime() - start);
}
@@ -138,10 +149,14 @@ public class CheckpointManager implements AutoCloseable {
infoLock.lock();
long start = System.nanoTime();
try {
- MemTableInfo memTableInfo = memTableId2Info.remove(memTableId);
+ MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
if (memTableInfo == null) {
return;
}
+ memTableInfo.setFlushed();
+ if (!memTableInfo.isPinned()) {
+ memTableId2Info.remove(memTableId);
+ }
Checkpoint checkpoint =
new Checkpoint(
CheckpointType.FLUSH_MEMORY_TABLE, Collections.singletonList(memTableInfo));
@@ -224,16 +239,69 @@ public class CheckpointManager implements AutoCloseable {
}
// endregion
- /** Get MemTableInfo of oldest MemTable, whose first version id is smallest */
- public MemTableInfo getOldestMemTableInfo() {
- // find oldest memTable
- List<MemTableInfo> memTableInfos;
+ // region methods for pipe
+ /**
+ * Pin the wal files of the given memory table. Notice: do not pin one memTable for too
+ * long, otherwise the wal disk usage may become too large.
+ *
+ * @throws MemTablePinException If the memTable has been flushed
+ */
+ public void pinMemTable(long memTableId) throws MemTablePinException {
+ infoLock.lock();
+ try {
+ if (!memTableId2Info.containsKey(memTableId)) {
+ throw new MemTablePinException(
+ String.format(
+ "Fail to pin memTable-%d because this memTable doesn't exist in the wal.",
+ memTableId));
+ }
+ MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
+ if (!memTableInfo.isPinned()) {
+ WALInsertNodeCache.getInstance().addMemTable(memTableId);
+ }
+ memTableInfo.pin();
+ } finally {
+ infoLock.unlock();
+ }
+ }
+
+ /**
+ * Unpin the wal files of the given memory table.
+ *
+ * @throws MemTablePinException If there aren't corresponding pin operations
+ */
+ public void unpinMemTable(long memTableId) throws MemTablePinException {
infoLock.lock();
try {
- memTableInfos = new ArrayList<>(memTableId2Info.values());
+ if (!memTableId2Info.containsKey(memTableId)) {
+ throw new MemTablePinException(
+ String.format(
+ "Fail to unpin memTable-%d because this memTable doesn't exist in the wal.",
+ memTableId));
+ }
+ if (!memTableId2Info.get(memTableId).isPinned()) {
+ throw new MemTablePinException(
+ String.format(
+ "Fail to unpin memTable-%d because this memTable hasn't been pinned.", memTableId));
+ }
+ MemTableInfo memTableInfo = memTableId2Info.get(memTableId);
+ memTableInfo.unpin();
+ if (!memTableInfo.isPinned()) {
+ WALInsertNodeCache.getInstance().removeMemTable(memTableId);
+ if (memTableInfo.isFlushed()) {
+ memTableId2Info.remove(memTableId);
+ }
+ }
} finally {
infoLock.unlock();
}
+ }
+ // endregion
+
+ /** Get MemTableInfo of oldest MemTable, whose first version id is smallest */
+ public MemTableInfo getOldestMemTableInfo() {
+ // find oldest memTable
+ List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
if (memTableInfos.isEmpty()) {
return null;
}
@@ -252,13 +320,7 @@ public class CheckpointManager implements AutoCloseable {
* @return Return {@link Long#MIN_VALUE} if no file is valid
*/
public long getFirstValidWALVersionId() {
- List<MemTableInfo> memTableInfos;
- infoLock.lock();
- try {
- memTableInfos = new ArrayList<>(memTableId2Info.values());
- } finally {
- infoLock.unlock();
- }
+ List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
long firstValidVersionId = memTableInfos.isEmpty() ? Long.MIN_VALUE : Long.MAX_VALUE;
for (MemTableInfo memTableInfo : memTableInfos) {
firstValidVersionId = Math.min(firstValidVersionId, memTableInfo.getFirstFileVersionId());
@@ -268,25 +330,17 @@ public class CheckpointManager implements AutoCloseable {
/** Get total cost of active memTables */
public long getTotalCostOfActiveMemTables() {
+ List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
long totalCost = 0;
-
- if (!config.isEnableMemControl()) {
- infoLock.lock();
- try {
- totalCost = memTableId2Info.size();
- } finally {
- infoLock.unlock();
- }
- } else {
- List<MemTableInfo> memTableInfos;
- infoLock.lock();
- try {
- memTableInfos = new ArrayList<>(memTableId2Info.values());
- } finally {
- infoLock.unlock();
+ for (MemTableInfo memTableInfo : memTableInfos) {
+ // flushed memTables are not active
+ if (memTableInfo.isFlushed()) {
+ continue;
}
- for (MemTableInfo memTableInfo : memTableInfos) {
+ if (config.isEnableMemControl()) {
totalCost += memTableInfo.getMemTable().getTVListsRamCost();
+ } else {
+ totalCost++;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
index 3de6f7fcdae..bcf1ebe338e 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
@@ -29,7 +29,7 @@ import java.nio.ByteBuffer;
import java.util.Objects;
/**
- * MemTableInfo records brief info of one memtable, including memTable id, tsFile path, and .wal
+ * MemTableInfo records brief info of one memTable, including memTable id, tsFile path, and .wal
* file version id of its first {@link WALEntry}.
*/
public class MemTableInfo implements SerializedSize {
@@ -38,6 +38,10 @@ public class MemTableInfo implements SerializedSize {
/** memTable */
private IMemTable memTable;
+ /** memTable pin count */
+ private int pinCount;
+ /** memTable is flushed or not */
+ private boolean flushed;
/** memTable id */
private long memTableId;
/** path of the tsFile which this memTable will be flushed to */
@@ -94,6 +98,28 @@ public class MemTableInfo implements SerializedSize {
return memTable;
}
+ public void pin() {
+ this.pinCount++;
+ }
+
+ public void unpin() {
+ this.pinCount--;
+ }
+
+ public boolean isPinned() {
+ return pinCount > 0;
+ }
+
+ public boolean isFlushed() {
+ return flushed;
+ }
+
+ public void setFlushed() {
+ // avoid memory leak;
+ this.memTable = null;
+ this.flushed = true;
+ }
+
public long getMemTableId() {
return memTableId;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java b/server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
copy to server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java
index 846e47961f9..3e9722d8cc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/exception/MemTablePinException.java
@@ -16,11 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.wal.utils.listener;
+package org.apache.iotdb.db.wal.exception;
-/** This class helps judge whether wal is flushed to the storage device. */
-public class WALFlushListener extends AbstractResultListener {
- public WALFlushListener(boolean wait) {
- super(wait);
+public class MemTablePinException extends WALException {
+ public MemTablePinException(Throwable cause) {
+ super(cause);
+ }
+
+ public MemTablePinException(String message) {
+ super(message);
+ }
+
+ public MemTablePinException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java
similarity index 71%
copy from server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
copy to server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java
index 846e47961f9..e596ca3cf93 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/exception/WALPipeException.java
@@ -16,11 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.wal.utils.listener;
+package org.apache.iotdb.db.wal.exception;
-/** This class helps judge whether wal is flushed to the storage device. */
-public class WALFlushListener extends AbstractResultListener {
- public WALFlushListener(boolean wait) {
- super(wait);
+public class WALPipeException extends WALException {
+ public WALPipeException(Throwable cause) {
+ super(cause);
+ }
+
+ public WALPipeException(String message) {
+ super(message);
+ }
+
+ public WALPipeException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
index d5c02437950..1845bd19fc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALByteBufReader.java
@@ -42,8 +42,12 @@ public class WALByteBufReader implements Closeable {
private final Iterator<Integer> sizeIterator;
public WALByteBufReader(File logFile) throws IOException {
+ this(logFile, FileChannel.open(logFile.toPath(), StandardOpenOption.READ));
+ }
+
+ public WALByteBufReader(File logFile, FileChannel channel) throws IOException {
this.logFile = logFile;
- this.channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ);
+ this.channel = channel;
if (channel.size() < MAGIC_STRING_BYTES || !readTailMagic().equals(MAGIC_STRING)) {
throw new IOException(String.format("Broken wal file %s", logFile));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
index 07fd08bfafd..e07078988d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
@@ -28,7 +28,8 @@ import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
/** This class provides fake wal node when wal is disabled or exception happens. */
public class WALFakeNode implements IWALNode {
private final WALFlushListener.Status status;
- private final Exception cause;
+ private final WALFlushListener successListener;
+ private final WALFlushListener failListener;
private WALFakeNode(WALFlushListener.Status status) {
this(status, null);
@@ -36,7 +37,10 @@ public class WALFakeNode implements IWALNode {
public WALFakeNode(WALFlushListener.Status status, Exception cause) {
this.status = status;
- this.cause = cause;
+ this.successListener = new WALFlushListener(false, null);
+ this.successListener.succeed();
+ this.failListener = new WALFlushListener(false, null);
+ this.failListener.fail(cause);
}
@Override
@@ -56,18 +60,13 @@ public class WALFakeNode implements IWALNode {
}
private WALFlushListener getResult() {
- WALFlushListener walFlushListener = new WALFlushListener(false);
switch (status) {
case SUCCESS:
- walFlushListener.succeed();
- break;
+ return successListener;
case FAILURE:
- walFlushListener.fail(cause);
- break;
default:
- break;
+ return failListener;
}
- return walFlushListener;
}
@Override
@@ -102,7 +101,12 @@ public class WALFakeNode implements IWALNode {
@Override
public long getCurrentSearchIndex() {
- throw new UnsupportedOperationException();
+ return 0;
+ }
+
+ @Override
+ public long getCurrentWALFileVersion() {
+ return 0;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index fe4ae858438..6a6ede3a3cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
import org.apache.iotdb.db.wal.buffer.WALSignalEntry;
import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
import org.apache.iotdb.db.wal.io.WALByteBufReader;
import org.apache.iotdb.db.wal.utils.WALFileStatus;
import org.apache.iotdb.db.wal.utils.WALFileUtils;
@@ -148,6 +149,9 @@ public class WALNode implements IWALNode {
private WALFlushListener log(WALEntry walEntry) {
buffer.write(walEntry);
+ // set handler for pipe
+ walEntry.getWalFlushListener().getWalPipeHandler().setMemTableId(walEntry.getMemTableId());
+ walEntry.getWalFlushListener().getWalPipeHandler().setWalNode(this);
return walEntry.getWalFlushListener();
}
@@ -183,6 +187,18 @@ public class WALNode implements IWALNode {
checkpointManager.makeCreateMemTableCP(memTableInfo);
}
+ // region methods for pipe
+ /** Pin the wal files of the given memory table */
+ public void pinMemTable(long memTableId) throws MemTablePinException {
+ checkpointManager.pinMemTable(memTableId);
+ }
+
+ /** Unpin the wal files of the given memory table */
+ public void unpinMemTable(long memTableId) throws MemTablePinException {
+ checkpointManager.unpinMemTable(memTableId);
+ }
+ // endregion
+
// region Task to delete outdated .wal files
/** Delete outdated .wal files */
public void deleteOutdatedFiles() {
@@ -197,6 +213,8 @@ public class WALNode implements IWALNode {
private static final int MAX_RECURSION_TIME = 5;
/** .wal files whose version ids are less than first valid version id should be deleted */
private long firstValidVersionId;
+ /** the effective information ratio */
+ private double effectiveInfoRatio;
/** recursion time of calling deletion */
private int recursionTime = 0;
@@ -232,7 +250,7 @@ public class WALNode implements IWALNode {
if (totalCost == 0) {
return;
}
- double effectiveInfoRatio = (double) costOfActiveMemTables / totalCost;
+ effectiveInfoRatio = (double) costOfActiveMemTables / totalCost;
WRITING_METRICS.recordWALNodeEffectiveInfoRatio(identifier, effectiveInfoRatio);
logger.debug(
"Effective information ratio is {}, active memTables cost is {}, flushed memTables cost is {}",
@@ -243,7 +261,7 @@ public class WALNode implements IWALNode {
// update first valid version id by snapshotting or flushing memTable,
// then delete old .wal files again
if (effectiveInfoRatio < config.getWalMinEffectiveInfoRatio()) {
- logger.info(
+ logger.debug(
"Effective information ratio {} (active memTables cost is {}, flushed memTables cost is {}) of wal node-{} is below wal min effective info ratio {}, some memTables will be snapshot or flushed.",
effectiveInfoRatio,
costOfActiveMemTables,
@@ -328,7 +346,7 @@ public class WALNode implements IWALNode {
/**
* Snapshot or flush one memTable,
*
- * @return true if snapshot or flushed
+ * @return true if snapshot or flush is executed successfully
*/
private boolean snapshotOrFlushMemTable() {
// find oldest memTable
@@ -336,6 +354,15 @@ public class WALNode implements IWALNode {
if (oldestMemTableInfo == null) {
return false;
}
+ if (oldestMemTableInfo.isPinned()) {
+ logger.info(
+ "MemTable-{} is pinned and effective information ratio {} of wal node-{} is below wal min effective info ratio {}.",
+ oldestMemTableInfo.getMemTableId(),
+ effectiveInfoRatio,
+ identifier,
+ config.getWalMinEffectiveInfoRatio());
+ return false;
+ }
IMemTable oldestMemTable = oldestMemTableInfo.getMemTable();
// get memTable's virtual database processor
@@ -376,10 +403,12 @@ public class WALNode implements IWALNode {
dataRegion.submitAFlushTask(
TsFileUtils.getTimePartition(tsFile), TsFileUtils.isSequence(tsFile), memTable);
logger.info(
- "WAL node-{} flushes memTable-{} to TsFile {}, memTable size is {}.",
+ "WAL node-{} flushes memTable-{} to TsFile {} because Effective information ratio {} is below wal min effective info ratio {}, memTable size is {}.",
identifier,
memTable.getMemTableId(),
tsFile,
+ effectiveInfoRatio,
+ config.getWalMinEffectiveInfoRatio(),
memTable.getTVListsRamCost());
}
@@ -444,9 +473,11 @@ public class WALNode implements IWALNode {
logger.error("Fail to snapshot memTable of {}", tsFile, flushListener.getCause());
}
logger.info(
- "WAL node-{} snapshots memTable-{} to wal files, memTable size is {}.",
+ "WAL node-{} snapshots memTable-{} to wal files because Effective information ratio {} is below wal min effective info ratio {}, memTable size is {}.",
identifier,
memTable.getMemTableId(),
+ effectiveInfoRatio,
+ config.getWalMinEffectiveInfoRatio(),
memTable.getTVListsRamCost());
}
} finally {
@@ -742,11 +773,15 @@ public class WALNode implements IWALNode {
return buffer.getCurrentSearchIndex();
}
+ @Override
+ public long getCurrentWALFileVersion() {
+ return buffer.getCurrentWALFileVersion();
+ }
+
@Override
public long getTotalSize() {
return WALManager.getInstance().getTotalDiskUsage();
}
-
// endregion
@Override
@@ -759,6 +794,11 @@ public class WALNode implements IWALNode {
return logDirectory;
}
+ /** Get the .wal file that starts with the specified version id */
+ public File getWALFile(long versionId) {
+ return WALFileUtils.getWALFile(logDirectory, versionId);
+ }
+
/** Return true when all wal entries all consumed and flushed */
public boolean isAllWALEntriesConsumed() {
return buffer.isAllWALEntriesConsumed();
@@ -788,4 +828,9 @@ public class WALNode implements IWALNode {
long getCurrentLogVersion() {
return buffer.getCurrentWALFileVersion();
}
+
+ @TestOnly
+ CheckpointManager getCheckpointManager() {
+ return checkpointManager;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java
new file mode 100644
index 00000000000..adab38dc5b3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALEntryPosition.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.Objects;
+
+/**
+ * This class uses the tuple(file, position, size) to denote the position of the wal entry, and
+ * gives some methods to read the content from the disk.
+ *
+ * <p>A freshly constructed instance is not readable: the wal file version id stays at -1 until
+ * {@link #setEntryPosition(long, long)} is called, and {@link #canRead()} reports that state.
+ * Equality and hashing only consider the wal file version id and the position, never the size.
+ */
+public class WALEntryPosition {
+  private static final WALInsertNodeCache CACHE = WALInsertNodeCache.getInstance();
+  /** version id of the wal file holding this entry, negative until the position is known */
+  private volatile long walFileVersionId = -1;
+  /** byte offset of this entry inside the wal file */
+  private volatile long position;
+  /** number of bytes occupied by this entry */
+  private volatile int size;
+  /** wal node, null when wal is disabled */
+  private WALNode walNode = null;
+  /** wal file is not null when openReadFileChannel method has been called */
+  private File walFile = null;
+
+  public WALEntryPosition() {}
+
+  public WALEntryPosition(long walFileVersionId, long position, int size) {
+    this.walFileVersionId = walFileVersionId;
+    this.position = position;
+    this.size = size;
+  }
+
+  /**
+   * Read the wal entry and parse it to the InsertNode. Use LRU cache to accelerate read.
+   *
+   * @return whatever the cache returns for this position
+   * @throws IOException if the position of this entry hasn't been set yet
+   */
+  public InsertNode readInsertNodeViaCache() throws IOException {
+    if (!canRead()) {
+      throw new IOException("This entry isn't ready for read.");
+    }
+    return CACHE.get(this);
+  }
+
+  /**
+   * Read the byte buffer directly.
+   *
+   * @return a buffer holding exactly {@code size} bytes read from {@code position}, ready to be
+   *     consumed from its beginning
+   * @throws IOException if the position isn't ready, the wal file cannot be opened, or the file
+   *     ends before {@code size} bytes are available
+   */
+  ByteBuffer read() throws IOException {
+    if (!canRead()) {
+      throw new IOException("Target file hasn't been specified.");
+    }
+    try (FileChannel channel = openReadFileChannel()) {
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+      long readPosition = position;
+      // one FileChannel#read call may deliver fewer bytes than requested, so read until full
+      while (buffer.hasRemaining()) {
+        int readBytes = channel.read(buffer, readPosition);
+        if (readBytes < 0) {
+          throw new IOException(
+              String.format(
+                  "Unexpected end of wal file with version id %d, expect %d bytes from position %d but only %d bytes are available.",
+                  walFileVersionId, buffer.capacity(), position, buffer.position()));
+        }
+        readPosition += readBytes;
+      }
+      // the buffer is full here, so flip leaves position 0 and limit == capacity
+      buffer.flip();
+      return buffer;
+    }
+  }
+
+  /**
+   * open the read file channel for this wal entry, this method will retry automatically when the
+   * file is sealed when opening the file channel
+   *
+   * @throws IOException if the wal file cannot be found or opened
+   */
+  public FileChannel openReadFileChannel() throws IOException {
+    if (isInSealedFile()) {
+      return openChannelOfResolvedFile();
+    }
+    try {
+      return openChannelOfResolvedFile();
+    } catch (IOException e) {
+      // unsealed file may be renamed after sealed, so we should try again
+      if (isInSealedFile()) {
+        return openChannelOfResolvedFile();
+      }
+      throw e;
+    }
+  }
+
+  /** Look up the wal file by version id, remember it in walFile and open it for reading. */
+  private FileChannel openChannelOfResolvedFile() throws IOException {
+    File resolvedFile = walNode.getWALFile(walFileVersionId);
+    walFile = resolvedFile;
+    // the lookup yields null when no unique file matches; report it as an IOException so the
+    // retry above and the callers' IOException handling apply instead of an NPE
+    if (resolvedFile == null) {
+      throw new IOException("Fail to find the wal file with version id " + walFileVersionId);
+    }
+    return FileChannel.open(resolvedFile.toPath(), StandardOpenOption.READ);
+  }
+
+  /** Return true only when the tuple(file, position, size) is ready. */
+  public boolean canRead() {
+    return walFileVersionId >= 0;
+  }
+
+  /**
+   * Return true only when this wal file is sealed.
+   *
+   * @throws RuntimeException if the wal node or the position of this entry hasn't been set
+   */
+  public boolean isInSealedFile() {
+    if (walNode == null || !canRead()) {
+      throw new RuntimeException("This entry isn't ready for read.");
+    }
+    // a file is treated as sealed once the wal node has moved on to a newer file version
+    return walFileVersionId < walNode.getCurrentWALFileVersion();
+  }
+
+  public void setWalNode(WALNode walNode) {
+    this.walNode = walNode;
+  }
+
+  /** Set the wal file version id and the offset of this entry, which makes it readable. */
+  public void setEntryPosition(long walFileVersionId, long position) {
+    // write the version id last: canRead() only checks the version id, so the position must
+    // already be visible once the entry is reported as readable
+    this.position = position;
+    this.walFileVersionId = walFileVersionId;
+  }
+
+  public long getWalFileVersionId() {
+    return walFileVersionId;
+  }
+
+  /** Return the file resolved by the latest openReadFileChannel call, null before that. */
+  public File getWalFile() {
+    return walFile;
+  }
+
+  public long getPosition() {
+    return position;
+  }
+
+  public void setSize(int size) {
+    this.size = size;
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(walFileVersionId, position);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    WALEntryPosition that = (WALEntryPosition) o;
+    return walFileVersionId == that.walFileVersionId && position == that.position;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
index 8caade5b2ed..16efb6bc8ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
@@ -71,6 +71,17 @@ public class WALFileUtils {
return dir.listFiles(WALFileUtils::walFilenameFilter);
}
+  /**
+   * Find the .wal file in the directory whose name starts with the given version id.
+   *
+   * @param dir directory to search
+   * @param versionId version id the file name has to start with
+   * @return the only matching file, or null when the directory cannot be listed or the number of
+   *     matching files is not exactly one
+   */
+  public static File getWALFile(File dir, long versionId) {
+    final String expectedPrefix = WAL_FILE_PREFIX + versionId + FILE_NAME_SEPARATOR;
+    final File[] candidates =
+        dir.listFiles(
+            (parent, fileName) ->
+                walFilenameFilter(parent, fileName) && fileName.startsWith(expectedPrefix));
+    return candidates != null && candidates.length == 1 ? candidates[0] : null;
+  }
+
/** Parse version id from filename */
public static long parseVersionId(String filename) {
Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java
new file mode 100644
index 00000000000..736009f0212
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCache.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.wal.io.WALByteBufReader;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This cache is used by {@link WALEntryPosition}. It maps a wal entry position to the parsed
+ * {@link InsertNode} and loads missing entries from the wal files on demand.
+ */
+public class WALInsertNodeCache {
+  private static final Logger logger = LoggerFactory.getLogger(WALInsertNodeCache.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  /** LRU cache, find InsertNode by WALEntryPosition */
+  private final LoadingCache<WALEntryPosition, InsertNode> lruCache;
+
+  /** ids of all pinned memTables */
+  private final Set<Long> memTablesNeedSearch = ConcurrentHashMap.newKeySet();
+
+  private WALInsertNodeCache() {
+    lruCache =
+        Caffeine.newBuilder()
+            // TODO: pipe module should determine how to configure this param
+            .maximumWeight(config.getAllocateMemoryForWALPipeCache())
+            // each cached node is weighed by the size recorded in its position
+            .weigher(
+                (Weigher<WALEntryPosition, InsertNode>) (position, buffer) -> position.getSize())
+            .build(new WALInsertNodeCacheLoader());
+  }
+
+  /**
+   * Get the InsertNode stored at the given position, loading it from the wal file when absent.
+   *
+   * @return the InsertNode, or null when the loader cannot provide one for this position
+   */
+  public InsertNode get(WALEntryPosition position) {
+    InsertNode res = lruCache.getIfPresent(position);
+    // batch load from the wal file
+    if (res == null) {
+      res = lruCache.getAll(Collections.singleton(position)).get(position);
+    }
+    return res;
+  }
+
+  /** Return true when the given position is currently cached. */
+  boolean contains(WALEntryPosition position) {
+    return lruCache.getIfPresent(position) != null;
+  }
+
+  /** Register a memTable whose entries should be cached during batch load. */
+  public void addMemTable(long memTableId) {
+    memTablesNeedSearch.add(memTableId);
+  }
+
+  /** Unregister a memTable, its entries are no longer picked up during batch load. */
+  public void removeMemTable(long memTableId) {
+    memTablesNeedSearch.remove(memTableId);
+  }
+
+  /** Drop all cached nodes and all registered memTable ids. */
+  public void clear() {
+    lruCache.invalidateAll();
+    memTablesNeedSearch.clear();
+  }
+
+  class WALInsertNodeCacheLoader implements CacheLoader<WALEntryPosition, InsertNode> {
+    /** Deserialize the buffer, return null when the content isn't an InsertNode. */
+    private InsertNode parse(ByteBuffer buffer) {
+      PlanNode node = WALEntry.deserializeForConsensus(buffer);
+      if (node instanceof InsertNode) {
+        return (InsertNode) node;
+      } else {
+        return null;
+      }
+    }
+
+    @Override
+    public @Nullable InsertNode load(@NonNull WALEntryPosition key) throws Exception {
+      return parse(key.read());
+    }
+
+    /**
+     * Batch load all wal entries in the file when any one key is absent. Keys that cannot be
+     * loaded are left out of the result, failures are only logged.
+     */
+    @Override
+    public @NonNull Map<@NonNull WALEntryPosition, @NonNull InsertNode> loadAll(
+        @NonNull Iterable<? extends @NonNull WALEntryPosition> keys) {
+      Map<WALEntryPosition, InsertNode> res = new HashMap<>();
+      for (WALEntryPosition pos : keys) {
+        if (res.containsKey(pos) || !pos.canRead()) {
+          continue;
+        }
+        long walFileVersionId = pos.getWalFileVersionId();
+        // load one when wal file is not sealed
+        if (!pos.isInSealedFile()) {
+          try {
+            InsertNode node = load(pos);
+            // load returns null for entries that aren't InsertNode; the returned map is declared
+            // to hold non-null values only, so skip them like the batch branch below does
+            if (node != null) {
+              res.put(pos, node);
+            }
+          } catch (Exception e) {
+            logger.info(
+                "Fail to cache wal entries from the wal file with version id {}",
+                walFileVersionId,
+                e);
+          }
+          continue;
+        }
+        // batch load when wal file is sealed
+        long position = 0;
+        try (FileChannel channel = pos.openReadFileChannel();
+            WALByteBufReader walByteBufReader = new WALByteBufReader(pos.getWalFile(), channel)) {
+          while (walByteBufReader.hasNext()) {
+            // see WALInfoEntry#serialize, entry type + memtable id + plan node type
+            ByteBuffer buffer = walByteBufReader.next();
+            int size = buffer.capacity();
+            WALEntryType type = WALEntryType.valueOf(buffer.get());
+            long memTableId = buffer.getLong();
+            // keep the requested entry itself and every entry of a registered memTable
+            if ((memTablesNeedSearch.contains(memTableId) || pos.getPosition() == position)
+                && type.needSearch()) {
+              // rewind to the beginning because the header bytes were consumed above
+              buffer.clear();
+              InsertNode node = parse(buffer);
+              if (node != null) {
+                res.put(new WALEntryPosition(walFileVersionId, position, size), node);
+              }
+            }
+            position += size;
+          }
+        } catch (IOException e) {
+          logger.info(
+              "Fail to cache wal entries from the wal file with version id {}",
+              walFileVersionId,
+              e);
+        }
+      }
+      return res;
+    }
+  }
+
+  public static WALInsertNodeCache getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  private static class InstanceHolder {
+    private InstanceHolder() {}
+
+    private static final WALInsertNodeCache INSTANCE = new WALInsertNodeCache();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
new file mode 100644
index 00000000000..abdb4771a93
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.exception.WALPipeException;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This handler is used by the Pipe to find the corresponding insert node. Besides, it can try to
+ * pin/unpin the wal entries by the memTable id.
+ */
+public class WALPipeHandler {
+  private static final Logger logger = LoggerFactory.getLogger(WALPipeHandler.class);
+
+  private long memTableId = -1;
+  /** cached value, null after this value is flushed to wal successfully */
+  private volatile WALEntryValue value;
+  /** wal entry's position in the wal, valid after the value is flushed to wal successfully */
+  private final WALEntryPosition walEntryPosition = new WALEntryPosition();
+  /** wal node, null when wal is disabled */
+  private WALNode walNode = null;
+
+  public WALPipeHandler(WALEntryValue value) {
+    this.value = value;
+  }
+
+  /**
+   * Pin the wal files of the given memory table. Notice: cannot pin one memTable too long,
+   * otherwise the wal disk usage may too large.
+   *
+   * @throws MemTablePinException If the memTable has been flushed, or the wal node or memTable id
+   *     of this handler hasn't been set
+   */
+  public void pinMemTable() throws MemTablePinException {
+    if (walNode == null || memTableId < 0) {
+      throw new MemTablePinException("Fail to pin memTable because of internal error.");
+    }
+    walNode.pinMemTable(memTableId);
+  }
+
+  /**
+   * Unpin the wal files of the given memory table.
+   *
+   * @throws MemTablePinException If there aren't corresponding pin operations, or the wal node or
+   *     memTable id of this handler hasn't been set
+   */
+  public void unpinMemTable() throws MemTablePinException {
+    if (walNode == null || memTableId < 0) {
+      throw new MemTablePinException("Fail to unpin memTable because of internal error.");
+    }
+    walNode.unpinMemTable(memTableId);
+  }
+
+  /**
+   * Get this handler's value. The locally cached value is returned while it is still present,
+   * otherwise this method blocks until the entry position is set and reads the value via the wal.
+   *
+   * @throws WALPipeException If the value isn't an InsertNode, the waiting thread is interrupted,
+   *     or reading from the wal fails
+   */
+  public InsertNode getValue() throws WALPipeException {
+    // return local cache, work on one snapshot because the field may be cleared concurrently
+    WALEntryValue res = value;
+    if (res != null) {
+      if (res instanceof InsertNode) {
+        return (InsertNode) res;
+      } else {
+        throw new WALPipeException("Fail to get value because the entry type isn't InsertNode.");
+      }
+    }
+    // wait until the position is ready; check and wait under the same monitor that
+    // setEntryPosition notifies on, otherwise a notification between them would be missed
+    synchronized (this) {
+      while (!walEntryPosition.canRead()) {
+        try {
+          this.wait();
+        } catch (InterruptedException e) {
+          logger.warn("Interrupted when waiting for result.", e);
+          // restore the flag and give up, waiting again would fail immediately and spin
+          Thread.currentThread().interrupt();
+          throw new WALPipeException(
+              "Fail to get value because the waiting thread is interrupted.", e);
+        }
+      }
+    }
+    // read from the wal file
+    try {
+      return walEntryPosition.readInsertNodeViaCache();
+    } catch (Exception e) {
+      throw new WALPipeException("Fail to get value because the file content isn't correct.", e);
+    }
+  }
+
+  public void setMemTableId(long memTableId) {
+    this.memTableId = memTableId;
+  }
+
+  /** Set the wal node on this handler and on its entry position. */
+  public void setWalNode(WALNode walNode) {
+    this.walNode = walNode;
+    this.walEntryPosition.setWalNode(walNode);
+  }
+
+  public WALEntryPosition getWalEntryPosition() {
+    return walEntryPosition;
+  }
+
+  /**
+   * Record where the entry is stored in the wal, drop the locally cached value and wake up all
+   * threads waiting in {@link #getValue()}.
+   */
+  public void setEntryPosition(long walFileVersionId, long position) {
+    // publish the position before clearing the value, so a reader that sees no cached value
+    // can already read from the wal
+    this.walEntryPosition.setEntryPosition(walFileVersionId, position);
+    this.value = null;
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  public int getSize() {
+    return walEntryPosition.getSize();
+  }
+
+  public void setSize(int size) {
+    this.walEntryPosition.setSize(size);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
index 846e47961f9..1b468635899 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALFlushListener.java
@@ -18,9 +18,20 @@
*/
package org.apache.iotdb.db.wal.utils.listener;
+import org.apache.iotdb.db.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+
/** This class helps judge whether wal is flushed to the storage device. */
public class WALFlushListener extends AbstractResultListener {
- public WALFlushListener(boolean wait) {
+ // handler for pipeline, only exists when value is InsertNode
+ private final WALPipeHandler walPipeHandler;
+
+ public WALFlushListener(boolean wait, WALEntryValue value) {
super(wait);
+ walPipeHandler = new WALPipeHandler(value);
+ }
+
+ public WALPipeHandler getWalPipeHandler() {
+ return walPipeHandler;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
index 8542c817ef7..770a3c7fb26 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
@@ -70,15 +70,14 @@ public class WALNodeTest {
private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
private static final String devicePath = "root.test_sg.test_d";
private WALMode prevMode;
+ private boolean prevIsClusterMode;
private WALNode walNode;
- private boolean isClusterMode;
-
@Before
public void setUp() throws Exception {
EnvironmentUtils.cleanDir(logDirectory);
prevMode = config.getWalMode();
- isClusterMode = config.isClusterMode();
+ prevIsClusterMode = config.isClusterMode();
config.setWalMode(WALMode.SYNC);
config.setClusterMode(true);
walNode = new WALNode(identifier, logDirectory);
@@ -88,7 +87,7 @@ public class WALNodeTest {
public void tearDown() throws Exception {
walNode.close();
config.setWalMode(prevMode);
- config.setClusterMode(isClusterMode);
+ config.setClusterMode(prevIsClusterMode);
EnvironmentUtils.cleanDir(logDirectory);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
new file mode 100644
index 00000000000..77c28c6d333
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALPipeHandlerTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.node;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
+import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class WALPipeHandlerTest {
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final String identifier = String.valueOf(Integer.MAX_VALUE);
+ private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
+ private static final String devicePath = "root.test_sg.test_d";
+ private WALMode prevMode;
+ private boolean prevIsClusterMode;
+ private WALNode walNode;
+
+ /**
+ * Clean the wal test directory, remember the current wal mode and cluster mode, then create a
+ * fresh wal node with SYNC wal mode and cluster mode enabled.
+ */
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.cleanDir(logDirectory);
+ prevMode = config.getWalMode();
+ prevIsClusterMode = config.isClusterMode();
+ config.setWalMode(WALMode.SYNC);
+ config.setClusterMode(true);
+ walNode = new WALNode(identifier, logDirectory);
+ }
+
+ /** Close the wal node, restore the remembered config values and clean the test directory. */
+ @After
+ public void tearDown() throws Exception {
+ walNode.close();
+ config.setWalMode(prevMode);
+ config.setClusterMode(prevIsClusterMode);
+ EnvironmentUtils.cleanDir(logDirectory);
+ }
+
+ /**
+ * Pinning a memTable that was never pinned and has already been reported as flushed is expected
+ * to fail with MemTablePinException.
+ */
+ @Test(expected = MemTablePinException.class)
+ public void pinDeletedMemTable() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable();
+ walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+ WALFlushListener flushListener =
+ walNode.log(
+ memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
+ walNode.onMemTableFlushed(memTable);
+ // pin flushed memTable
+ WALPipeHandler handler = flushListener.getWalPipeHandler();
+ handler.pinMemTable();
+ }
+
+ /**
+ * A memTable pinned before it is flushed keeps its wal entry searchable: the entry with search
+ * index 1 is still found after rolling the wal file twice and after deleting outdated files.
+ */
+ @Test
+ public void pinMemTable() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable();
+ walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+ InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+ node1.setSearchIndex(1);
+ WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+ // pin memTable
+ WALPipeHandler handler = flushListener.getWalPipeHandler();
+ handler.pinMemTable();
+ walNode.onMemTableFlushed(memTable);
+ // roll wal file
+ walNode.rollWALFile();
+ walNode.rollWALFile();
+ // find node1
+ ConsensusReqReader.ReqIterator itr = walNode.getReqIterator(1);
+ assertTrue(itr.hasNext());
+ assertEquals(
+ node1,
+ WALEntry.deserializeForConsensus(itr.next().getRequests().get(0).serializeToByteBuffer()));
+ // try to delete flushed but pinned memTable
+ walNode.deleteOutdatedFiles();
+ // try to find node1
+ itr = walNode.getReqIterator(1);
+ assertTrue(itr.hasNext());
+ assertEquals(
+ node1,
+ WALEntry.deserializeForConsensus(itr.next().getRequests().get(0).serializeToByteBuffer()));
+ }
+
+ /**
+ * Unpinning a memTable that was never pinned and has already been reported as flushed is
+ * expected to fail with MemTablePinException.
+ */
+ @Test(expected = MemTablePinException.class)
+ public void unpinDeletedMemTable() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable();
+ walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+ WALFlushListener flushListener =
+ walNode.log(
+ memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
+ walNode.onMemTableFlushed(memTable);
+ // unpin flushed memTable which has never been pinned
+ WALPipeHandler handler = flushListener.getWalPipeHandler();
+ handler.unpinMemTable();
+ }
+
+ /**
+ * A memTable pinned twice stays tracked by the checkpoint manager after it is flushed and
+ * unpinned once (still pinned, memTable reference cleared), and is no longer the oldest
+ * memTable info after the second unpin.
+ */
+ @Test
+ public void unpinFlushedMemTable() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable();
+ walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+ WALFlushListener flushListener =
+ walNode.log(
+ memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
+ WALPipeHandler handler = flushListener.getWalPipeHandler();
+ // pin twice
+ handler.pinMemTable();
+ handler.pinMemTable();
+ walNode.onMemTableFlushed(memTable);
+ // unpin 1
+ CheckpointManager checkpointManager = walNode.getCheckpointManager();
+ handler.unpinMemTable();
+ MemTableInfo oldestMemTableInfo = checkpointManager.getOldestMemTableInfo();
+ assertEquals(memTable.getMemTableId(), oldestMemTableInfo.getMemTableId());
+ assertNull(oldestMemTableInfo.getMemTable());
+ assertTrue(oldestMemTableInfo.isPinned());
+ // unpin 2
+ handler.unpinMemTable();
+ assertNull(checkpointManager.getOldestMemTableInfo());
+ }
+
+ /**
+ * Once a flushed memTable is unpinned, deleting outdated files removes its wal entry: the
+ * entry with search index 1 is found while pinned and no longer found afterwards.
+ */
+ @Test
+ public void unpinMemTable() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable();
+ walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+ InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+ node1.setSearchIndex(1);
+ WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+ // pin memTable
+ WALPipeHandler handler = flushListener.getWalPipeHandler();
+ handler.pinMemTable();
+ walNode.onMemTableFlushed(memTable);
+ // roll wal file
+ walNode.rollWALFile();
+ walNode.rollWALFile();
+ // find node1
+ ConsensusReqReader.ReqIterator itr = walNode.getReqIterator(1);
+ assertTrue(itr.hasNext());
+ assertEquals(
+ node1,
+ WALEntry.deserializeForConsensus(itr.next().getRequests().get(0).serializeToByteBuffer()));
+ // unpin flushed memTable
+ handler.unpinMemTable();
+ // delete the wal files of the flushed memTable, which is no longer pinned
+ walNode.deleteOutdatedFiles();
+ // try to find node1
+ itr = walNode.getReqIterator(1);
+ assertFalse(itr.hasNext());
+ }
+
+ /**
+ * The handler returns the logged insert node when asked right away, without waiting for the
+ * wal buffer to consume all entries.
+ */
+ @Test
+ public void getUnFlushedValue() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable();
+ walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+ InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+ node1.setSearchIndex(1);
+ WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+ // pin memTable
+ WALPipeHandler handler = flushListener.getWalPipeHandler();
+ handler.pinMemTable();
+ walNode.onMemTableFlushed(memTable);
+ assertEquals(node1, handler.getValue());
+ }
+
+ /**
+ * The handler still returns an equal insert node after the wal node reports that all wal
+ * entries are consumed.
+ */
+ @Test
+ public void getFlushedValue() throws Exception {
+ IMemTable memTable = new PrimitiveMemTable();
+ walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+ InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+ node1.setSearchIndex(1);
+ WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+ // pin memTable
+ WALPipeHandler handler = flushListener.getWalPipeHandler();
+ handler.pinMemTable();
+ walNode.onMemTableFlushed(memTable);
+ // wait until wal flushed
+ while (!walNode.isAllWALEntriesConsumed()) {
+ Thread.sleep(50);
+ }
+ assertEquals(node1, handler.getValue());
+ }
+
+ /**
+ * Build an InsertRowNode for the given device and timestamp with six measurements s1..s6, one
+ * per data type (DOUBLE, FLOAT, INT64, INT32, BOOLEAN, TEXT), fixed values and matching
+ * measurement schemas.
+ */
+ private InsertRowNode getInsertRowNode(String devicePath, long time) throws IllegalPathException {
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
+
+ // one value per measurement, in the same order as dataTypes
+ Object[] columns = new Object[6];
+ columns[0] = 1.0d;
+ columns[1] = 2f;
+ columns[2] = 10000L;
+ columns[3] = 100;
+ columns[4] = false;
+ columns[5] = new Binary("hh" + 0);
+
+ InsertRowNode node =
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(devicePath),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
+ time,
+ columns,
+ false);
+ // schemas are named s1..s6 to match the measurement names above
+ MeasurementSchema[] schemas = new MeasurementSchema[6];
+ for (int i = 0; i < 6; i++) {
+ schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
+ }
+ node.setMeasurementSchemas(schemas);
+ return node;
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
new file mode 100644
index 00000000000..98459619a70
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALInsertNodeCacheTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+ /**
+  * Tests for {@link WALInsertNodeCache}: insert nodes written through a {@link WALNode} are read
+  * back from the cache by their {@link WALEntryPosition}.
+  */
+ public class WALInsertNodeCacheTest {
+   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+   private static final String identifier = String.valueOf(Integer.MAX_VALUE);
+   private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
+   private static final String devicePath = "root.test_sg.test_d";
+   private static final WALInsertNodeCache cache = WALInsertNodeCache.getInstance();
+   // configuration values saved in setUp() and restored in tearDown()
+   private WALMode prevMode;
+   private boolean prevIsClusterMode;
+   private WALNode walNode;
+
+   /**
+    * Cleans the WAL directory and the cache, remembers the current WAL mode and cluster-mode flag,
+    * switches the config to {@code WALMode.SYNC} with cluster mode enabled, and opens a new WAL
+    * node on the test directory.
+    */
+   @Before
+   public void setUp() throws Exception {
+     EnvironmentUtils.cleanDir(logDirectory);
+     cache.clear();
+     prevMode = config.getWalMode();
+     prevIsClusterMode = config.isClusterMode();
+     config.setWalMode(WALMode.SYNC);
+     config.setClusterMode(true);
+     walNode = new WALNode(identifier, logDirectory);
+   }
+
+   /**
+    * Closes the WAL node, clears the cache, restores the configuration saved in {@link #setUp()},
+    * and removes the test directory.
+    */
+   @After
+   public void tearDown() throws Exception {
+     walNode.close();
+     cache.clear();
+     config.setWalMode(prevMode);
+     config.setClusterMode(prevIsClusterMode);
+     EnvironmentUtils.cleanDir(logDirectory);
+   }
+
+   /**
+    * Logs a single insert node without rolling the WAL file and checks that the cache returns an
+    * equal node for its position once that position reports it can be read.
+    */
+   @Test
+   public void testLoadUnsealedWALFile() throws Exception {
+     IMemTable memTable = new PrimitiveMemTable();
+     walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
+     InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+     node1.setSearchIndex(1);
+     WALFlushListener flushListener = walNode.log(memTable.getMemTableId(), node1);
+     WALEntryPosition position = flushListener.getWalPipeHandler().getWalEntryPosition();
+     // wait until wal flushed
+     while (!walNode.isAllWALEntriesConsumed() || !position.canRead()) {
+       Thread.sleep(50);
+     }
+     // load by cache
+     assertEquals(node1, cache.get(position));
+   }
+
+   /**
+    * Logs two nodes for memTable1 and one node for memTable2, then checks the cache contents after
+    * a single {@code get}: with memTable1 registered via {@code addMemTable}, both memTable1
+    * positions are cached but the memTable2 position is not; after {@code removeMemTable} and
+    * {@code clear}, only the requested position is cached.
+    */
+   @Test
+   public void testBatchLoad() throws Exception {
+     // write memTable1
+     IMemTable memTable1 = new PrimitiveMemTable();
+     walNode.onMemTableCreated(memTable1, logDirectory + "/" + "fake1.tsfile");
+     InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
+     node1.setSearchIndex(1);
+     WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(), node1);
+     WALEntryPosition position1 = flushListener1.getWalPipeHandler().getWalEntryPosition();
+     InsertRowNode node2 = getInsertRowNode(devicePath, System.currentTimeMillis());
+     // each node carries its own search index; node1 must not be modified after it was logged
+     node2.setSearchIndex(2);
+     WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(), node2);
+     WALEntryPosition position2 = flushListener2.getWalPipeHandler().getWalEntryPosition();
+     // write memTable2
+     IMemTable memTable2 = new PrimitiveMemTable();
+     walNode.onMemTableCreated(memTable2, logDirectory + "/" + "fake2.tsfile");
+     InsertRowNode node3 = getInsertRowNode(devicePath, System.currentTimeMillis());
+     node3.setSearchIndex(3);
+     WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(), node3);
+     WALEntryPosition position3 = flushListener3.getWalPipeHandler().getWalEntryPosition();
+     // wait until wal flushed
+     walNode.rollWALFile();
+     while (!walNode.isAllWALEntriesConsumed() || !position3.canRead()) {
+       Thread.sleep(50);
+     }
+     // check batch load memTable1
+     cache.addMemTable(memTable1.getMemTableId());
+     assertEquals(node1, cache.get(position1));
+     assertTrue(cache.contains(position1));
+     assertTrue(cache.contains(position2));
+     assertFalse(cache.contains(position3));
+     // check batch load none
+     cache.removeMemTable(memTable1.getMemTableId());
+     cache.clear();
+     assertEquals(node1, cache.get(position1));
+     assertTrue(cache.contains(position1));
+     assertFalse(cache.contains(position2));
+     assertFalse(cache.contains(position3));
+   }
+
+   /**
+    * Builds an {@link InsertRowNode} with six measurements (s1..s6), one per data type listed
+    * below, fixed sample values, and matching measurement schemas attached.
+    *
+    * @param devicePath device path the row is written to
+    * @param time timestamp of the row
+    * @return the populated insert node
+    * @throws IllegalPathException propagated from the calls below (presumably the
+    *     {@link PartialPath} constructor rejecting {@code devicePath})
+    */
+   private InsertRowNode getInsertRowNode(String devicePath, long time) throws IllegalPathException {
+     TSDataType[] dataTypes =
+         new TSDataType[] {
+           TSDataType.DOUBLE,
+           TSDataType.FLOAT,
+           TSDataType.INT64,
+           TSDataType.INT32,
+           TSDataType.BOOLEAN,
+           TSDataType.TEXT
+         };
+
+     // one sample value per measurement, in the same order as dataTypes
+     Object[] columns = new Object[6];
+     columns[0] = 1.0d;
+     columns[1] = 2f;
+     columns[2] = 10000L;
+     columns[3] = 100;
+     columns[4] = false;
+     columns[5] = new Binary("hh" + 0);
+
+     InsertRowNode node =
+         new InsertRowNode(
+             new PlanNodeId(""),
+             new PartialPath(devicePath),
+             false,
+             new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+             dataTypes,
+             time,
+             columns,
+             false);
+     // schema i pairs measurement name "s(i+1)" with dataTypes[i]
+     MeasurementSchema[] schemas = new MeasurementSchema[6];
+     for (int i = 0; i < 6; i++) {
+       schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
+     }
+     node.setMeasurementSchemas(schemas);
+     return node;
+   }
+ }