You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2024/03/29 15:03:57 UTC

(iotdb) branch wal-compress updated (531c99bb7ce -> 4d4cbae9f07)

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a change to branch wal-compress
in repository https://gitbox.apache.org/repos/asf/iotdb.git


 discard 531c99bb7ce enable wal compression
     new 4d4cbae9f07 enable wal compression

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (531c99bb7ce)
            \
             N -- N -- N   refs/heads/wal-compress (4d4cbae9f07)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  3 +--
 .../planner/plan/node/write/InsertRowNode.java     |  2 ++
 .../planner/plan/node/write/InsertRowsNode.java    |  5 ++++
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 19 +++++++++++++
 .../db/storageengine/dataregion/DataRegion.java    | 31 +++++++++++++++++++---
 .../dataregion/flush/MemTableFlushTask.java        |  3 ---
 .../dataregion/memtable/AbstractMemTable.java      | 13 +--------
 .../storageengine/dataregion/wal/io/LogWriter.java |  2 +-
 .../org/apache/iotdb/commons/path/PartialPath.java | 16 ++++++++---
 9 files changed, 68 insertions(+), 26 deletions(-)


(iotdb) 01/01: enable wal compression

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch wal-compress
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4d4cbae9f07b0cc6131b2f468f189410e745c5dd
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Thu Mar 28 22:46:06 2024 +0800

    enable wal compression
    
    remove metrics in mem table flush task, cache hash code in partial path, use gzip to compress wal
    
    batch update metrics
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   9 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../planner/plan/node/write/InsertRowNode.java     |   2 +
 .../planner/plan/node/write/InsertRowsNode.java    |   5 +
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  19 ++++
 .../db/storageengine/dataregion/DataRegion.java    |  31 ++++++-
 .../dataregion/flush/MemTableFlushTask.java        |   3 -
 .../dataregion/memtable/AbstractMemTable.java      |  13 +--
 .../dataregion/wal/io/CheckpointReader.java        |   3 +-
 .../storageengine/dataregion/wal/io/LogWriter.java |  36 +++++++
 .../dataregion/wal/io/WALByteBufReader.java        |   7 +-
 .../dataregion/wal/io/WALInputStream.java          | 103 +++++++++++++++++++++
 .../storageengine/dataregion/wal/io/WALReader.java |   6 +-
 .../storageengine/dataregion/wal/io/WALWriter.java |   2 +
 .../dataregion/wal/utils/WALWriteUtils.java        |  10 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |  16 +++-
 16 files changed, 234 insertions(+), 36 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1380a167505..ad180cc7668 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -94,6 +94,7 @@ public class IoTDBConfig {
       "([" + PATH_SEPARATOR + "])?" + NODE_NAME_MATCHER + "(" + PARTIAL_NODE_MATCHER + ")*";
 
   public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER);
+  boolean enableWALCompression = true;
 
   /** Whether to enable the mqtt service. */
   private boolean enableMQTTService = false;
@@ -3829,4 +3830,12 @@ public class IoTDBConfig {
       double innerCompactionTaskSelectionDiskRedundancy) {
     this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy;
   }
+
+  public boolean isEnableWALCompression() {
+    return enableWALCompression;
+  }
+
+  public void setEnableWALCompression(boolean enableWALCompression) {
+    this.enableWALCompression = enableWALCompression;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 139a1374b44..3b71dc7d27c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -412,6 +412,11 @@ public class IoTDBDescriptor {
                 "io_task_queue_size_for_flushing",
                 Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
 
+    conf.setEnableWALCompression(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_wal_compression", Boolean.toString(conf.isEnableWALCompression()))));
+
     conf.setCompactionScheduleIntervalInMs(
         Long.parseLong(
             properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index b0d8aad94c3..2bc63523010 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -52,6 +52,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class InsertRowNode extends InsertNode implements WALEntryValue {
 
@@ -67,6 +68,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   private Object[] values;
 
   private boolean isNeedInferType = false;
+  public AtomicInteger insertCount;
 
   public InsertRowNode(PlanNodeId id) {
     super(id);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 12e229470be..a2950e55894 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -42,6 +42,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class InsertRowsNode extends InsertNode {
 
@@ -60,6 +62,9 @@ public class InsertRowsNode extends InsertNode {
   /** the InsertRowsNode list */
   private List<InsertRowNode> insertRowNodeList;
 
+  public AtomicInteger insertCount = new AtomicInteger(0);
+  public AtomicLong[] metrics;
+
   public InsertRowsNode(PlanNodeId id) {
     super(id);
     insertRowNodeList = new ArrayList<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index f671a87a1f9..2d573c00ee0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
@@ -60,6 +61,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -419,6 +421,13 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
         break;
       case WRITE:
         PlanNode planNode = instance.getFragment().getPlanNodeTree();
+        if (planNode instanceof InsertRowsNode) {
+          InsertRowsNode insertRowsNode = (InsertRowsNode) planNode;
+          insertRowsNode.metrics = new AtomicLong[4];
+          for (int i = 0; i < 4; i++) {
+            insertRowsNode.metrics[i] = new AtomicLong(0);
+          }
+        }
         RegionWriteExecutor writeExecutor = new RegionWriteExecutor();
         RegionExecutionResult writeResult = writeExecutor.execute(groupId, planNode);
         if (!writeResult.isAccepted()) {
@@ -438,6 +447,16 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
         } else {
           // some expected and accepted status except SUCCESS_STATUS need to be returned
           TSStatus status = writeResult.getStatus();
+          if (planNode instanceof InsertRowsNode) {
+            InsertRowsNode insertRowsNode = (InsertRowsNode) planNode;
+            PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(
+                insertRowsNode.metrics[0].get());
+            PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(
+                insertRowsNode.metrics[1].get());
+            PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(insertRowsNode.metrics[2].get());
+            PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(
+                insertRowsNode.metrics[3].get());
+          }
           if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             throw new FragmentInstanceDispatchException(status);
           }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 469eeae7ad6..81a35ce869f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -104,6 +106,7 @@ import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
 import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
 import org.apache.iotdb.db.utils.DateTimeUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -1176,6 +1179,7 @@ public class DataRegion implements IDataRegionForQuery {
       if (insertRowNode.allMeasurementFailed()) {
         continue;
       }
+      insertRowNode.insertCount = insertRowsNode.insertCount;
       TsFileProcessor tsFileProcessor =
           getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
       if (tsFileProcessor == null) {
@@ -1197,10 +1201,29 @@ public class DataRegion implements IDataRegionForQuery {
       }
     }
 
-    PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
-    PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
-    PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
-    PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
+    MetricService.getInstance()
+        .count(
+            insertRowsNode.insertCount.get(),
+            Metric.QUANTITY.toString(),
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            Metric.POINTS_IN.toString(),
+            Tag.DATABASE.toString(),
+            databaseName,
+            Tag.REGION.toString(),
+            dataRegionId);
+
+    if (insertRowsNode.metrics != null) {
+      insertRowsNode.metrics[0].addAndGet(costsForMetrics[0]);
+      insertRowsNode.metrics[1].addAndGet(costsForMetrics[1]);
+      insertRowsNode.metrics[2].addAndGet(costsForMetrics[2]);
+      insertRowsNode.metrics[3].addAndGet(costsForMetrics[3]);
+    } else {
+      PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]);
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]);
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
+    }
 
     if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
       if ((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index fe4fe1eec04..ce8e2929f38 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -156,7 +156,6 @@ public class MemTableFlushTask {
         series.sortTvListForFlush();
         long subTaskTime = System.currentTimeMillis() - startTime;
         sortTime += subTaskTime;
-        WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.SORT_TASK, subTaskTime);
         encodingTaskQueue.put(series);
       }
 
@@ -258,7 +257,6 @@ public class MemTableFlushTask {
                 Thread.currentThread().interrupt();
               }
               long subTaskTime = System.currentTimeMillis() - starTime;
-              WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
               memSerializeTime += subTaskTime;
             }
           }
@@ -344,7 +342,6 @@ public class MemTableFlushTask {
           }
           long subTaskTime = System.currentTimeMillis() - starTime;
           ioTime += subTaskTime;
-          WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.IO_TASK, subTaskTime);
         }
         LOGGER.debug(
             "flushing a memtable to file {} in database {}, io cost {}ms",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index f8d7f6a53d7..58004349ce6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -204,18 +204,7 @@ public abstract class AbstractMemTable implements IMemTable {
             - nullPointsNumber;
 
     totalPointsNum += pointsInserted;
-
-    MetricService.getInstance()
-        .count(
-            pointsInserted,
-            Metric.QUANTITY.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            METRIC_POINT_IN,
-            Tag.DATABASE.toString(),
-            database,
-            Tag.REGION.toString(),
-            dataRegionId);
+    insertRowNode.insertCount.addAndGet(pointsInserted);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
index 5d2bad0a874..081b3ed4a4f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,7 +47,7 @@ public class CheckpointReader {
   private void init() {
     checkpoints = new ArrayList<>();
     try (DataInputStream logStream =
-        new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)))) {
+        new DataInputStream(new BufferedInputStream(new WALInputStream(logFile)))) {
       maxMemTableId = logStream.readLong();
       while (logStream.available() > 0) {
         Checkpoint checkpoint = Checkpoint.deserialize(logStream);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 68f4deae318..c3fe218fb40 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,18 +47,51 @@ public abstract class LogWriter implements ILogWriter {
   protected final FileOutputStream logStream;
   protected final FileChannel logChannel;
   protected long size;
+  protected boolean isEndFile = false;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 2 + 1);
+  private final ICompressor compressor = ICompressor.getCompressor(CompressionType.GZIP);
+  private final ByteBuffer compressedByteBuffer;
 
   protected LogWriter(File logFile) throws FileNotFoundException {
     this.logFile = logFile;
     this.logStream = new FileOutputStream(logFile, true);
     this.logChannel = this.logStream.getChannel();
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()) {
+      compressedByteBuffer =
+          ByteBuffer.allocate(
+              compressor.getMaxBytesForCompression(
+                  IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+    } else {
+      compressedByteBuffer = null;
+    }
   }
 
   @Override
   public void write(ByteBuffer buffer) throws IOException {
+    int bufferSize = buffer.position();
     size += buffer.position();
     buffer.flip();
+    boolean compressed = false;
+    int uncompressedSize = bufferSize;
+    if (!isEndFile && IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()
+    /* && bufferSize > 1024 * 512 Do not compress buffer that is less than 512KB */ ) {
+      compressedByteBuffer.clear();
+      compressor.compress(buffer, compressedByteBuffer);
+      buffer = compressedByteBuffer;
+      bufferSize = buffer.position();
+      buffer.flip();
+      compressed = true;
+    }
+    size += bufferSize;
+    headerBuffer.clear();
+    headerBuffer.putInt(bufferSize);
+    headerBuffer.put((byte) (compressed ? 1 : 0));
     try {
+      if (compressed) {
+        headerBuffer.putInt(uncompressedSize);
+      }
+      headerBuffer.flip();
+      logChannel.write(headerBuffer);
       logChannel.write(buffer);
     } catch (ClosedChannelException e) {
       logger.warn("Cannot write to {}", logFile, e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
index f101eaf3647..ad3b7479de9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 
 import java.io.Closeable;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ public class WALByteBufReader implements Closeable {
   private final File logFile;
   private final FileChannel channel;
   private final WALMetaData metaData;
+  private final DataInputStream logStream;
   private final Iterator<Integer> sizeIterator;
 
   public WALByteBufReader(File logFile) throws IOException {
@@ -46,6 +48,7 @@ public class WALByteBufReader implements Closeable {
   public WALByteBufReader(File logFile, FileChannel channel) throws IOException {
     this.logFile = logFile;
     this.channel = channel;
+    this.logStream = new DataInputStream(new WALInputStream(logFile));
     this.metaData = WALMetaData.readFromWALFile(logFile, channel);
     this.sizeIterator = metaData.getBuffersSize().iterator();
     channel.position(0);
@@ -64,8 +67,8 @@ public class WALByteBufReader implements Closeable {
   public ByteBuffer next() throws IOException {
     int size = sizeIterator.next();
     ByteBuffer buffer = ByteBuffer.allocate(size);
-    channel.read(buffer);
-    buffer.clear();
+    logStream.readFully(buffer.array(), 0, size);
+    buffer.flip();
     return buffer;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
new file mode 100644
index 00000000000..8e742b3cb1b
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(WALInputStream.class);
+  private final FileChannel channel;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES + 1);
+  private final ByteBuffer compressedHeader = ByteBuffer.allocate(Integer.BYTES);
+  private ByteBuffer dataBuffer =
+      ByteBuffer.allocate(
+          IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()); // uncompressed data buffer
+
+  public WALInputStream(File logFile) throws IOException {
+    channel = FileChannel.open(logFile.toPath());
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (Objects.isNull(dataBuffer) || dataBuffer.position() == dataBuffer.limit()) {
+      loadNextSegment();
+    }
+    return dataBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public void close() throws IOException {
+    channel.close();
+    dataBuffer = null;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return (int) (channel.size() - channel.position());
+  }
+
+  private void loadNextSegment() throws IOException {
+    headerBuffer.clear();
+    if (channel.read(headerBuffer) != Integer.BYTES + 1) {
+      throw new IOException("Unexpected end of file");
+    }
+    headerBuffer.flip();
+    int dataSize = headerBuffer.getInt();
+    boolean isCompressed = headerBuffer.get() == 1;
+    if (isCompressed) {
+      compressedHeader.clear();
+      if (channel.read(compressedHeader) != Integer.BYTES) {
+        throw new IOException("Unexpected end of file");
+      }
+      compressedHeader.flip();
+      int uncompressedSize = compressedHeader.getInt();
+      if (uncompressedSize > dataBuffer.capacity()) {
+        // enlarge buffer
+        dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+      }
+      ByteBuffer compressedData = ByteBuffer.allocateDirect(dataSize);
+      if (channel.read(compressedData) != dataSize) {
+        throw new IOException("Unexpected end of file");
+      }
+      compressedData.flip();
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.LZ4);
+      dataBuffer.clear();
+      unCompressor.uncompress(compressedData, dataBuffer);
+    } else {
+      dataBuffer = ByteBuffer.allocateDirect(dataSize);
+      if (channel.read(dataBuffer) != dataSize) {
+        throw new IOException("Unexpected end of file");
+      }
+    }
+    dataBuffer.flip();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
index ee50c73df97..475ea2b0b2d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -26,12 +26,10 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
@@ -57,9 +55,7 @@ public class WALReader implements Closeable {
   public WALReader(File logFile, boolean fileMayCorrupt) throws IOException {
     this.logFile = logFile;
     this.fileMayCorrupt = fileMayCorrupt;
-    this.logStream =
-        new DataInputStream(
-            new BufferedInputStream(Files.newInputStream(logFile.toPath()), STREAM_BUFFER_SIZE));
+    this.logStream = new DataInputStream(new WALInputStream(logFile));
   }
 
   /** Like {@link Iterator#hasNext()}. */
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index 425fc676fad..20ae9975450 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -59,6 +59,7 @@ public class WALWriter extends LogWriter {
   }
 
   private void endFile() throws IOException {
+    this.isEndFile = true;
     WALSignalEntry endMarker = new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER);
     int metaDataSize = metaData.serializedSize();
     ByteBuffer buffer =
@@ -72,6 +73,7 @@ public class WALWriter extends LogWriter {
     // add magic string
     buffer.put(MAGIC_STRING.getBytes());
     write(buffer);
+    this.isEndFile = false;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
index d5702e7004a..633a8153b66 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
@@ -126,10 +126,12 @@ public class WALWriteUtils {
       return write(NO_BYTE_TO_READ, buffer);
     }
     int len = 0;
-    byte[] bytes = s.getBytes();
-    len += write(bytes.length, buffer);
-    buffer.put(bytes);
-    len += bytes.length;
+    len += write(s.length(), buffer);
+    for (int i = 0; i < s.length(); i++) {
+      char c = s.charAt(i);
+      buffer.put((byte) c); // ascii only
+    }
+    len += s.length();
     return len;
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index 217fd3ed1aa..60845c77fb9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -58,6 +58,8 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
   private static final PartialPath ALL_MATCH_PATTERN = new PartialPath(new String[] {"root", "**"});
 
   protected String[] nodes;
+  protected int hashCache;
+  protected boolean cacheHashCache = false;
 
   public PartialPath() {}
 
@@ -713,11 +715,17 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
 
   @Override
   public int hashCode() {
-    int h = 0;
-    for (String node : nodes) {
-      h += 31 * h + node.hashCode();
+    if (cacheHashCache) {
+      return hashCache;
+    } else {
+      int h = 0;
+      for (String node : nodes) {
+        h += 31 * h + node.hashCode();
+      }
+      hashCache = h;
+      cacheHashCache = true;
+      return h;
     }
-    return h;
   }
 
   @Override