Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/03/22 14:20:00 UTC

[GitHub] [iotdb] HeimingZ opened a new pull request #5320: [IOTDB-1614] New WAL

HeimingZ opened a new pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320


   Details are in the [design docs](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726041).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r837046199



##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
##########
@@ -280,6 +295,21 @@ public void serializeImpl(ByteBuffer buffer) {
     }
   }
 
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    int type = PhysicalPlanType.MULTI_BATCH_INSERT.ordinal();
+    buffer.put((byte) type);
+    buffer.putInt(insertTabletPlanList.size());
+    for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+      insertTabletPlan.subSerialize(buffer);
+    }
+
+    buffer.putInt(parentInsertTabletPlanIndexList.size());
+    for (Integer index : parentInsertTabletPlanIndexList) {
+      buffer.putInt(index);
+    }
+  }
+

Review comment:
       Is the current javadoc in the IWALByteBufferView interface OK?
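For reference, the layout that serializeToWAL writes can be checked with a small round-trip. This is a minimal sketch, not IoTDB code: it uses a plain java.nio.ByteBuffer instead of IWALByteBufferView, a made-up type code instead of PhysicalPlanType.MULTI_BATCH_INSERT.ordinal(), and one int per sub-plan in place of subSerialize.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative layout: [type:1][planCount:4][sub-plans][indexCount:4][index:4 x indexCount].
 * Each sub-plan is reduced to one int; the real subSerialize encoding is not shown.
 */
final class MultiTabletLayoutSketch {
  // Hypothetical type code standing in for PhysicalPlanType.MULTI_BATCH_INSERT.ordinal().
  static final byte MULTI_BATCH_INSERT = 42;

  static int serializedSize(List<Integer> subPlans, List<Integer> parentIndexes) {
    return Byte.BYTES
        + Integer.BYTES + subPlans.size() * Integer.BYTES
        + Integer.BYTES + parentIndexes.size() * Integer.BYTES;
  }

  static ByteBuffer serialize(List<Integer> subPlans, List<Integer> parentIndexes) {
    ByteBuffer buffer = ByteBuffer.allocate(serializedSize(subPlans, parentIndexes));
    buffer.put(MULTI_BATCH_INSERT);
    buffer.putInt(subPlans.size());
    for (int subPlan : subPlans) {
      buffer.putInt(subPlan); // stand-in for insertTabletPlan.subSerialize(buffer)
    }
    buffer.putInt(parentIndexes.size());
    for (int index : parentIndexes) {
      buffer.putInt(index);
    }
    buffer.flip(); // switch from writing to reading
    return buffer;
  }

  /** Reads back the parent index list, skipping the type byte and the sub-plans. */
  static List<Integer> readParentIndexes(ByteBuffer buffer) {
    byte type = buffer.get();
    if (type != MULTI_BATCH_INSERT) {
      throw new IllegalArgumentException("unexpected plan type " + type);
    }
    int planCount = buffer.getInt();
    buffer.position(buffer.position() + planCount * Integer.BYTES);
    int indexCount = buffer.getInt();
    List<Integer> indexes = new ArrayList<>(indexCount);
    for (int i = 0; i < indexCount; i++) {
      indexes.add(buffer.getInt());
    }
    return indexes;
  }
}
```

Because both list sizes are written before their elements, a reader can skip the sub-plans without decoding them, as readParentIndexes does here.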







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r841005119



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -522,17 +399,51 @@ private void recover() throws StorageGroupProcessorException {
 
       // split by partition so that we can find the last file of each partition and decide to
       // close it or not
-      RecoveryContext recoveryContext =
-          new RecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size(), 0);
+      VSGRecoveryContext VSGRecoveryContext =
+          new VSGRecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size(), 0);
       Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
           splitResourcesByPartition(tmpSeqTsFiles);
       Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
           splitResourcesByPartition(tmpUnseqTsFiles);
+      // recover unsealed TsFiles
+      List<WALRecoverListener> recoverListeners = new ArrayList<>();
       for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
-        recoverTsFiles(value, recoveryContext, true);
+        if (!value.isEmpty()) {
+          TsFileResource unsealedTsFileResource = value.get(value.size() - 1);

Review comment:
       Fixed: all TsFiles without a .resource file will now be treated as unsealed.
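A minimal sketch of that rule, assuming a TsFile's resource file is named by appending ".resource" to the TsFile path. The class and method names are hypothetical, and the existence check is injected so the logic can be tested without touching the disk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper: every TsFile without its ".resource" companion is treated as unsealed. */
final class UnsealedTsFileClassifier {
  // Assumed naming convention: "<tsfile path>.resource".
  static final String RESOURCE_SUFFIX = ".resource";

  static List<String> findUnsealed(List<String> tsFilePaths, Predicate<String> fileExists) {
    List<String> unsealed = new ArrayList<>();
    for (String tsFilePath : tsFilePaths) {
      if (!fileExists.test(tsFilePath + RESOURCE_SUFFIX)) {
        unsealed.add(tsFilePath); // no resource file: treat this TsFile as unsealed
      }
    }
    return unsealed;
  }
}
```

In practice the predicate could simply be path -> new File(path).exists().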







[GitHub] [iotdb] qiaojialin commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r840279576



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEditValue.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+
+/** A class that implements this interface can be written into a .wal file. */
+public interface WALEditValue extends SerializedSize {

Review comment:
       So, rename it to WALEntryValue?







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839353102



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.CheckpointWriter;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage checkpoints of one wal node */
+public class CheckpointManager implements AutoCloseable {
+  /** use size limit to control WALEdit number in each file */
+  public static final long LOG_SIZE_LIMIT = 3 * 1024 * 1024;
+
+  private static final Logger logger = LoggerFactory.getLogger(CheckpointManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** WALNode identifier of this checkpoint manager */
+  protected final String identifier;
+  /** directory to store .checkpoint file */
+  protected final String logDirectory;
+  /**
+   * protect concurrent safety of checkpoint info, including memTableId2Info, cachedByteBuffer,
+   * currentLogVersion and currentLogWriter
+   */
+  private final Lock infoLock = new ReentrantLock();
+  // region these variables should be protected by infoLock
+  /** memTable id -> memTable info */
+  private final Map<Integer, MemTableInfo> memTableId2Info = new HashMap<>();

Review comment:
       It's better for CheckpointManager to manage its own information.
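To illustrate the point, here is a trimmed-down, hypothetical manager that owns its memTable bookkeeping behind its own lock. MemTableInfo is replaced by a TsFile path string and the .checkpoint writer by an in-memory list; both substitutions are assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical checkpoint manager that owns its memTable bookkeeping. */
final class CheckpointManagerSketch {
  enum CheckpointType { CREATE_MEMORY_TABLE, FLUSH_MEMORY_TABLE }

  private final Lock infoLock = new ReentrantLock();
  // memTable id -> TsFile path (stand-in for MemTableInfo), guarded by infoLock
  private final Map<Integer, String> memTableId2Info = new HashMap<>();
  // emitted checkpoints (stand-in for the .checkpoint file), guarded by infoLock
  private final List<String> checkpoints = new ArrayList<>();

  void makeCreateMemTableCP(int memTableId, String tsFilePath) {
    infoLock.lock();
    try {
      memTableId2Info.put(memTableId, tsFilePath);
      checkpoints.add(CheckpointType.CREATE_MEMORY_TABLE + ":" + memTableId);
    } finally {
      infoLock.unlock();
    }
  }

  void makeFlushMemTableCP(int memTableId) {
    infoLock.lock();
    try {
      // Unknown ids are ignored, like the null check in the real makeFlushMemTableCP.
      if (memTableId2Info.remove(memTableId) == null) {
        return;
      }
      checkpoints.add(CheckpointType.FLUSH_MEMORY_TABLE + ":" + memTableId);
    } finally {
      infoLock.unlock();
    }
  }

  /** Ids of memTables created but not yet flushed, in ascending order. */
  List<Integer> unflushedMemTableIds() {
    infoLock.lock();
    try {
      List<Integer> ids = new ArrayList<>(memTableId2Info.keySet());
      Collections.sort(ids);
      return ids;
    } finally {
      infoLock.unlock();
    }
  }

  List<String> checkpointLog() {
    infoLock.lock();
    try {
      return new ArrayList<>(checkpoints);
    } finally {
      infoLock.unlock();
    }
  }
}
```

Because the map and the log are only touched under infoLock, callers never need to coordinate on the manager's internal state.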







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839344496



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEditValue.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+
+/** A class that implements this interface can be written into a .wal file. */
+public interface WALEditValue extends SerializedSize {

Review comment:
       WALEditValue is the value part of WALEdit, so it's unnecessary to rename it.







[GitHub] [iotdb] qiaojialin commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r835870497



##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -50,25 +50,45 @@ rpc_port=6667
 ### Write Ahead Log Configuration
 ####################
 
-# Is insert ahead log enable
-# Datatype: boolean
-# enable_wal=true
+# Write mode of wal
+# The details of these three modes are as follows:
+# 1. DISABLE: the system will disable wal.
+# 2. SYNC: the system will submit wal synchronously; a write request will not return until its wal is fsynced to the disk successfully.
+# 3. ASYNC: the system will submit wal asynchronously; a write request will return immediately, regardless of whether its wal has been fsynced to the disk.
+# The write performance order is DISABLE > ASYNC > SYNC, but only SYNC mode can ensure data durability.
+# wal_mode=SYNC
+
+# Duration a wal flush operation will wait before calling fsync
+# A duration greater than 0 batches multiple wal fsync calls into one. This is useful when disks are slow or WAL write contention exists.
+# Datatype: long
+# sync_wal_delay_in_ms=0

Review comment:
       Change the default to 100 to be consistent with 0.13.
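To make the effect of sync_wal_delay_in_ms concrete, the following is a simplified model (an assumption, not the actual flush thread): the first flush opens a window of delayMs and every flush arriving within it shares one fsync. With flushes at 0, 30, 90, 120 and 250 ms and a 100 ms delay, the model issues 3 fsyncs instead of 5.

```java
/**
 * Simplified model (an assumption, not IoTDB's flush thread): the first flush opens a window of
 * delayMs, and every flush arriving inside that window shares a single fsync.
 */
final class FsyncBatchingModel {
  /** Counts fsync calls for flush timestamps given in ascending order. */
  static int countFsyncs(long[] sortedFlushTimesMs, long delayMs) {
    if (delayMs <= 0) {
      return sortedFlushTimesMs.length; // no batching: one fsync per flush
    }
    int fsyncs = 0;
    long windowEnd = 0;
    for (long t : sortedFlushTimesMs) {
      if (fsyncs == 0 || t > windowEnd) {
        fsyncs++; // this flush starts a new batch
        windowEnd = t + delayMs;
      }
    }
    return fsyncs;
  }
}
```

The trade-off the property comment describes shows up directly: a larger delay means fewer fsyncs but a longer wait for each SYNC-mode write.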

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -50,25 +50,45 @@ rpc_port=6667
 ### Write Ahead Log Configuration
 ####################
 
-# Is insert ahead log enable
-# Datatype: boolean
-# enable_wal=true
+# Write mode of wal
+# The details of these three modes are as follows:
+# 1. DISABLE: the system will disable wal.
+# 2. SYNC: the system will submit wal synchronously; a write request will not return until its wal is fsynced to the disk successfully.
+# 3. ASYNC: the system will submit wal asynchronously; a write request will return immediately, regardless of whether its wal has been fsynced to the disk.
+# The write performance order is DISABLE > ASYNC > SYNC, but only SYNC mode can ensure data durability.
+# wal_mode=SYNC
+
+# Duration a wal flush operation will wait before calling fsync
+# A duration greater than 0 batches multiple wal fsync calls into one. This is useful when disks are slow or WAL write contention exists.
+# Datatype: long
+# sync_wal_delay_in_ms=0
 
-# Add a switch to drop ouf-of-order data
-# Out-of-order data will impact the aggregation query a lot. Users may not care about discarding some out-of-order data.
-# Datatype: boolean
-# enable_discard_out_of_order_data=false
+# Max number of wal nodes, each node corresponds to one wal directory
+# The default value 0 means the concurrent wal number will be 2 * 'number of wal dirs'.
+# Datatype: int
+# max_wal_num=0
 
-# When a certain amount of insert ahead log is reached, it will be flushed to disk
-# It is possible to lose at most flush_wal_threshold operations
+# Buffer size of each wal node
+# If set to a value smaller than 0, the default value 16777216 bytes (16MB) is used.
 # Datatype: int
-# flush_wal_threshold=10000
+# wal_buffer_size_in_byte=16777216
 
-# The cycle when insert ahead log is periodically forced to be written to disk(in milliseconds)
-# If force_wal_period_in_ms = 0 it means force insert ahead log to be written to disk after each refreshment
-# Set this parameter to 0 may slow down the ingestion on slow disk.
-# Datatype: long
-# force_wal_period_in_ms=100
+# Buffer entry size of each wal buffer
+# If set to a value smaller than 0, the default value 16384 bytes (16KB) is used.
+# Datatype: int
+# wal_buffer_entry_size_in_byte=16384
+
+# Max storage space for wal
+# The default value 0 means the storage space will not be controlled.

Review comment:
       ```suggestion
   # The default value 0 means the storage space will only be deleted when xxx
   ```
   
   Replace xxx with the concrete condition. "Will not be controlled" is not acceptable in a database.
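The fallback rules in the configuration hunk above can be summarized as a small resolver. The class and method names are hypothetical; the defaults are the ones stated in the property comments, and the max_wal_num rule mirrors how MAX_WAL_NODE_NUM is computed in WALManager.

```java
/** Hypothetical resolver for the WAL properties; defaults taken from the property comments. */
final class WalConfigDefaults {
  static final int DEFAULT_WAL_BUFFER_SIZE_IN_BYTE = 16 * 1024 * 1024; // 16777216 (16MB)
  static final int DEFAULT_WAL_BUFFER_ENTRY_SIZE_IN_BYTE = 16 * 1024; // 16384 (16KB)

  /** max_wal_num: a value <= 0 falls back to 2 * number of wal dirs, as in WALManager. */
  static int effectiveMaxWalNodeNum(int configured, int walDirCount) {
    return configured > 0 ? configured : walDirCount * 2;
  }

  /** wal_buffer_size_in_byte: a value smaller than 0 falls back to 16MB. */
  static int effectiveWalBufferSize(int configured) {
    return configured < 0 ? DEFAULT_WAL_BUFFER_SIZE_IN_BYTE : configured;
  }

  /** wal_buffer_entry_size_in_byte: a value smaller than 0 falls back to 16KB. */
  static int effectiveWalBufferEntrySize(int configured) {
    return configured < 0 ? DEFAULT_WAL_BUFFER_ENTRY_SIZE_IN_BYTE : configured;
  }
}
```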

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -340,6 +338,16 @@ public void insertTablet(
     tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
   }
 
+  private void createNewWorkingMemTable() throws WriteProcessException {
+    if (enableMemControl) {
+      workMemTable = new PrimitiveMemTable(enableMemControl);
+      MemTableManager.getInstance().addMemtableNumber();

Review comment:
       Could we move these two lines into MemTableManager?
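One possible shape for that refactoring, sketched with hypothetical names: the manager creates the memTable and updates the counter in a single call. The hunk only shows the enableMemControl branch, so creating an uncounted memTable when memory control is off is an assumption, and a plain AtomicInteger stands in for whatever addMemtableNumber() actually does.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-in for PrimitiveMemTable; only remembers the memory-control flag. */
final class MemTableStub {
  final boolean enableMemControl;

  MemTableStub(boolean enableMemControl) {
    this.enableMemControl = enableMemControl;
  }
}

/** Hypothetical manager that both creates the working memTable and counts it. */
final class MemTableManagerSketch {
  private final AtomicInteger memTableNumber = new AtomicInteger();

  MemTableStub createWorkingMemTable(boolean enableMemControl) {
    MemTableStub memTable = new MemTableStub(enableMemControl);
    if (enableMemControl) {
      memTableNumber.incrementAndGet(); // plays the role of addMemtableNumber()
    }
    return memTable;
  }

  int getMemTableNumber() {
    return memTableNumber.get();
  }
}
```

Keeping both steps in one method means a caller such as TsFileProcessor cannot create a memTable and forget to count it.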

##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
##########
@@ -226,15 +214,18 @@ protected void serializeImpl(ByteBuffer buffer) {
   }
 
   /**
-   * Deserialize the plan from the given buffer. This is provided for WAL, and must be used with
-   * serializeToWAL.
+   * Deserialize the plan from the given buffer.
    *
    * @param buffer
    */
   public void deserialize(ByteBuffer buffer) throws IllegalPathException, IOException {
     throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
   }
 
+  protected int getSerializedBytesNum(String value) {
+    return value == null ? Integer.BYTES : Integer.BYTES + value.getBytes().length;

Review comment:
       Add some documentation explaining which fields are counted in this size.
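A sketch of what that documentation could spell out: the count is a 4-byte length field, plus the encoded bytes when the string is not null. Writing null as length -1 is an assumption. The sketch also uses an explicit UTF-8 charset, because String.getBytes() without arguments uses the platform default charset, so the computed size could disagree with the written bytes if the writer encodes differently.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the accounting behind getSerializedBytesNum(String). */
final class StringSizeSketch {
  /**
   * Bytes written by write(): Integer.BYTES for the length field, plus the UTF-8 encoded
   * content (nothing for null).
   */
  static int getSerializedBytesNum(String value) {
    return value == null
        ? Integer.BYTES
        : Integer.BYTES + value.getBytes(StandardCharsets.UTF_8).length;
  }

  static void write(ByteBuffer buffer, String value) {
    if (value == null) {
      buffer.putInt(-1); // assumed null marker
      return;
    }
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    buffer.putInt(bytes.length);
    buffer.put(bytes);
  }
}
```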

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -455,7 +332,7 @@ public void setReady(boolean ready) {
   }
 
   /** this class is used to store recovering context */
-  private class RecoveryContext {
+  private class VSGRecoveryContext {
     /** number of files to be recovered */
     private final long filesToRecoverNum;

Review comment:
       ```suggestion
       private final long numOfFilesToRecover;
   ```
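For context, a hypothetical progress tracker using the suggested field name. The constructor mirrors the two arguments passed to new VSGRecoveryContext(...) in the recover() hunk, with the second assumed to be the number of files already recovered; the percentage reporting is illustrative only.

```java
/** Hypothetical recovery-progress tracker using the suggested field name. */
final class RecoveryProgress {
  /** number of files to be recovered */
  private final long numOfFilesToRecover;
  /** number of files recovered so far (assumed meaning of the second constructor argument) */
  private long numOfFilesRecovered;

  RecoveryProgress(long numOfFilesToRecover, long numOfFilesRecovered) {
    this.numOfFilesToRecover = numOfFilesToRecover;
    this.numOfFilesRecovered = numOfFilesRecovered;
  }

  /** Marks one more file as recovered and returns the progress in whole percent. */
  int incrementAndGetPercent() {
    numOfFilesRecovered++;
    if (numOfFilesToRecover == 0) {
      return 100;
    }
    return (int) (numOfFilesRecovered * 100 / numOfFilesToRecover);
  }
}
```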

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.recover.file;
+
+import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.wal.buffer.WALEdit;
+import org.apache.iotdb.db.wal.exception.WALRecoverException;
+import org.apache.iotdb.db.wal.utils.listener.WALRecoverListener;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+/**
+ * This class is used to help recover all unsealed TsFiles at zero level. There are 3 main
+ * procedures: start recovery, redo logs, and end recovery; you must call them in order. Notice:
+ * This class doesn't guarantee concurrency safety.
+ */
+public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerformer {
+  private static final Logger logger =
+      LoggerFactory.getLogger(UnsealedTsFileRecoverPerformer.class);
+
+  /** sequence file or not */
+  private final boolean sequence;
+  /** add recovered TsFile back to virtual storage group */
+  private final Consumer<UnsealedTsFileRecoverPerformer> callbackAfterUnsealedTsFileRecovered;
+  /** redo wal log to recover TsFile */
+  private final TsFilePlanRedoer walRedoer;
+  /** trace result of this recovery */
+  private final WALRecoverListener recoverListener;
+
+  public UnsealedTsFileRecoverPerformer(
+      TsFileResource tsFileResource,
+      boolean sequence,
+      VirtualStorageGroupProcessor vsgProcessor,
+      Consumer<UnsealedTsFileRecoverPerformer> callbackAfterUnsealedTsFileRecovered) {
+    super(tsFileResource);
+    this.sequence = sequence;
+    this.callbackAfterUnsealedTsFileRecovered = callbackAfterUnsealedTsFileRecovered;
+    this.walRedoer = new TsFilePlanRedoer(tsFileResource, sequence, vsgProcessor);
+    this.recoverListener = new WALRecoverListener(tsFileResource.getTsFilePath());
+  }
+
+  /**
+   * Make preparation for recovery, including loading the .resource file (reconstructing it when
+   * necessary) and truncating the file so that only correct data remains.
+   */
+  public void startRecovery() throws StorageGroupProcessorException, IOException {
+    super.recoverWithWriter();
+
+    if (hasCrashed()) {
+      // tsfile has crashed due to failure,
+      // the last ChunkGroup may contain the same data as the WALs,
+      // so the time map must be updated first to avoid duplicated insertion
+      loadResourceFromWriter();
+    }
+  }
+
+  private void loadResourceFromWriter() {

Review comment:
       ```suggestion
     private void constructResourceFromTsFile() {
   ```
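The UnsealedTsFileRecoverPerformer javadoc requires start recovery, redo logs, and end recovery to be called in that order. A minimal, hypothetical guard enforcing that contract could look like this; allowing end recovery when there are no logs to redo is an assumption. Like the performer, it is not thread-safe.

```java
/** Hypothetical guard for the start, redo, end call order. Not thread-safe. */
final class RecoveryPhaseGuard {
  enum Phase { CREATED, STARTED, REDOING, ENDED }

  private Phase phase = Phase.CREATED;

  void startRecovery() {
    if (phase != Phase.CREATED) {
      throw new IllegalStateException("startRecovery() called in phase " + phase);
    }
    phase = Phase.STARTED;
  }

  /** May be called repeatedly, once per WAL entry to redo. */
  void redoLog() {
    requireStartedOrRedoing("redoLog()");
    phase = Phase.REDOING;
  }

  void endRecovery() {
    requireStartedOrRedoing("endRecovery()");
    phase = Phase.ENDED;
  }

  Phase phase() {
    return phase;
  }

  private void requireStartedOrRedoing(String method) {
    if (phase != Phase.STARTED && phase != Phase.REDOING) {
      throw new IllegalStateException(method + " called in phase " + phase);
    }
  }
}
```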

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int MAX_WAL_NODE_NUM =
+      config.getMaxWalNodeNum() > 0 ? config.getMaxWalNodeNum() : config.getWalDirs().length * 2;
+
+  /** manage wal folders */
+  private FolderManager folderManager;
+  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  // region these variables should be protected by nodesLock
+  /** wal nodes, the max number of wal nodes is MAX_WAL_NODE_NUM */
+  private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NODE_NUM);
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;
+  // endregion
+  /** single thread to fsync .checkpoint files */
+  private ScheduledExecutorService checkpointThread;
+  /** single thread to delete old .wal files */
+  private ScheduledExecutorService walDeleteThread;
+
+  private WALManager() {}
+
+  /** Apply for a wal node */
+  public IWALNode applyForWALNode() {
+    if (config.getWalMode() == WALMode.DISABLE) {
+      return WALFakeNode.getSuccessInstance();
+    }
+
+    WALNode selectedNode;
+    nodesLock.lock();
+    try {
+      if (walNodes.size() < MAX_WAL_NODE_NUM) {
+        nodeIdCounter++;
+        String identifier = String.valueOf(nodeIdCounter);
+        String folder;
+        // get wal folder
+        try {
+          folder = folderManager.getNextFolder();
+        } catch (DiskSpaceInsufficientException e) {
+          logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+          config.setReadOnly(true);
+          return WALFakeNode.getFailureInstance(e);
+        }
+        folder = folder + File.separator + identifier;
+        // create new wal node
+        try {
+          selectedNode = new WALNode(identifier, folder);
+        } catch (FileNotFoundException e) {
+          logger.error("Fail to create wal node", e);

Review comment:
       Roll back with nodeIdCounter-- here, so that a failed creation does not consume an identifier.
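A hypothetical sketch of the allocation logic with that rollback applied. A Predicate stands in for new WALNode(identifier, folder), and the round-robin reuse of existing nodes once the limit is reached is an assumption based on the nodeCursor field, since that branch is not shown in the hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/** Hypothetical allocator mirroring the shape of WALManager.applyForWALNode(). */
final class WalNodeAllocator {
  private final int maxNodeNum;
  // stand-in for new WALNode(identifier, folder); returns false when creation fails
  private final Predicate<String> tryCreateNode;
  private final Lock nodesLock = new ReentrantLock();
  private final List<String> nodeIds = new ArrayList<>();
  private int nodeCursor = -1;
  private long nodeIdCounter = -1;

  WalNodeAllocator(int maxNodeNum, Predicate<String> tryCreateNode) {
    if (maxNodeNum <= 0) {
      throw new IllegalArgumentException("maxNodeNum must be positive");
    }
    this.maxNodeNum = maxNodeNum;
    this.tryCreateNode = tryCreateNode;
  }

  /** Returns the allocated node's identifier, or null if creating a new node failed. */
  String applyForNode() {
    nodesLock.lock();
    try {
      if (nodeIds.size() < maxNodeNum) {
        nodeIdCounter++;
        String identifier = String.valueOf(nodeIdCounter);
        if (!tryCreateNode.test(identifier)) {
          nodeIdCounter--; // roll back so the failed attempt does not consume an identifier
          return null;
        }
        nodeIds.add(identifier);
        return identifier;
      }
      // assumed: once the limit is reached, reuse existing nodes round-robin
      nodeCursor = (nodeCursor + 1) % nodeIds.size();
      return nodeIds.get(nodeCursor);
    } finally {
      nodesLock.unlock();
    }
  }
}
```

With the rollback, the identifiers of successfully created nodes stay contiguous ("0", "1", ...) even if an attempt in between fails.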

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/utils/SerializedSize.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+/** Implementations should calculate their accurate serialized size in bytes. */

Review comment:
       Add javadoc explaining which classes should implement this interface and where it is used.
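As an example of what such javadoc could describe, here is a hypothetical implementation and caller. The method name serializedSize() matches the checkpoint.serializedSize() call in CheckpointManager.logByCachedByteBuffer, which sizes its ByteBuffer before serializing; whether that method is declared by this interface is assumed, and the entry class is made up.

```java
import java.nio.ByteBuffer;

/** Implemented by values written into a .wal or .checkpoint file (assumed usage). */
interface SerializedSize {
  /** Exact number of bytes that serialization will write. */
  int serializedSize();
}

/** Hypothetical entry: an 8-byte timestamp plus a 4-byte int value. */
final class TimeValueEntry implements SerializedSize {
  final long time;
  final int value;

  TimeValueEntry(long time, int value) {
    this.time = time;
    this.value = value;
  }

  @Override
  public int serializedSize() {
    return Long.BYTES + Integer.BYTES;
  }

  void serialize(ByteBuffer buffer) {
    buffer.putLong(time);
    buffer.putInt(value);
  }
}

final class SizedWriter {
  /** Allocates exactly serializedSize() bytes before serializing, then flips for reading. */
  static ByteBuffer serializeExactly(TimeValueEntry entry) {
    ByteBuffer buffer = ByteBuffer.allocate(entry.serializedSize());
    entry.serialize(buffer);
    buffer.flip();
    return buffer;
  }
}
```

An accurate size lets the writer allocate once without risking a BufferOverflowException.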

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.CheckpointWriter;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage checkpoints of one wal node */
+public class CheckpointManager implements AutoCloseable {
+  /** use size limit to control WALEdit number in each file */
+  public static final long LOG_SIZE_LIMIT = 3 * 1024 * 1024;
+
+  private static final Logger logger = LoggerFactory.getLogger(CheckpointManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** WALNode identifier of this checkpoint manager */
+  protected final String identifier;
+  /** directory to store .checkpoint file */
+  protected final String logDirectory;
+  /**
+   * protect concurrent safety of checkpoint info, including memTableId2Info, cachedByteBuffer,
+   * currentLogVersion and currentLogWriter
+   */
+  private final Lock infoLock = new ReentrantLock();
+  // region these variables should be protected by infoLock
+  /** memTable id -> memTable info */
+  private final Map<Integer, MemTableInfo> memTableId2Info = new HashMap<>();
+  /** cache the biggest byte buffer to serialize checkpoint */
+  private volatile ByteBuffer cachedByteBuffer;
+  /** current checkpoint file version id, only updated by fsyncAndDeleteThread */
+  private int currentLogVersion = 0;
+  /** current checkpoint file log writer, only updated by fsyncAndDeleteThread */
+  private ILogWriter currentLogWriter;
+  // endregion
+
+  public CheckpointManager(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
+    }
+    currentLogWriter =
+        new CheckpointWriter(
+            SystemFileFactory.INSTANCE.getFile(
+                logDirectory, CheckpointWriter.getLogFileName(currentLogVersion)));
+    makeGlobalInfoCP();
+    fsyncCheckpointFile();
+  }
+
+  /**
+   * make checkpoint for global memTables' info, this checkpoint only exists in the beginning of
+   * each checkpoint file
+   */
+  private void makeGlobalInfoCP() {
+    infoLock.lock();
+    try {
+      Checkpoint checkpoint =
+          new Checkpoint(
+              CheckpointType.GLOBAL_MEMORY_TABLE_INFO, new ArrayList<>(memTableId2Info.values()));
+      logByCachedByteBuffer(checkpoint);
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  /** make checkpoint for create memTable info */
+  public void makeCreateMemTableCP(MemTableInfo memTableInfo) {
+    infoLock.lock();
+    try {
+      memTableId2Info.put(memTableInfo.getMemTableId(), memTableInfo);
+      Checkpoint checkpoint =
+          new Checkpoint(
+              CheckpointType.CREATE_MEMORY_TABLE, Collections.singletonList(memTableInfo));
+      logByCachedByteBuffer(checkpoint);
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  /** make checkpoint for flush memTable info */
+  public void makeFlushMemTableCP(int memTableId) {
+    infoLock.lock();
+    try {
+      MemTableInfo memTableInfo = memTableId2Info.remove(memTableId);
+      if (memTableInfo == null) {
+        return;
+      }
+      Checkpoint checkpoint =
+          new Checkpoint(
+              CheckpointType.FLUSH_MEMORY_TABLE, Collections.singletonList(memTableInfo));
+      logByCachedByteBuffer(checkpoint);
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  private void logByCachedByteBuffer(Checkpoint checkpoint) {
+    // make sure cached ByteBuffer has enough capacity
+    int estimateSize = checkpoint.serializedSize();
+    if (cachedByteBuffer == null || estimateSize > cachedByteBuffer.capacity()) {
+      cachedByteBuffer = ByteBuffer.allocate(estimateSize);
+    }
+    checkpoint.serialize(cachedByteBuffer);
+    try {
+      currentLogWriter.write(cachedByteBuffer);
+    } catch (IOException e) {
+      logger.error("Fail to make checkpoint: {}", checkpoint, e);
+    } finally {
+      cachedByteBuffer.clear();
+    }
+  }
+
+  // region Task to fsync checkpoint file
+  /** Fsync checkpoints to the disk */
+  public void fsyncCheckpointFile() {
+    infoLock.lock();
+    try {
+      try {
+        currentLogWriter.force();
+      } catch (IOException e) {
+        logger.error(
+            "Fail to fsync wal node-{}'s checkpoint writer, change system mode to read-only.",
+            identifier,
+            e);
+        config.setReadOnly(true);
+      }
+
+      try {
+        if (tryRollingLogWriter()) {
+          // first log global memTables' info, then delete old checkpoint file
+          makeGlobalInfoCP();
+          currentLogWriter.force();
+          File oldFile =
+              SystemFileFactory.INSTANCE.getFile(
+                  logDirectory, CheckpointWriter.getLogFileName(currentLogVersion - 1));
+          oldFile.delete();
+        }
+      } catch (IOException e) {
+        logger.error(
+            "Fail to roll wal node-{}'s checkpoint writer, change system mode to read-only.",
+            identifier,
+            e);
+        config.setReadOnly(true);
+      }
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  private boolean tryRollingLogWriter() throws IOException {
+    if (currentLogWriter.size() < LOG_SIZE_LIMIT) {
+      return false;
+    }
+    currentLogWriter.close();
+    currentLogVersion++;
+    File nextLogFile =
+        SystemFileFactory.INSTANCE.getFile(
+            logDirectory, CheckpointWriter.getLogFileName(currentLogVersion));
+    currentLogWriter = new CheckpointWriter(nextLogFile);
+    return true;
+  }
+  // endregion
+
+  public MemTableInfo getOldestMemTableInfo() {
+    // find oldest memTable
+    List<MemTableInfo> memTableInfos;
+    infoLock.lock();
+    try {
+      memTableInfos = new ArrayList<>(memTableId2Info.values());
+    } finally {
+      infoLock.unlock();
+    }
+    if (memTableInfos.isEmpty()) {
+      return null;
+    }
+    MemTableInfo oldestMemTableInfo = memTableInfos.get(0);
+    for (MemTableInfo memTableInfo : memTableInfos) {
+      if (oldestMemTableInfo.getFirstFileVersionId() > memTableInfo.getFirstFileVersionId()) {
+        oldestMemTableInfo = memTableInfo;
+      }
+    }
+    return oldestMemTableInfo;
+  }
+
+  /**
+   * Get version id of first valid .wal file
+   *
+   * @return Return {@link Integer#MIN_VALUE} if no file is valid
+   */
+  public int getFirstValidVersionId() {

Review comment:
       ```suggestion
     public int getFirstValidWALVersionId() {
   ```
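
As a rough model of `tryRollingLogWriter` plus the roll branch of `fsyncCheckpointFile`, the sketch below keeps versioned "files" in memory. The class and method names are hypothetical. The real code writes `.checkpoint` files through `CheckpointWriter` and forces the new file to disk before deleting the old one. The sketch keeps the ordering from the diff: open version N+1, write the global memTable snapshot into it, and only then delete version N.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of checkpoint file rolling; byte arrays stand in for files. */
class RollingCheckpointLogSketch {
  private final long sizeLimit;
  private final Map<Integer, ByteArrayOutputStream> files = new HashMap<>();
  private int currentVersion = 0;

  RollingCheckpointLogSketch(long sizeLimit) {
    this.sizeLimit = sizeLimit;
    files.put(currentVersion, new ByteArrayOutputStream());
  }

  void append(byte[] record) {
    files.get(currentVersion).write(record, 0, record.length);
  }

  /** Rolls only when the current file has reached the limit; returns true if it rolled. */
  boolean rollIfNeeded(byte[] globalSnapshot) {
    if (files.get(currentVersion).size() < sizeLimit) {
      return false;
    }
    currentVersion++;
    files.put(currentVersion, new ByteArrayOutputStream());
    // write the global memTable snapshot first so the new file is self-sufficient
    append(globalSnapshot);
    // only then drop the previous file
    files.remove(currentVersion - 1);
    return true;
  }

  int currentVersion() {
    return currentVersion;
  }

  boolean hasVersion(int version) {
    return files.containsKey(version);
  }

  int currentSize() {
    return files.get(currentVersion).size();
  }
}
```

The ordering matters: if the process stops between writing the snapshot and deleting the old file, the old file is still there.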

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int MAX_WAL_NODE_NUM =
+      config.getMaxWalNodeNum() > 0 ? config.getMaxWalNodeNum() : config.getWalDirs().length * 2;
+
+  /** manage wal folders */
+  private FolderManager folderManager;
+  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  // region these variables should be protected by nodesLock
+  /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
+  private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NODE_NUM);
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;
+  // endregion
+  /** single thread to fsync .checkpoint files */
+  private ScheduledExecutorService checkpointThread;
+  /** single thread to delete old .wal files */
+  private ScheduledExecutorService walDeleteThread;
+
+  private WALManager() {}
+
+  /** Apply for a wal node */
+  public IWALNode applyForWALNode() {
+    if (config.getWalMode() == WALMode.DISABLE) {
+      return WALFakeNode.getSuccessInstance();
+    }
+
+    WALNode selectedNode;
+    nodesLock.lock();
+    try {
+      if (walNodes.size() < MAX_WAL_NODE_NUM) {
+        nodeIdCounter++;
+        String identifier = String.valueOf(nodeIdCounter);
+        String folder;
+        // get wal folder
+        try {
+          folder = folderManager.getNextFolder();
+        } catch (DiskSpaceInsufficientException e) {
+          logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+          config.setReadOnly(true);

Review comment:
       `nodeIdCounter--`: the counter was already incremented above, so roll it back when no WAL folder can be allocated.
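
`nodeIdCounter` is incremented before `folderManager.getNextFolder()` is called, so without a rollback a `DiskSpaceInsufficientException` skips an identifier. Below is a minimal sketch of the rollback under the same lock. `FolderSource` and `DiskFullException` are hypothetical stand-ins for `FolderManager` and `DiskSpaceInsufficientException`.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for DiskSpaceInsufficientException. */
class DiskFullException extends Exception {}

/** Hypothetical stand-in for FolderManager.getNextFolder(). */
interface FolderSource {
  String nextFolder() throws DiskFullException;
}

class NodeIdAllocatorSketch {
  private final Lock nodesLock = new ReentrantLock();
  private long nodeIdCounter = -1;

  /** Returns the new node identifier, or null when no folder is available. */
  String allocate(FolderSource folders) {
    nodesLock.lock();
    try {
      nodeIdCounter++;
      try {
        folders.nextFolder(); // a real allocator would create the node in this folder
        return String.valueOf(nodeIdCounter);
      } catch (DiskFullException e) {
        // no node was created, so hand the id back
        nodeIdCounter--;
        return null;
      }
    } finally {
      nodesLock.unlock();
    }
  }
}
```

An alternative is to fetch the folder first and increment the counter only on success, which removes the need for a rollback.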

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/utils/WALSubmitter.java
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+/** Mark which classes will submit wal. */
+public interface WALSubmitter {}

Review comment:
       Remove this empty marker interface.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
##########
@@ -504,4 +514,15 @@ public long getCreatedTime() {
   private IDeviceID getDeviceID(PartialPath deviceId) {
     return DeviceIDFactory.getInstance().getDeviceID(deviceId);
   }
+
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    // TODO

Review comment:
       What is still TODO here? Please describe the remaining work in the comment.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEditValue.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+
+/** A class implements this interface can be written into .wal file. */
+public interface WALEditValue extends SerializedSize {

Review comment:
       ```suggestion
   public interface WALEntry extends SerializedSize {
   ```

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -50,25 +50,45 @@ rpc_port=6667
 ### Write Ahead Log Configuration
 ####################
 
-# Is insert ahead log enable
-# Datatype: boolean
-# enable_wal=true
+# Write mode of wal
+# The details of these three modes are as follows:
+# 1. DISABLE: the system will disable wal.
+# 2. SYNC: the system will submit wal synchronously, write request will not return until its wal is fsynced to the disk successfully.
+# 3. ASYNC: the system will submit wal asynchronously, write request will return immediately no matter its wal is fsynced to the disk successfully.
+# The write performance order is DISABLE > ASYNC > SYNC, but only SYNC mode can ensure data durability.
+# wal_mode=SYNC
+
+# Duration a wal flush operation will wait before calling fsync
+# A duration greater than 0 batches multiple wal fsync calls into one. This is useful when disks are slow or WAL write contention exists.
+# Datatype: long
+# sync_wal_delay_in_ms=0
 
-# Add a switch to drop ouf-of-order data
-# Out-of-order data will impact the aggregation query a lot. Users may not care about discarding some out-of-order data.
-# Datatype: boolean
-# enable_discard_out_of_order_data=false
+# Max number of wal nodes, each node corresponds to one wal directory
+# The default value 0 means the concurrent wal number will be 2 * 'number of wal dirs'.

Review comment:
       The description of the default value is hard to understand.
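
For reference, `WALManager` computes the limit as `config.getMaxWalNodeNum() > 0 ? config.getMaxWalNodeNum() : config.getWalDirs().length * 2`. Here is the same rule as a standalone function. This is a sketch, and the class and method names are hypothetical. With the setting left at 0 and three WAL directories, up to six WAL nodes are allowed.

```java
/** Mirrors the MAX_WAL_NODE_NUM expression in WALManager; names are hypothetical. */
final class WalNodeLimitSketch {
  private WalNodeLimitSketch() {}

  static int effectiveMaxWalNodeNum(int configuredMax, int walDirCount) {
    // a positive setting is used as-is; 0 (or any non-positive value) means 2 nodes per WAL dir
    return configuredMax > 0 ? configuredMax : walDirCount * 2;
  }
}
```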

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/recover/file/SealedTsFileRecoverPerformer.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.recover.file;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+
+import java.io.IOException;
+
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
+
+/** This class is used to help recover all sealed TsFiles, except unsealed TsFile at zero level. */
+public class SealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerformer {
+  public SealedTsFileRecoverPerformer(TsFileResource tsFileResource) {
+    super(tsFileResource);
+  }
+
+  /**
+   * Recover sealed TsFile, including load .resource file (reconstruct when necessary) and truncate
+   * the file to remaining corrected data
+   */
+  public void recover() throws StorageGroupProcessorException, IOException {
+    super.recoverWithWriter();
+
+    if (hasCrashed()) {

Review comment:
       Can a sealed TsFile ever have `hasCrashed()` return true?

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+
+/** Currently, there are 2 buffer types */

Review comment:
       Please explain which two buffer types these are.

##########
File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
##########
@@ -270,4 +307,62 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(
     }
     return values;
   }
+
+  public static Object[] readValuesFromBuffer(

Review comment:
       ```suggestion
     public static Object[] readTabletValuesFromBuffer(
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
##########
@@ -34,136 +31,69 @@
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.writelog.io.ILogReader;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Supplier;
 
 /**
- * LogReplayer finds the logNode of the TsFile given by insertFilePath and logNodePrefix, reads the
- * WALs from the logNode and redoes them into a given MemTable and ModificationFile.
+ * This class helps redo wal logs into a TsFile. Notice: You should update time map in {@link
+ * TsFileResource} before using this class to avoid duplicated insertion and this class doesn't
+ * guarantee concurrency safety.
  */
-public class LogReplayer {
-
-  private Logger logger = LoggerFactory.getLogger(LogReplayer.class);
-  private String logNodePrefix;
-  private String insertFilePath;
-  private ModificationFile modFile;
-  private TsFileResource currentTsFileResource;
-  private IMemTable recoverMemTable;
-
-  // only unsequence file tolerates duplicated data
-  private boolean sequence;
-
-  private Map<String, Long> tempStartTimeMap = new HashMap<>();
-  private Map<String, Long> tempEndTimeMap = new HashMap<>();
-
-  public LogReplayer(
-      String logNodePrefix,
-      String insertFilePath,
-      ModificationFile modFile,
-      TsFileResource currentTsFileResource,
-      IMemTable memTable,
-      boolean sequence) {
-    this.logNodePrefix = logNodePrefix;
-    this.insertFilePath = insertFilePath;
-    this.modFile = modFile;
-    this.currentTsFileResource = currentTsFileResource;
-    this.recoverMemTable = memTable;
+public class TsFilePlanRedoer {
+  private static final Logger logger = LoggerFactory.getLogger(TsFilePlanRedoer.class);
+
+  private final TsFileResource tsFileResource;
+  /** only unsequence file tolerates duplicated data */
+  private final boolean sequence;
+  /** this TsFile's virtual storage group */
+  private final VirtualStorageGroupProcessor vsgProcessor;

Review comment:
       Use IDTable here instead of VirtualStorageGroupProcessor.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALRecoverListener.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils.listener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class helps judge whether some TsFile is recovered. */
+public class WALRecoverListener implements IResultListener {
+  private static final Logger logger = LoggerFactory.getLogger(WALRecoverListener.class);
+
+  /** path of recovering TsFile */
+  private final String filePath;
+
+  private volatile Status status;
+  private volatile Exception cause;
+
+  public WALRecoverListener(String filePath) {
+    this.filePath = filePath;
+    status = Status.RUNNING;
+    cause = null;
+  }
+
+  public synchronized WALRecoverListener succeed() {
+    status = Status.SUCCESS;
+    this.notifyAll();
+    return this;
+  }
+
+  public synchronized WALRecoverListener fail(Exception e) {
+    status = Status.FAILURE;
+    cause = e;
+    this.notifyAll();
+    return this;
+  }
+
+  public synchronized WALRecoverListener.Status getResult() {

Review comment:
       ```suggestion
     public synchronized Status getResult() {
   ```
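
Since `succeed()` and `fail()` call `notifyAll()`, `getResult()` presumably blocks until the status leaves `RUNNING`. The diff cuts off before its body, so the wait loop below is an assumption, as is returning the current status on interrupt. The sketch also shows why the suggestion compiles: inside the class, the nested enum is in scope as plain `Status`. The class name is hypothetical.

```java
/** Hypothetical re-creation of the WALRecoverListener pattern: callers block until recovery ends. */
class RecoverListenerSketch {
  enum Status { RUNNING, SUCCESS, FAILURE }

  private Status status = Status.RUNNING;
  private Exception cause;

  synchronized RecoverListenerSketch succeed() {
    status = Status.SUCCESS;
    notifyAll();
    return this;
  }

  synchronized RecoverListenerSketch fail(Exception e) {
    status = Status.FAILURE;
    cause = e;
    notifyAll();
    return this;
  }

  /** Blocks while recovery is running; plain Status resolves to the nested enum. */
  synchronized Status getResult() {
    while (status == Status.RUNNING) {
      try {
        wait(); // releases the monitor until succeed()/fail() calls notifyAll()
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return status; // stop waiting; the caller sees RUNNING
      }
    }
    return status;
  }

  synchronized Exception getCause() {
    return cause;
  }
}
```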

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -467,7 +344,7 @@ public void setReady(boolean ready) {
     /** last recovery log files num */
     private long lastLogCheckFilesNum;
 
-    public RecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
+    public VSGRecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
       this.filesToRecoverNum = filesToRecoverNum;
       this.recoveredFilesNum = recoveredFilesNum;

Review comment:
       ```suggestion
         this.recoveredFilesNum = 0;
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -467,7 +344,7 @@ public void setReady(boolean ready) {
     /** last recovery log files num */
     private long lastLogCheckFilesNum;
 
-    public RecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
+    public VSGRecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {

Review comment:
       ```suggestion
       public VSGRecoveryContext(long filesToRecoverNum) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/utils/WALWriteUtils.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+/** Like {@link org.apache.iotdb.tsfile.utils.ReadWriteIOUtils} */
+public class WALWriteUtils {
+  public static final int BOOLEAN_LEN = ReadWriteIOUtils.BOOLEAN_LEN;
+  public static final int SHORT_LEN = ReadWriteIOUtils.SHORT_LEN;
+  public static final int INT_LEN = ReadWriteIOUtils.INT_LEN;
+  public static final int LONG_LEN = ReadWriteIOUtils.LONG_LEN;
+  public static final int DOUBLE_LEN = ReadWriteIOUtils.DOUBLE_LEN;
+  public static final int FLOAT_LEN = ReadWriteIOUtils.FLOAT_LEN;
+
+  private WALWriteUtils() {}
+
+  /** write a byte to byteBuffer according to flag. If flag is true, write 1, else write 0. */
+  public static int write(Boolean flag, IWALByteBufferView buffer) {
+    byte a;
+    if (Boolean.TRUE.equals(flag)) {
+      a = 1;
+    } else {
+      a = 0;
+    }
+
+    buffer.put(a);
+    return BOOLEAN_LEN;

Review comment:
       ```suggestion
       return Byte.BYTES;
   ```
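
The method puts exactly one byte, so returning `Byte.BYTES` ties the return value to what was written rather than to a constant borrowed from `ReadWriteIOUtils`. The sketch below uses a plain `java.nio.ByteBuffer` in place of `IWALByteBufferView`, and the class name is hypothetical.

```java
import java.nio.ByteBuffer;

/** Sketch of WALWriteUtils.write(Boolean, ...) with a ByteBuffer standing in for IWALByteBufferView. */
final class BooleanWriteSketch {
  private BooleanWriteSketch() {}

  /** Writes 1 for TRUE and 0 for FALSE or null; returns the number of bytes written. */
  static int write(Boolean flag, ByteBuffer buffer) {
    buffer.put(Boolean.TRUE.equals(flag) ? (byte) 1 : (byte) 0);
    return Byte.BYTES;
  }
}
```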

##########
File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
##########
@@ -207,6 +235,15 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(
     return readValuesFromBuffer(buffer, dataTypes, columns, size);
   }
 
+  public static Object[] readValuesFromBuffer(

Review comment:
       ```suggestion
     public static Object[] readTabletValuesFromBuffer(
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
##########
@@ -23,15 +23,21 @@
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.wal.buffer.WALEditValue;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class DeletePlan extends PhysicalPlan {
+public class DeletePlan extends PhysicalPlan implements WALEditValue {
+  private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Integer.BYTES + Long.BYTES * 3;

Review comment:
       Add a doc comment explaining which field each term (`Byte.BYTES`, `Integer.BYTES`, `Long.BYTES * 3`) stands for.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -522,17 +399,51 @@ private void recover() throws StorageGroupProcessorException {
 
       // split by partition so that we can find the last file of each partition and decide to
       // close it or not
-      RecoveryContext recoveryContext =
-          new RecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size(), 0);
+      VSGRecoveryContext VSGRecoveryContext =
+          new VSGRecoveryContext(tmpSeqTsFiles.size() + tmpUnseqTsFiles.size(), 0);
       Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
           splitResourcesByPartition(tmpSeqTsFiles);
       Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
           splitResourcesByPartition(tmpUnseqTsFiles);
+      // recover unsealed TsFiles
+      List<WALRecoverListener> recoverListeners = new ArrayList<>();
       for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
-        recoverTsFiles(value, recoveryContext, true);
+        if (!value.isEmpty()) {
+          TsFileResource unsealedTsFileResource = value.get(value.size() - 1);

Review comment:
       There may be more than one unsealed TsFile; do you recover all of them?
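
The diff treats only `value.get(value.size() - 1)` as unsealed in each partition. If several files in a partition can be unsealed, as the question suggests, one option is to test every file. Below is a generic sketch. `isSealed` is a hypothetical predicate standing in for whatever check the real code would use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper: collect every unsealed file in a partition, not just the last one. */
final class UnsealedSelectionSketch {
  private UnsealedSelectionSketch() {}

  static <T> List<T> unsealedFiles(List<T> partitionFiles, Predicate<T> isSealed) {
    List<T> result = new ArrayList<>();
    for (T file : partitionFiles) {
      if (!isSealed.test(file)) {
        result.add(file);
      }
    }
    return result;
  }
}
```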

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/checkpoint/Checkpoint.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Checkpoint is the basic element of .checkpoint file, including type, number of memTables, and
+ * brief information of each memTable.
+ */
+public class Checkpoint implements SerializedSize {

Review comment:
       ```suggestion
   public class CheckpointEntry implements SerializedSize {
   ```
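
The class javadoc describes a checkpoint as a type, a memTable count, and brief info for each memTable. Below is a round-trip sketch of that layout, under several assumptions. The type is one byte and the count is an int. Each info is simplified to `memTableId` and `firstFileVersionId`, the two accessors `CheckpointManager` uses, although the real `MemTableInfo` may carry more fields. The names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Assumed layout: [type:1][count:4] then count x ([memTableId:4][firstFileVersionId:4]). */
class CheckpointEntrySketch {
  static final class Info {
    final int memTableId;
    final int firstFileVersionId;

    Info(int memTableId, int firstFileVersionId) {
      this.memTableId = memTableId;
      this.firstFileVersionId = firstFileVersionId;
    }
  }

  final byte type;
  final List<Info> infos;

  CheckpointEntrySketch(byte type, List<Info> infos) {
    this.type = type;
    this.infos = infos;
  }

  int serializedSize() {
    return Byte.BYTES + Integer.BYTES + infos.size() * 2 * Integer.BYTES;
  }

  void serialize(ByteBuffer buffer) {
    buffer.put(type);
    buffer.putInt(infos.size());
    for (Info info : infos) {
      buffer.putInt(info.memTableId);
      buffer.putInt(info.firstFileVersionId);
    }
  }

  static CheckpointEntrySketch deserialize(ByteBuffer buffer) {
    byte type = buffer.get();
    int count = buffer.getInt();
    List<Info> infos = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      // Java evaluates arguments left to right, matching the write order
      infos.add(new Info(buffer.getInt(), buffer.getInt()));
    }
    return new CheckpointEntrySketch(type, infos);
  }
}
```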

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/utils/TsFilePathUtils.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+
+import java.io.File;
+
+public class TsFilePathUtils {

Review comment:
       Merge this class into TsFileUtils.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java
##########
@@ -16,35 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.wal.node;
 
-package org.apache.iotdb.db.writelog;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.FlushListener;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
-
-import java.io.IOException;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 
-public class WALFlushListener implements FlushListener {
+/** This interface provides uniform interface for writing wal and making checkpoints. */
+public interface IWALNode extends FlushListener, AutoCloseable {
+  /** Log InsertPlan */
+  WALFlushListener log(int memTableId, InsertPlan insertPlan);
 
-  private TsFileProcessor processor;
+  /** Log DeletePlan */
+  WALFlushListener log(int memTableId, DeletePlan deletePlan);

Review comment:
       Do we need separate `log` methods for InsertPlan and DeletePlan?
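
`DeletePlan` now implements `WALEditValue` (see the `DeletePlan` hunk). If `InsertPlan` does too, a single `log` method taking the shared type could replace the two overloads. Below is a sketch of that shape with hypothetical names, using a `long` sequence number in place of `WALFlushListener`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical common supertype for anything a WAL node can log (e.g. insert and delete plans). */
interface LoggableSketch {
  String describe();
}

/** One entry point instead of one overload per plan type. */
interface WalNodeSketch {
  long log(int memTableId, LoggableSketch entry);
}

class InMemoryWalNodeSketch implements WalNodeSketch {
  final List<String> entries = new ArrayList<>();

  @Override
  public long log(int memTableId, LoggableSketch entry) {
    entries.add(memTableId + ":" + entry.describe());
    return entries.size(); // hypothetical sequence number
  }
}
```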

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.CheckpointWriter;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage checkpoints of one wal node */
+public class CheckpointManager implements AutoCloseable {
+  /** use size limit to control WALEdit number in each file */
+  public static final long LOG_SIZE_LIMIT = 3 * 1024 * 1024;
+
+  private static final Logger logger = LoggerFactory.getLogger(CheckpointManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** WALNode identifier of this checkpoint manager */
+  protected final String identifier;
+  /** directory to store .checkpoint file */
+  protected final String logDirectory;
+  /**
+   * protect concurrent safety of checkpoint info, including memTableId2Info, cachedByteBuffer,
+   * currentLogVersion and currentLogWriter
+   */
+  private final Lock infoLock = new ReentrantLock();
+  // region these variables should be protected by infoLock
+  /** memTable id -> memTable info */
+  private final Map<Integer, MemTableInfo> memTableId2Info = new HashMap<>();

Review comment:
       Is it better to put this into WALNode?

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
##########
@@ -0,0 +1,113 @@
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.wal.buffer.WALEdit;
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * MemTableInfo records brief info of one memtable, including memTable id, tsFile path, and .wal
+ * file version id of its first {@link WALEdit}.
+ */
+public class MemTableInfo implements SerializedSize {
+  /** memTable id 4 bytes, tsFile path length 4 bytes, first version id 4 bytes */
+  private static final int FIXED_SERIALIZED_SIZE = Integer.BYTES * 2;

Review comment:
       Maybe the doc comment is wrong: it lists three 4-byte fields, but FIXED_SERIALIZED_SIZE is only Integer.BYTES * 2.
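
To make the mismatch concrete: the doc comment lists three 4-byte fields (12 bytes), while the constant only counts two (8 bytes). A minimal Java sketch with a hypothetical helper class. The UTF-8 path encoding in the last method is an assumption, not taken from the patch:

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper contrasting the documented MemTableInfo layout with the declared constant. */
final class MemTableInfoSizeCheck {
  private MemTableInfoSizeCheck() {}

  /** Fixed part implied by the doc comment: memTable id + tsFile path length + first version id. */
  static int documentedFixedSize() {
    return Integer.BYTES * 3; // 3 * 4 = 12 bytes
  }

  /** Fixed part actually declared by FIXED_SERIALIZED_SIZE in the patch. */
  static int declaredFixedSize() {
    return Integer.BYTES * 2; // 2 * 4 = 8 bytes
  }

  /** Total size under the documented layout, assuming the path bytes are written as UTF-8. */
  static int documentedSerializedSize(String tsFilePath) {
    return documentedFixedSize() + tsFilePath.getBytes(StandardCharsets.UTF_8).length;
  }
}
```

Either the comment should drop a field or the constant should become `Integer.BYTES * 3`; which one is right depends on how the rest of MemTableInfo counts the path-length prefix, which this hunk doesn't show.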

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -65,25 +65,44 @@ config_nodes=127.0.0.1:22277
 ### Write Ahead Log Configuration
 ####################
 
-# Is insert ahead log enable
-# Datatype: boolean
-# enable_wal=true
+# Write mode of wal
+# The details of these three modes are as follows:
+# 1. DISABLE: the system will disable wal.
+# 2. SYNC: the system will submit wal synchronously, write request will not return until its wal is fsynced to the disk successfully.
+# 3. ASYNC: the system will submit wal asynchronously, write request will return immediately no matter its wal is fsynced to the disk successfully.
+# The write performance order is DISABLE > ASYNC > SYNC, but only SYNC mode can ensure data durability.
+# wal_mode=SYNC
+
+# Duration a wal flush operation will wait before calling fsync
+# A duration greater than 0 batches multiple wal fsync calls into one. This is useful when disks are slow or WAL write contention exists.
+# Datatype: long
+# fsync_wal_delay_in_ms=0
 
-# Add a switch to drop ouf-of-order data
-# Out-of-order data will impact the aggregation query a lot. Users may not care about discarding some out-of-order data.
-# Datatype: boolean
-# enable_discard_out_of_order_data=false
+# Max number of wal nodes, each node corresponds to one wal directory
+# The default value 0 means the concurrent wal number will be 2 * 'number of wal dirs'.
+# Datatype: int
+# max_wal_node_num=0
 
-# When a certain amount of insert ahead log is reached, it will be flushed to disk
-# It is possible to lose at most flush_wal_threshold operations
+# Buffer size of each wal node
+# If it sets a value smaller than 0, use the default value 16777216 bytes (16MB).
 # Datatype: int
-# flush_wal_threshold=10000
+# wal_buffer_size_in_byte=16777216
 
-# The cycle when insert ahead log is periodically forced to be written to disk(in milliseconds)
-# If force_wal_period_in_ms = 0 it means force insert ahead log to be written to disk after each refreshment
-# Set this parameter to 0 may slow down the ingestion on slow disk.
-# Datatype: long
-# force_wal_period_in_ms=100
+# Buffer entry size of each wal buffer
+# If it sets a value smaller than 0, use the default value 16384 bytes (16KB).
+# Datatype: int
+# wal_buffer_entry_size_in_byte=16384
+
+# Max storage space for each wal node
+# Notice: If this parameter is too small, the write performance may decline.
+# Datatype: int
+# wal_node_max_storage_space_in_mb=3072

Review comment:
       This is a new mechanism in 0.14, so it shouldn't be a fixed value; it would be better to set it according to memory_for_write.
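
One possible reading of this suggestion, as a sketch only: derive the per-node cap from the write-memory budget instead of a fixed 3072 MB. The scaling factor, the even split across nodes, the floor, and every name below are hypothetical, not existing IoTDB settings:

```java
/** Hypothetical sizing rule; none of these names are existing IoTDB settings. */
final class WalStorageBudget {
  // long constant, so MB-to-byte conversion happens in 64-bit arithmetic
  // (3072 * 1024 * 1024 does not fit in an int)
  private static final long BYTES_PER_MB = 1024L * 1024L;

  private WalStorageBudget() {}

  /**
   * Splits factor * memoryForWrite evenly across wal nodes, but never returns less than
   * floorInMb so that small heaps still get a usable WAL.
   */
  static long maxStoragePerNodeInMb(
      long memoryForWriteInByte, int walNodeNum, double factor, long floorInMb) {
    if (walNodeNum <= 0) {
      throw new IllegalArgumentException("walNodeNum must be positive");
    }
    long totalBudgetInMb = (long) (memoryForWriteInByte * factor / BYTES_PER_MB);
    return Math.max(floorInMb, totalBudgetInMb / walNodeNum);
  }

  /** Converts a size in MB to bytes without int overflow. */
  static long toBytes(long mb) {
    return mb * BYTES_PER_MB;
  }
}
```

With 4 GB of write memory, 4 wal nodes and a factor of 3, this rule happens to reproduce the current 3072 MB default; with a 256 MB budget the floor applies instead.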

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,233 @@
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int MAX_WAL_NODE_NUM =
+      config.getMaxWalNodeNum() > 0 ? config.getMaxWalNodeNum() : config.getWalDirs().length * 2;
+
+  /** manage wal folders */
+  private FolderManager folderManager;
+  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  // region these variables should be protected by nodesLock
+  /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
+  private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NODE_NUM);
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;
+  // endregion
+  /** single thread to fsync .checkpoint files */
+  private ScheduledExecutorService checkpointThread;
+  /** single thread to delete old .wal files */
+  private ScheduledExecutorService walDeleteThread;
+
+  private WALManager() {}
+
+  /** Apply for a wal node */
+  public IWALNode applyForWALNode() {
+    if (config.getWalMode() == WALMode.DISABLE) {
+      return WALFakeNode.getSuccessInstance();
+    }
+
+    WALNode selectedNode;
+    nodesLock.lock();
+    try {
+      if (walNodes.size() < MAX_WAL_NODE_NUM) {
+        nodeIdCounter++;
+        String identifier = String.valueOf(nodeIdCounter);
+        String folder;
+        // get wal folder
+        try {
+          folder = folderManager.getNextFolder();
+        } catch (DiskSpaceInsufficientException e) {
+          logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+          config.setReadOnly(true);
+          return WALFakeNode.getFailureInstance(e);
+        }
+        folder = folder + File.separator + identifier;
+        // create new wal node
+        try {
+          selectedNode = new WALNode(identifier, folder);
+        } catch (FileNotFoundException e) {
+          logger.error("Fail to create wal node", e);
+          return WALFakeNode.getFailureInstance(e);
+        }
+        walNodes.add(selectedNode);
+      } else {
+        // select next wal node by sequence order
+        nodeCursor++;

Review comment:
       If nodeCursor runs past the last node (i.e. reaches MAX_WAL_NODE_NUM), reset it to 0.
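
A minimal sketch of the wrap-around, using a hypothetical generic stand-in for the node list in WALManager (it uses `synchronized` where the patch uses nodesLock):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for the node-selection logic in WALManager.applyForWALNode(). */
final class RoundRobinSelector<T> {
  private final List<T> nodes = new ArrayList<>();
  private final int maxNodeNum;
  /** index of the last node handed out once the list is full */
  private int nodeCursor = -1;

  RoundRobinSelector(int maxNodeNum) {
    if (maxNodeNum <= 0) {
      throw new IllegalArgumentException("maxNodeNum must be positive");
    }
    this.maxNodeNum = maxNodeNum;
  }

  /** Creates nodes until the limit is reached, then cycles through the existing ones. */
  synchronized T select(Supplier<T> factory) {
    if (nodes.size() < maxNodeNum) {
      T created = factory.get();
      nodes.add(created);
      return created;
    }
    // wrap around so the cursor always stays in [0, nodes.size())
    nodeCursor = (nodeCursor + 1) % nodes.size();
    return nodes.get(nodeCursor);
  }
}
```

Once the list is full, `nodes.size()` equals the limit, so the modulo behaves exactly like resetting the cursor to 0 when it reaches MAX_WAL_NODE_NUM.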

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,233 @@
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;

Review comment:
       Make this a configurable parameter instead of a hard-coded constant.
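
A minimal sketch of what the parameter could look like, assuming it is read from the properties file with a fallback to the current hard-coded value. The key `delete_wal_files_delay_in_ms` and the class are hypothetical:

```java
import java.util.Properties;

/** Hypothetical loader; the property key below is illustrative, not an existing IoTDB option. */
final class WalDeleteDelayConfig {
  static final String KEY = "delete_wal_files_delay_in_ms";
  /** current hard-coded value: 10 minutes */
  static final long DEFAULT_DELAY_IN_MS = 10 * 60 * 1000L;

  private WalDeleteDelayConfig() {}

  /** Returns the configured delay, or the default for missing, malformed or non-positive values. */
  static long load(Properties props) {
    String raw = props.getProperty(KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT_DELAY_IN_MS;
    }
    try {
      long value = Long.parseLong(raw.trim());
      return value > 0 ? value : DEFAULT_DELAY_IN_MS;
    } catch (NumberFormatException e) {
      return DEFAULT_DELAY_IN_MS;
    }
  }
}
```

Falling back on non-positive values follows the "smaller than 0, use the default" rule the properties file already uses, and it also matters if the value is passed to `ScheduledExecutorService.scheduleWithFixedDelay`, which rejects non-positive delays.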

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
##########
@@ -0,0 +1,244 @@
+package org.apache.iotdb.db.wal.node;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.db.wal.buffer.IWALBuffer;
+import org.apache.iotdb.db.wal.buffer.WALBuffer;
+import org.apache.iotdb.db.wal.buffer.WALEdit;
+import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
+import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.utils.TsFilePathUtils;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. */
+public class WALNode implements IWALNode {
+  public static final Pattern WAL_NODE_FOLDER_PATTERN = Pattern.compile("(?<nodeIdentifier>\\d+)");
+
+  private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long MAX_STORAGE_SPACE_IN_BYTE =
+      config.getWalNodeMaxStorageSpaceInMb() * 1024 * 1024;
+  private static final long MEM_TABLE_SNAPSHOT_THRESHOLD_IN_BYTE =
+      config.getWalMemTableSnapshotThreshold();
+
+  /** unique identifier of this WALNode */
+  private final String identifier;
+  /** directory to store this node's files */
+  private final String logDirectory;
+  /** wal buffer */
+  private final IWALBuffer buffer;
+  /** manage checkpoints */
+  private final CheckpointManager checkpointManager;
+
+  public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal node-{}.", logDirectory, identifier);
+    }
+    this.buffer = new WALBuffer(identifier, logDirectory);
+    this.checkpointManager = new CheckpointManager(identifier, logDirectory);
+  }
+
+  /** Return true when this folder wal node folder */
+  public static boolean walNodeFolderNameFilter(File dir, String name) {
+    return WAL_NODE_FOLDER_PATTERN.matcher(name).find();
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, InsertPlan insertPlan) {
+    WALEdit walEdit = new WALEdit(memTableId, insertPlan);
+    return log(walEdit);
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, DeletePlan deletePlan) {
+    WALEdit walEdit = new WALEdit(memTableId, deletePlan);
+    return log(walEdit);
+  }
+
+  private WALFlushListener log(WALEdit walEdit) {
+    buffer.write(walEdit);
+    return walEdit.getWalFlushListener();
+  }
+
+  @Override
+  public void onFlushStart(IMemTable memTable) {

Review comment:
       ```suggestion
     public void onMemtableFlushStarted(IMemTable memTable) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
##########
@@ -0,0 +1,244 @@
+  @Override
+  public void onFlushStart(IMemTable memTable) {
+    // do nothing
+  }
+
+  @Override
+  public void onFlushEnd(IMemTable memTable) {

Review comment:
       ```suggestion
     public void onMemtableFlushed(IMemTable memTable) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,435 @@
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees the concurrent safety and uses double buffers mechanism to accelerate
+ * writes and avoid waiting for buffer syncing to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  private static final long FSYNC_WAL_DELAY_IN_MS = config.getFsyncWalDelayInMs();
+  /** default delay time of each serialize task when wal mode is async */
+  public static final long ASYNC_WAL_DELAY_IN_MS = 100;
+  /** Maximum number of WALEdits in one serialize task when wal mode is sync */
+  public static final int SYNC_BATCH_SIZE_LIMIT = 100;

Review comment:
       Make this a configurable parameter instead of a hard-coded constant.
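
For context on what such a parameter bounds, here is a minimal sketch of one sync serialize step: block for the first edit, then drain at most the remaining batch capacity. It assumes edits arrive through a `BlockingQueue` (the patch imports one, but its use isn't shown in this hunk); the helper name and the timeout are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing what a configurable SYNC batch limit would bound. */
final class SyncBatcher {
  private SyncBatcher() {}

  /**
   * Waits up to timeoutMs for the first edit, then takes whatever is already queued,
   * never more than batchSizeLimit edits in total.
   */
  static <E> List<E> nextBatch(BlockingQueue<E> queue, int batchSizeLimit, long timeoutMs) {
    if (batchSizeLimit <= 0) {
      throw new IllegalArgumentException("batchSizeLimit must be positive");
    }
    List<E> batch = new ArrayList<>(batchSizeLimit);
    try {
      E first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
      if (first == null) {
        return batch; // nothing arrived within the timeout
      }
      batch.add(first);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return batch;
    }
    // drainTo never blocks; it moves at most the remaining capacity of this batch
    queue.drainTo(batch, batchSizeLimit - 1);
    return batch;
  }
}
```

A larger limit spreads one fsync over more edits, while a smaller one bounds the work done before each fsync; the right trade-off likely depends on the disk, which is an argument for making it configurable.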

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
##########
@@ -0,0 +1,255 @@
+  /** cache the biggest byte buffer to serialize checkpoint */
+  private volatile ByteBuffer cachedByteBuffer;
+  /** current checkpoint file version id, only updated by fsyncAndDeleteThread */
+  private int currentLogVersion = 0;

Review comment:
       ```suggestion
     private int currentCheckPointFileVersion = 0;
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -224,19 +222,20 @@ public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
       }
     }
 
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      try {
-        getLogNode().write(insertRowPlan);
-      } catch (Exception e) {
-        if (enableMemControl && memIncrements != null) {
-          rollbackMemoryInfo(memIncrements);
-        }
-        throw new WriteProcessException(
-            String.format(
-                "%s: %s write WAL failed",
-                storageGroupName, tsFileResource.getTsFile().getAbsolutePath()),
-            e);
+    try {
+      WALFlushListener walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowPlan);
+      if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+        throw walFlushListener.getCause();
+      }
+    } catch (Exception e) {
+      if (enableMemControl && memIncrements != null) {

Review comment:
       ```suggestion
         if (enableMemControl) {
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEdit.java
##########
@@ -0,0 +1,148 @@
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * WALEdit is the basic element of .wal file, including type, memTable id, and specific
+ * value(physical plan or memTable snapshot).
+ */
+public class WALEdit implements SerializedSize {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** wal edit type 1 byte, memTable id 4 bytes */
+  private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Integer.BYTES;
+
+  /** type of value */
+  private final WALEditType type;
+  /** memTable id */
+  private final int memTableId;
+  /** value(physical plan or memTable snapshot) */
+  private final WALEditValue value;
+
+  /**
+   * listen whether this WALEdit has been written to the filesystem, null iff this WALEdit is
+   * deserialized from .wal file
+   */
+  private final WALFlushListener walFlushListener;
+
+  public WALEdit(int memTableId, WALEditValue value) {
+    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
+  }
+
+  public WALEdit(int memTableId, WALEditValue value, boolean wait) {

Review comment:
       WALMode is a global config. Do we need to pass it into each WALEdit, or can AbstractResultListener read it directly?
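One way to read this suggestion: instead of passing a `wait` flag into every `WALEdit` constructor, the listener could check the global mode when a caller waits on it. The following is a minimal sketch under that assumption. `WalModeSketch`, `GlobalWalConfig`, and `ModeAwareListener` are hypothetical names, not IoTDB classes, and the real code reads the mode from `IoTDBConfig`.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the WAL mode setting (DISABLE, SYNC, ASYNC).
enum WalModeSketch {
  DISABLE,
  SYNC,
  ASYNC
}

// Hypothetical global config holder; the PR reads the mode from IoTDBConfig instead.
final class GlobalWalConfig {
  static volatile WalModeSketch mode = WalModeSketch.SYNC;

  private GlobalWalConfig() {}
}

// Hypothetical listener that consults the global mode when a caller waits,
// so individual WALEdits do not need to carry a "wait" flag.
class ModeAwareListener {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile boolean success;

  void succeed() {
    success = true;
    done.countDown();
  }

  void fail() {
    success = false;
    done.countDown();
  }

  /** In SYNC mode, blocks until the edit is marked succeeded or failed; otherwise returns true. */
  boolean waitForResult() {
    if (GlobalWalConfig.mode != WalModeSketch.SYNC) {
      return true; // ASYNC (or DISABLE): the caller does not wait for fsync
    }
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return success;
  }
}
```

There is a trade-off here. If the mode is read at wait time, a runtime mode change would affect edits that are already in flight. If it is captured in the constructor, as the PR currently does with `config.getWalMode() == WALMode.SYNC`, each edit's behavior is fixed when the edit is created.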

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+import org.apache.iotdb.db.wal.io.WALWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractWALBuffer implements IWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(AbstractWALBuffer.class);
+  /** use size limit to control WALEdit number in each file */
+  protected static final long LOG_SIZE_LIMIT = 10 * 1024 * 1024;
+
+  /** WALNode identifier of this buffer */
+  protected final String identifier;
+  /** directory to store .wal files */
+  protected final String logDirectory;
+  /** current wal file version id */
+  protected final AtomicInteger currentLogVersion = new AtomicInteger();
+  /** current wal file log writer */
+  protected volatile ILogWriter currentLogWriter;
+
+  public AbstractWALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
+    }
+    currentLogWriter =
+        new WALWriter(
+            SystemFileFactory.INSTANCE.getFile(
+                logDirectory, WALWriter.getLogFileName(currentLogVersion.get())));
+  }
+
+  @Override
+  public int getCurrentLogVersion() {
+    return currentLogVersion.get();
+  }
+
+  /** Notice: old log writer will be closed by this function. */
+  protected boolean tryRollingLogWriter() throws IOException {
+    if (currentLogWriter.size() < LOG_SIZE_LIMIT) {

Review comment:
       Calling FileChannel.size() frequently may be time-consuming; we could maintain the size ourselves.
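Below is a minimal sketch of this suggestion. It assumes that every write to the file goes through one writer. The writer keeps a running byte count and updates it on each write, so a `tryRollingLogWriter`-style size check never has to call `FileChannel.size()`. `SizeTrackingLogWriter` is a hypothetical name, and the PR's `ILogWriter`/`WALWriter` may be structured differently.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical writer that tracks its own size instead of querying the channel each time.
class SizeTrackingLogWriter implements AutoCloseable {
  private final FileChannel channel;
  // bytes in the file; read from the channel once at open time, then maintained in memory
  private long size;

  SizeTrackingLogWriter(Path file) throws IOException {
    this.channel =
        FileChannel.open(
            file, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    this.size = channel.size(); // the only size() call
  }

  void write(ByteBuffer buffer) throws IOException {
    while (buffer.hasRemaining()) {
      size += channel.write(buffer); // write() returns the number of bytes actually written
    }
  }

  void force() throws IOException {
    channel.force(false);
  }

  /** O(1) size check with no system call. */
  long size() {
    return size;
  }

  @Override
  public void close() throws IOException {
    channel.close();
  }
}
```

The in-memory counter stays correct only while nothing else writes to or truncates the file. If that can happen, the size must be read from the channel again.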

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees concurrency safety and uses a double-buffer mechanism to accelerate
+ * writes and avoid waiting for buffers to be synced to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  private static final long FSYNC_WAL_DELAY_IN_MS = config.getFsyncWalDelayInMs();
+  /** default delay time of each serialize task when wal mode is async */
+  public static final long ASYNC_WAL_DELAY_IN_MS = 100;
+  /** Maximum number of WALEdits in one serialize task when wal mode is sync */
+  public static final int SYNC_BATCH_SIZE_LIMIT = 100;
+  /** Maximum number of WALEdits in the blocking queue */
+  public static final int SIZE_LIMIT = 10_000;
+  /** notify serializeThread to stop */
+  private static final WALEdit CLOSE_SIGNAL = new WALEdit(-1, new DeletePlan());
+
+  /** whether close method is called */
+  private volatile boolean isClosed = false;
+  /** WALEdits */
+  private final BlockingQueue<WALEdit> walEdits = new ArrayBlockingQueue<>(SIZE_LIMIT);
+  /** lock to provide synchronization for double buffers mechanism, protecting buffers status */
+  private final Lock buffersLock = new ReentrantLock();
+  /** condition to guarantee correctness of switching buffers */
+  private final Condition idleBufferReadyCondition = buffersLock.newCondition();
+  // region these variables should be protected by buffersLock
+  /** two buffers switch between three statuses (there is always 1 buffer working) */
+  // buffer in working status, only updated by serializeThread
+  private volatile ByteBuffer workingBuffer;
+  // buffer in idle status
+  private volatile ByteBuffer idleBuffer;
+  // buffer in syncing status, serializeThread makes sure no more writes to syncingBuffer
+  private volatile ByteBuffer syncingBuffer;
+  // endregion
+  /** single thread to serialize WALEdit to workingBuffer */
+  private final ExecutorService serializeThread;
+  /** single thread to sync syncingBuffer to disk */
+  private final ExecutorService syncBufferThread;
+
+  public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    super(identifier, logDirectory);
+    allocateBuffers();
+    serializeThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SERIALIZE.getName() + "(node-" + identifier + ")");
+    syncBufferThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SYNC.getName() + "(node-" + identifier + ")");
+    // start receiving serialize tasks
+    serializeThread.submit(new SerializeTask());
+  }
+
+  private void allocateBuffers() {
+    try {
+      workingBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+      idleBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+    } catch (OutOfMemoryError e) {
+      logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, e);
+      close();
+      throw e;
+    }
+  }
+
+  @Override
+  public void write(WALEdit edit) {
+    if (isClosed) {
+      logger.error(
+          "Fail to write WALEdit into wal node-{} because this node is closed.", identifier);
+      edit.getWalFlushListener().fail(new WALNodeClosedException(identifier));
+      return;
+    }
+    // only add this WALEdit to queue
+    try {
+      walEdits.put(edit);
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted when waiting to add WALEdit to buffer.");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  // region Task of serializeThread
+  /** This task serializes WALEdit to workingBuffer and will call fsync at last. */
+  private class SerializeTask implements Runnable {
+    private final IWALByteBufferView byteBufferVew = new ByteBufferView();
+    private final List<WALFlushListener> fsyncListeners = new LinkedList<>();
+
+    @Override
+    public void run() {
+      try {
+        serialize();
+      } finally {
+        serializeThread.submit(new SerializeTask());
+      }
+    }
+
+    /** In order to control memory usage of blocking queue, get 1 and then serialize 1 */
+    private void serialize() {
+      // try to get first WALEdit with blocking interface
+      int batchSize = 0;
+      try {
+        WALEdit edit = walEdits.take();
+        try {
+          if (edit != CLOSE_SIGNAL) {
+            edit.serialize(byteBufferVew);
+            ++batchSize;
+            fsyncListeners.add(edit.getWalFlushListener());
+          }
+        } catch (Exception e) {
+          logger.error(
+              "Fail to serialize WALEdit to wal node-{}'s buffer, discard it.", identifier, e);
+          edit.getWalFlushListener().fail(e);
+        }
+      } catch (InterruptedException e) {
+        logger.warn(
+            "Interrupted when waiting for taking WALEdit from blocking queue to serialize.");
+        Thread.currentThread().interrupt();
+      }
+      // for better fsync performance, sleep a while to enlarge write batch
+      if (FSYNC_WAL_DELAY_IN_MS > 0 || config.getWalMode() == WALMode.ASYNC) {
+        long sleepTime = FSYNC_WAL_DELAY_IN_MS > 0 ? FSYNC_WAL_DELAY_IN_MS : ASYNC_WAL_DELAY_IN_MS;
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+          logger.warn("Interrupted when sleeping a while to enlarge wal write batch.");
+          Thread.currentThread().interrupt();
+        }
+      }
+      // try to get more WALEdits with non-blocking interface to enlarge write batch
+      // control batch size in sync mode to return quickly
+      int batchSizeLimit = config.getWalMode() == WALMode.SYNC ? SYNC_BATCH_SIZE_LIMIT : SIZE_LIMIT;
+      while (walEdits.peek() != null && batchSize < batchSizeLimit) {
+        WALEdit edit = walEdits.poll();
+        if (edit == null || edit == CLOSE_SIGNAL) {
+          break;
+        } else {
+          try {
+            edit.serialize(byteBufferVew);
+          } catch (Exception e) {
+            logger.error(
+                "Fail to serialize WALEdit to wal node-{}'s buffer, discard it.", identifier, e);
+            edit.getWalFlushListener().fail(e);
+            continue;
+          }
+          ++batchSize;
+          fsyncListeners.add(edit.getWalFlushListener());
+        }
+      }
+      // call fsync at last and set fsyncListeners
+      if (batchSize > 0) {
+        fsyncWorkingBuffer(fsyncListeners);
+      }
+    }
+  }
+
+  /**
+   * This view uses workingBuffer lock-freely because workingBuffer is only updated by
+   * serializeThread and this class is only used by serializeThread.
+   */
+  private class ByteBufferView implements IWALByteBufferView {
+    private void ensureEnoughSpace(int bytesNum) {
+      if (workingBuffer.remaining() < bytesNum) {
+        rollBuffer();
+      }
+    }
+
+    private void rollBuffer() {
+      syncWorkingBuffer();
+    }
+
+    @Override
+    public void put(byte b) {
+      ensureEnoughSpace(Byte.BYTES);
+      workingBuffer.put(b);
+    }
+
+    @Override
+    public void put(byte[] src) {
+      int offset = 0;
+      while (true) {
+        int leftCapacity = workingBuffer.remaining();
+        int needCapacity = src.length - offset;
+        if (leftCapacity >= needCapacity) {
+          workingBuffer.put(src, offset, needCapacity);
+          break;
+        } else {
+          workingBuffer.put(src, offset, leftCapacity);
+          offset += leftCapacity;
+          rollBuffer();
+        }
+      }
+    }
+
+    @Override
+    public void putChar(char value) {
+      ensureEnoughSpace(Character.BYTES);
+      workingBuffer.putChar(value);
+    }
+
+    @Override
+    public void putShort(short value) {
+      ensureEnoughSpace(Short.BYTES);
+      workingBuffer.putShort(value);
+    }
+
+    @Override
+    public void putInt(int value) {
+      ensureEnoughSpace(Integer.BYTES);
+      workingBuffer.putInt(value);
+    }
+
+    @Override
+    public void putLong(long value) {
+      ensureEnoughSpace(Long.BYTES);
+      workingBuffer.putLong(value);
+    }
+
+    @Override
+    public void putFloat(float value) {
+      ensureEnoughSpace(Float.BYTES);
+      workingBuffer.putFloat(value);
+    }
+
+    @Override
+    public void putDouble(double value) {
+      ensureEnoughSpace(Double.BYTES);
+      workingBuffer.putDouble(value);
+    }
+  }
+
+  /** Notice: this method is only called when the buffer is exhausted by SerializeTask. */
+  private void syncWorkingBuffer() {
+    switchIdleBufferToWorking();
+    syncBufferThread.submit(new SyncBufferTask(false));
+  }
+
+  /** Notice: this method is only called at the end of SerializeTask. */
+  private void fsyncWorkingBuffer(List<WALFlushListener> fsyncListeners) {
+    switchIdleBufferToWorking();
+    syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners));
+  }
+
+  // only called by serializeThread
+  private void switchIdleBufferToWorking() {

Review comment:
       ```suggestion
     private void switchWorkingBufferToFlushing() {
   ```
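The suggested name describes the step more accurately. The serializer hands its full working buffer off to be written to disk and takes the idle buffer in its place. The sketch below shows that hand-off in simplified form. It assumes one serializer thread and one sync thread, and it mirrors the working/idle/syncing states and the lock-plus-condition pattern declared in `WALBuffer`. `DoubleBufferSketch` and `syncAndRecycle` are hypothetical names. Disk I/O, listeners, and file rolling are left out.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified double-buffer hand-off between one serializer thread and
// one sync thread. No disk I/O, listeners, or file rolling.
class DoubleBufferSketch {
  private final Lock buffersLock = new ReentrantLock();
  private final Condition idleBufferReady = buffersLock.newCondition();
  private ByteBuffer workingBuffer; // only the serializer writes into it
  private ByteBuffer idleBuffer; // empty, ready to become the working buffer
  private ByteBuffer syncingBuffer; // full, waiting to be written to disk

  DoubleBufferSketch(int capacity) {
    workingBuffer = ByteBuffer.allocate(capacity);
    idleBuffer = ByteBuffer.allocate(capacity);
  }

  ByteBuffer working() {
    return workingBuffer;
  }

  /** Serializer side: working -> syncing, idle -> working; waits while no idle buffer exists. */
  void switchWorkingBufferToFlushing() throws InterruptedException {
    buffersLock.lock();
    try {
      while (idleBuffer == null) {
        idleBufferReady.await(); // the previous syncing buffer has not been recycled yet
      }
      syncingBuffer = workingBuffer;
      workingBuffer = idleBuffer;
      workingBuffer.clear();
      idleBuffer = null;
    } finally {
      buffersLock.unlock();
    }
  }

  /** Sync side: consume the syncing buffer, then recycle it as the idle buffer. */
  int syncAndRecycle() {
    ByteBuffer toSync;
    buffersLock.lock();
    try {
      toSync = syncingBuffer;
    } finally {
      buffersLock.unlock();
    }
    if (toSync == null) {
      return 0;
    }
    toSync.flip();
    int bytes = toSync.remaining(); // a real implementation would write and fsync these bytes
    buffersLock.lock();
    try {
      syncingBuffer = null;
      idleBuffer = toSync;
      idleBufferReady.signalAll();
    } finally {
      buffersLock.unlock();
    }
    return bytes;
  }
}
```

The condition variable enforces the key invariant. The serializer can never overwrite a buffer that is still being synced, because it waits until the sync side has returned that buffer as the idle one.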

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees concurrency safety and uses a double-buffer mechanism to accelerate
+ * writes and avoid waiting for buffers to be synced to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  private static final long FSYNC_WAL_DELAY_IN_MS = config.getFsyncWalDelayInMs();
+  /** default delay time of each serialize task when wal mode is async */
+  public static final long ASYNC_WAL_DELAY_IN_MS = 100;
+  /** Maximum number of WALEdits in one serialize task when wal mode is sync */
+  public static final int SYNC_BATCH_SIZE_LIMIT = 100;
+  /** Maximum number of WALEdits in the blocking queue */
+  public static final int SIZE_LIMIT = 10_000;

Review comment:
       Add a configuration parameter for this limit instead of hard-coding it.
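One possible reading of this comment is that the queue bound should be configurable instead of hard-coded as `SIZE_LIMIT = 10_000`. The following is a minimal sketch based on that reading, using `java.util.Properties`. The property name `wal_queue_size_limit` and the class `WalQueueConfig` are hypothetical. Neither is an existing key or class in IoTDB.

```java
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical loader for the blocking-queue bound of a WAL buffer.
final class WalQueueConfig {
  static final String KEY = "wal_queue_size_limit"; // hypothetical property name
  static final int DEFAULT_SIZE_LIMIT = 10_000; // the value currently hard-coded in WALBuffer

  private WalQueueConfig() {}

  /** Returns the configured limit, falling back to the default for missing or invalid values. */
  static int sizeLimit(Properties props) {
    String raw = props.getProperty(KEY);
    if (raw == null) {
      return DEFAULT_SIZE_LIMIT;
    }
    try {
      int value = Integer.parseInt(raw.trim());
      return value > 0 ? value : DEFAULT_SIZE_LIMIT;
    } catch (NumberFormatException e) {
      return DEFAULT_SIZE_LIMIT;
    }
  }

  static <T> BlockingQueue<T> newQueue(Properties props) {
    return new ArrayBlockingQueue<>(sizeLimit(props));
  }
}
```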







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839337665



##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -50,25 +50,45 @@ rpc_port=6667
 ### Write Ahead Log Configuration
 ####################
 
-# Is insert ahead log enable
-# Datatype: boolean
-# enable_wal=true
+# Write mode of wal
+# The details of these three modes are as follows:
+# 1. DISABLE: the system will disable wal.
+# 2. SYNC: the system will submit wal synchronously, write request will not return until its wal is fsynced to the disk successfully.
+# 3. ASYNC: the system will submit wal asynchronously, write request will return immediately regardless of whether its wal has been fsynced to the disk successfully.
+# The write performance order is DISABLE > ASYNC > SYNC, but only SYNC mode can ensure data durability.
+# wal_mode=SYNC
+
+# Duration a wal flush operation will wait before calling fsync
+# A duration greater than 0 batches multiple wal fsync calls into one. This is useful when disks are slow or WAL write contention exists.
+# Datatype: long
+# sync_wal_delay_in_ms=0

Review comment:
       Fixed.
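The delay setting in the diff above describes a group-commit scheme, and the same steps appear in `SerializeTask.serialize()`. The thread takes the first edit with a blocking call, optionally sleeps for the configured delay, drains more edits without blocking, and then issues a single fsync for the whole batch. The sketch below is a minimal single-threaded version under those assumptions. `GroupCommitSketch` is a hypothetical name, and its `fsyncCount` counter stands in for calls to `FileChannel.force`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical group-commit loop: one fsync covers every edit collected in a batch.
class GroupCommitSketch {
  private final BlockingQueue<String> edits = new LinkedBlockingQueue<>();
  private final long delayInMs;
  private final int batchSizeLimit;
  int fsyncCount = 0; // stands in for calls to FileChannel.force(...)

  GroupCommitSketch(long delayInMs, int batchSizeLimit) {
    this.delayInMs = delayInMs;
    this.batchSizeLimit = batchSizeLimit;
  }

  void submit(String edit) {
    edits.add(edit);
  }

  /** Collects one batch and "fsyncs" it once; returns the edits in that batch. */
  List<String> commitOneBatch() throws InterruptedException {
    List<String> batch = new ArrayList<>();
    batch.add(edits.take()); // block until at least one edit arrives
    if (delayInMs > 0) {
      Thread.sleep(delayInMs); // give concurrent writers time to enlarge the batch
    }
    edits.drainTo(batch, batchSizeLimit - batch.size()); // non-blocking
    fsyncCount++; // a single fsync acknowledges the whole batch
    return batch;
  }
}
```

A larger delay means fewer fsync calls but higher latency for each write. This matches the property comment's note that batching helps when disks are slow.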







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839355561



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int MAX_WAL_NODE_NUM =
+      config.getMaxWalNodeNum() > 0 ? config.getMaxWalNodeNum() : config.getWalDirs().length * 2;
+
+  /** manage wal folders */
+  private FolderManager folderManager;
+  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  // region these variables should be protected by nodesLock
+  /** wal nodes, the max number of wal nodes is MAX_WAL_NODE_NUM */
+  private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NODE_NUM);
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;
+  // endregion
+  /** single thread to fsync .checkpoint files */
+  private ScheduledExecutorService checkpointThread;
+  /** single thread to delete old .wal files */
+  private ScheduledExecutorService walDeleteThread;
+
+  private WALManager() {}
+
+  /** Apply for a wal node */
+  public IWALNode applyForWALNode() {
+    if (config.getWalMode() == WALMode.DISABLE) {
+      return WALFakeNode.getSuccessInstance();
+    }
+
+    WALNode selectedNode;
+    nodesLock.lock();
+    try {
+      if (walNodes.size() < MAX_WAL_NODE_NUM) {
+        nodeIdCounter++;
+        String identifier = String.valueOf(nodeIdCounter);
+        String folder;
+        // get wal folder
+        try {
+          folder = folderManager.getNextFolder();
+        } catch (DiskSpaceInsufficientException e) {
+          logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+          config.setReadOnly(true);

Review comment:
       There is no need to decrement nodeIdCounter here; the skipped id can serve as a hint to help us locate WAL node allocation errors.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int MAX_WAL_NODE_NUM =
+      config.getMaxWalNodeNum() > 0 ? config.getMaxWalNodeNum() : config.getWalDirs().length * 2;
+
+  /** manage wal folders */
+  private FolderManager folderManager;
+  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  // region these variables should be protected by nodesLock
+  /** wal nodes, the max number of wal nodes is MAX_WAL_NODE_NUM */
+  private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NODE_NUM);
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;
+  // endregion
+  /** single thread to fsync .checkpoint files */
+  private ScheduledExecutorService checkpointThread;
+  /** single thread to delete old .wal files */
+  private ScheduledExecutorService walDeleteThread;
+
+  private WALManager() {}
+
+  /** Apply for a wal node */
+  public IWALNode applyForWALNode() {
+    if (config.getWalMode() == WALMode.DISABLE) {
+      return WALFakeNode.getSuccessInstance();
+    }
+
+    WALNode selectedNode;
+    nodesLock.lock();
+    try {
+      if (walNodes.size() < MAX_WAL_NODE_NUM) {
+        nodeIdCounter++;
+        String identifier = String.valueOf(nodeIdCounter);
+        String folder;
+        // get wal folder
+        try {
+          folder = folderManager.getNextFolder();
+        } catch (DiskSpaceInsufficientException e) {
+          logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+          config.setReadOnly(true);
+          return WALFakeNode.getFailureInstance(e);
+        }
+        folder = folder + File.separator + identifier;
+        // create new wal node
+        try {
+          selectedNode = new WALNode(identifier, folder);
+        } catch (FileNotFoundException e) {
+          logger.error("Fail to create wal node", e);

Review comment:
       There is no need to decrement nodeIdCounter here; the skipped id can serve as a hint to help us locate WAL node allocation errors.
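The reasoning is that `applyForWALNode` increments `nodeIdCounter` before the folder lookup and node creation. A failed attempt therefore leaves a gap in the sequence of node identifiers, and that gap marks where an allocation failed. The sketch below shows this behavior. `NodeIdAllocator` is a hypothetical name, and its `BooleanSupplier` creation step is a simplification of the real method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical simplification of applyForWALNode's id handling.
class NodeIdAllocator {
  private long nodeIdCounter = -1;
  private final List<Long> createdIds = new ArrayList<>();

  /**
   * Increments the counter first; if creation fails, the id is not reused,
   * so ids missing from createdIds reveal failed allocations.
   */
  synchronized boolean tryCreate(BooleanSupplier createNode) {
    nodeIdCounter++;
    long id = nodeIdCounter;
    if (!createNode.getAsBoolean()) {
      return false; // deliberately no nodeIdCounter-- here
    }
    createdIds.add(id);
    return true;
  }

  synchronized List<Long> createdIds() {
    return new ArrayList<>(createdIds);
  }
}
```

The cost is that identifiers are no longer dense. That is harmless as long as they serve only as unique names, such as the per-node folder name built from `identifier`.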







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839353232



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int MAX_WAL_NODE_NUM =
+      config.getMaxWalNodeNum() > 0 ? config.getMaxWalNodeNum() : config.getWalDirs().length * 2;
+
+  /** manage wal folders */
+  private FolderManager folderManager;
+  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  // region these variables should be protected by nodesLock
+  /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
+  private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NODE_NUM);
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;
+  // endregion
+  /** single thread to fsync .checkpoint files */
+  private ScheduledExecutorService checkpointThread;
+  /** single thread to delete old .wal files */
+  private ScheduledExecutorService walDeleteThread;
+
+  private WALManager() {}
+
+  /** Apply for a wal node */
+  public IWALNode applyForWALNode() {
+    if (config.getWalMode() == WALMode.DISABLE) {
+      return WALFakeNode.getSuccessInstance();
+    }
+
+    WALNode selectedNode;
+    nodesLock.lock();
+    try {
+      if (walNodes.size() < MAX_WAL_NODE_NUM) {
+        nodeIdCounter++;
+        String identifier = String.valueOf(nodeIdCounter);
+        String folder;
+        // get wal folder
+        try {
+          folder = folderManager.getNextFolder();
+        } catch (DiskSpaceInsufficientException e) {
+          logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+          config.setReadOnly(true);
+          return WALFakeNode.getFailureInstance(e);
+        }
+        folder = folder + File.separator + identifier;
+        // create new wal node
+        try {
+          selectedNode = new WALNode(identifier, folder);
+        } catch (FileNotFoundException e) {
+          logger.error("Fail to create wal node", e);
+          return WALFakeNode.getFailureInstance(e);
+        }
+        walNodes.add(selectedNode);
+      } else {
+        // select next wal node by sequence order
+        nodeCursor++;

Review comment:
       Fixed.





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r834170367



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees the concurrent safety and uses double buffers mechanism to accelerate
+ * writes and avoid waiting for buffer syncing to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  /** Maximum number of WALEdits in one serialize task */
+  public static final int BATCH_SIZE_LIMIT = 100;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long SYNC_WAL_DELAY_IN_MS = config.getSyncWalDelayInMs();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  /** notify serializeThread to stop */
+  private static final WALEdit CLOSE_SIGNAL = new WALEdit(-1, new DeletePlan());
+
+  /** whether close method is called */
+  private volatile boolean isClosed = false;
+  /** WALEdits */
+  private final BlockingQueue<WALEdit> walEdits = new ArrayBlockingQueue<>(BATCH_SIZE_LIMIT * 10);
+  /** two buffers switch between three statuses (there is always 1 buffer working) */
+  // buffer in working status, only updated by serializeThread
+  private volatile ByteBuffer workingBuffer;
+  // buffer in idle status
+  private volatile ByteBuffer idleBuffer;
+  // buffer in syncing status, serializeThread makes sure no more writes to syncingBuffer
+  private volatile ByteBuffer syncingBuffer;
+  /** lock to provide synchronization for double buffers mechanism, protecting buffers status */
+  private final Lock buffersLock = new ReentrantLock();
+  /** condition to guarantee correctness of switching buffers */
+  private final Condition idleBufferReadyCondition = buffersLock.newCondition();
+  /** single thread to serialize WALEdit to workingBuffer */
+  private final ExecutorService serializeThread;
+  /** single thread to sync syncingBuffer to disk */
+  private final ExecutorService syncBufferThread;
+
+  public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    super(identifier, logDirectory);
+    allocateBuffers();
+    serializeThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SERIALIZE.getName() + "(node-" + identifier + ")");
+    syncBufferThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SYNC.getName() + "(node-" + identifier + ")");
+    // start receiving serialize tasks
+    serializeThread.submit(new SerializeTask());
+  }
+
+  private void allocateBuffers() {
+    try {
+      workingBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);

Review comment:
       Memory usage is bounded by the maximum number of wal nodes (each node owns one wal buffer), which is set by the [max_wal_num](https://github.com/apache/iotdb/pull/5320/files#diff-057cb21694ee5d5cf32fbd4e7afc416c5841a3696dcce0883d783a961076ba9cR66-R69) parameter. Wal nodes are reused rather than created per request; see [WALManager](https://github.com/apache/iotdb/pull/5320/files#diff-672f4b0027b7b2d89495f695447243e85d264fc61a37ee2dcbbcf01a3f59cf0aR108-R112).
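As a rough illustration of that bound, the sketch below multiplies the effective node count by the per-node buffer size. It assumes each wal node holds two direct buffers of `WAL_BUFFER_SIZE / 2` bytes (the `allocateBuffers` excerpt above only shows the working buffer, so the idle buffer size is an assumption), and it mirrors the documented fallback of 2 * 'number of wal dirs' when `max_wal_num` is 0. The class and method names are hypothetical.

```java
/**
 * Hypothetical back-of-the-envelope calculator. Names mirror the config keys in this PR
 * (max_wal_num, wal_buffer_size_in_byte), but the helper itself is illustrative only.
 */
final class WalMemoryEstimate {
  // Mirrors WALManager: a non-positive max_wal_num falls back to 2 * number of wal dirs.
  static int effectiveMaxWalNodes(int maxWalNum, int walDirCount) {
    return maxWalNum > 0 ? maxWalNum : walDirCount * 2;
  }

  // Assumes each node holds two direct buffers of walBufferSize / 2 bytes (working + idle),
  // so one node pins about walBufferSize bytes of direct memory in total.
  static long upperBoundBytes(int maxWalNum, int walDirCount, int walBufferSize) {
    long perNode = 2L * (walBufferSize / 2);
    return perNode * effectiveMaxWalNodes(maxWalNum, walDirCount);
  }

  public static void main(String[] args) {
    // max_wal_num=0, one wal dir, 16 MB buffer: 2 nodes * 16 MB.
    long bytes = upperBoundBytes(0, 1, 16 * 1024 * 1024);
    System.out.println(bytes / (1024 * 1024) + " MB"); // prints "32 MB"
  }
}
```

With the defaults listed in iotdb-engine.properties (max_wal_num=0, wal_buffer_size_in_byte=16777216) and a single wal directory, this estimate comes to 32 MB of direct memory for wal buffers.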





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839347700



##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
##########
@@ -226,15 +214,18 @@ protected void serializeImpl(ByteBuffer buffer) {
   }
 
   /**
-   * Deserialize the plan from the given buffer. This is provided for WAL, and must be used with
-   * serializeToWAL.
+   * Deserialize the plan from the given buffer.
    *
    * @param buffer
    */
   public void deserialize(ByteBuffer buffer) throws IllegalPathException, IOException {
     throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
   }
 
+  protected int getSerializedBytesNum(String value) {
+    return value == null ? Integer.BYTES : Integer.BYTES + value.getBytes().length;

Review comment:
       This is old code; see the current implementation in [ReadWriteIOUtils#sizeToWrite](https://github.com/apache/iotdb/pull/5320/files#diff-20f7ca391a64e16b04261240db9b363820d3c4b5afd43b2779d3b718e54b0fa6R459-R464).
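For reference, a length-prefixed string occupies `Integer.BYTES` plus its encoded byte length. One caveat with the old helper is that `value.getBytes()` uses the platform default charset, so the computed size can disagree with a writer that encodes with an explicit charset. The sketch below pins UTF-8 in both the size calculation and the write; whether `ReadWriteIOUtils#sizeToWrite` behaves exactly like this is not shown in this thread, and the helper names and the `-1` null marker are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing how a length-prefixed string's serialized size can be computed. */
final class StringSizeSketch {
  // An int length prefix followed by the UTF-8 bytes; null is written as the prefix only.
  static int serializedSize(String value) {
    return value == null
        ? Integer.BYTES
        : Integer.BYTES + value.getBytes(StandardCharsets.UTF_8).length;
  }

  // Writes the string in the same layout so the size can be checked against the buffer position.
  static void write(String value, ByteBuffer buffer) {
    if (value == null) {
      buffer.putInt(-1); // assumed null marker; the real encoding may differ
      return;
    }
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    buffer.putInt(bytes.length);
    buffer.put(bytes);
  }

  public static void main(String[] args) {
    // Two CJK characters take 3 bytes each in UTF-8.
    String s = "root.sg.d1.\u6e29\u5ea6";
    ByteBuffer buffer = ByteBuffer.allocate(64);
    write(s, buffer);
    System.out.println(buffer.position() == serializedSize(s)); // prints "true"
  }
}
```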





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839343448



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -340,6 +338,16 @@ public void insertTablet(
     tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex());
   }
 
+  private void createNewWorkingMemTable() throws WriteProcessException {
+    if (enableMemControl) {
+      workMemTable = new PrimitiveMemTable(enableMemControl);
+      MemTableManager.getInstance().addMemtableNumber();

Review comment:
       Fixed.





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839360671



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+import org.apache.iotdb.db.wal.io.WALWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractWALBuffer implements IWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(AbstractWALBuffer.class);
+  /** use size limit to control WALEdit number in each file */
+  protected static final long LOG_SIZE_LIMIT = 10 * 1024 * 1024;
+
+  /** WALNode identifier of this buffer */
+  protected final String identifier;
+  /** directory to store .wal files */
+  protected final String logDirectory;
+  /** current wal file version id */
+  protected final AtomicInteger currentLogVersion = new AtomicInteger();
+  /** current wal file log writer */
+  protected volatile ILogWriter currentLogWriter;
+
+  public AbstractWALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
+    }
+    currentLogWriter =
+        new WALWriter(
+            SystemFileFactory.INSTANCE.getFile(
+                logDirectory, WALWriter.getLogFileName(currentLogVersion.get())));
+  }
+
+  @Override
+  public int getCurrentLogVersion() {
+    return currentLogVersion.get();
+  }
+
+  /** Notice: old log writer will be closed by this function. */
+  protected boolean tryRollingLogWriter() throws IOException {
+    if (currentLogWriter.size() < LOG_SIZE_LIMIT) {

Review comment:
       It should not be time-consuming; see [details](https://stackoverflow.com/questions/41595688/is-java-nio-channels-filechannel-size-fast-for-big-files).
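The linked answer concerns `FileChannel#size()`. On common JDKs this call is typically served from file metadata (for example an `fstat` call on Linux) rather than by reading the file, so checking it on every roll attempt is cheap. The sketch below shows a size-based rolling check in that style. It assumes `currentLogWriter.size()` is backed by a `FileChannel`, which this excerpt does not show, and the class name is hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch of a size-based rolling check using FileChannel#size(). */
final class RollingCheckSketch {
  // Same value as LOG_SIZE_LIMIT in AbstractWALBuffer (10 MB).
  static final long LOG_SIZE_LIMIT = 10 * 1024 * 1024;

  // Asks the channel for the current file size; no file contents are read.
  static boolean shouldRoll(FileChannel channel, long limit) throws IOException {
    return channel.size() >= limit;
  }

  public static void main(String[] args) throws IOException {
    Path file = Files.createTempFile("wal-sketch", ".wal");
    try (FileChannel channel =
        FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
      ByteBuffer data = ByteBuffer.allocate(4096);
      while (data.hasRemaining()) {
        channel.write(data);
      }
      System.out.println(channel.size()); // prints 4096
      System.out.println(shouldRoll(channel, LOG_SIZE_LIMIT)); // prints false
      System.out.println(shouldRoll(channel, 4096)); // prints true
    } finally {
      Files.deleteIfExists(file);
    }
  }
}
```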





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r834165668



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int MAX_WAL_NUM =
+      config.getMaxWalNum() > 0 ? config.getMaxWalNum() : config.getWalDirs().length * 2;
+
+  /** manage wal folders */
+  private FolderManager folderManager;
+  /** protect concurrent safety of walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
+  private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NUM);
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;

Review comment:
       nodeCursor and nodeIdCounter are both protected by the [nodesLock](https://github.com/apache/iotdb/pull/5320/files#diff-672f4b0027b7b2d89495f695447243e85d264fc61a37ee2dcbbcf01a3f59cf0aR63-R64) of WALManager.
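A simplified model of the pattern being described: every read and write of the node list, `nodeCursor` and `nodeIdCounter` happens between `nodesLock.lock()` and `unlock()`, so none of the three fields needs to be volatile or atomic. Nodes are created until the cap is reached and then reused in sequence order. The diff is cut off at `nodeCursor++`, so the modulo wrap-around here is an assumption, and plain strings stand in for `WALNode` instances; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical model of lock-guarded wal node allocation with a node cap. */
final class NodeAllocatorSketch {
  private final int maxNodes;
  private final Lock nodesLock = new ReentrantLock();
  // The three fields below are only read or written while holding nodesLock.
  private final List<String> nodes = new ArrayList<>();
  private int nodeCursor = -1;
  private long nodeIdCounter = -1;

  NodeAllocatorSketch(int maxNodes) {
    this.maxNodes = maxNodes;
  }

  String applyForNode() {
    nodesLock.lock();
    try {
      if (nodes.size() < maxNodes) {
        // Below the cap: create a node with a fresh identifier.
        nodeIdCounter++;
        String node = "node-" + nodeIdCounter;
        nodes.add(node);
        return node;
      }
      // Cap reached: reuse existing nodes in sequence order (wrap-around assumed).
      nodeCursor = (nodeCursor + 1) % nodes.size();
      return nodes.get(nodeCursor);
    } finally {
      nodesLock.unlock();
    }
  }

  public static void main(String[] args) {
    NodeAllocatorSketch allocator = new NodeAllocatorSketch(2);
    StringBuilder out = new StringBuilder();
    for (int i = 0; i < 5; i++) {
      out.append(allocator.applyForNode()).append(' ');
    }
    System.out.println(out.toString().trim()); // prints "node-0 node-1 node-0 node-1 node-0"
  }
}
```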





[GitHub] [iotdb] HeimingZ commented on pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#issuecomment-1076069705


   > do not have permission to open the document. advised to put this doc to jira or confluence.
   
   I have updated the read permissions, so the document should be visible now. @wangchao316



[GitHub] [iotdb] HTHou commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r836248972



##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
##########
@@ -33,7 +34,7 @@
 import java.util.List;
 import java.util.Set;
 
-public abstract class InsertPlan extends PhysicalPlan {
+public abstract class InsertPlan extends PhysicalPlan implements WALEditValue {

Review comment:
       Why does it implement WALEditValue?

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -85,7 +88,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 @SuppressWarnings("java:S1135") // ignore todos
-public class TsFileProcessor {
+public class TsFileProcessor implements WALSubmitter {

Review comment:
       Why does TsFileProcessor implement WALSubmitter? It seems odd.

##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
##########
@@ -280,6 +295,21 @@ public void serializeImpl(ByteBuffer buffer) {
     }
   }
 
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    int type = PhysicalPlanType.MULTI_BATCH_INSERT.ordinal();
+    buffer.put((byte) type);
+    buffer.putInt(insertTabletPlanList.size());
+    for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+      insertTabletPlan.subSerialize(buffer);
+    }
+
+    buffer.putInt(parentInsertTabletPlanIndexList.size());
+    for (Integer index : parentInsertTabletPlanIndexList) {
+      buffer.putInt(index);
+    }
+  }
+

Review comment:
       How does this differ from `serializeImpl`? Could you add some Javadoc?
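For readers comparing the two methods, the `serializeToWAL` override in the diff above writes a type byte, the number of sub-plans, each sub-plan, and then the parent index list prefixed with its size. The round-trip below models that framing with each sub-plan reduced to a single int and a made-up type code. It illustrates the layout only; it does not reproduce `subSerialize`, and it does not answer how `serializeImpl` differs. All names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the layout written by serializeToWAL:
 * [type:byte][planCount:int][sub-plans...][indexCount:int][index:int...].
 */
final class MultiTabletLayoutSketch {
  static final byte TYPE = 7; // made-up code standing in for MULTI_BATCH_INSERT.ordinal()

  static void write(ByteBuffer buffer, List<Integer> subPlans, List<Integer> parentIndexes) {
    buffer.put(TYPE);
    buffer.putInt(subPlans.size());
    for (int plan : subPlans) {
      buffer.putInt(plan); // placeholder for insertTabletPlan.subSerialize(buffer)
    }
    buffer.putInt(parentIndexes.size());
    for (int index : parentIndexes) {
      buffer.putInt(index);
    }
  }

  // Reads the parent index list back, skipping over the placeholder sub-plans.
  static List<Integer> readParentIndexes(ByteBuffer buffer) {
    if (buffer.get() != TYPE) {
      throw new IllegalArgumentException("unexpected plan type");
    }
    int planCount = buffer.getInt();
    for (int i = 0; i < planCount; i++) {
      buffer.getInt();
    }
    int indexCount = buffer.getInt();
    List<Integer> indexes = new ArrayList<>(indexCount);
    for (int i = 0; i < indexCount; i++) {
      indexes.add(buffer.getInt());
    }
    return indexes;
  }

  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(64);
    write(buffer, List.of(100, 200), List.of(0, 0, 1));
    buffer.flip();
    System.out.println(readParentIndexes(buffer)); // prints [0, 0, 1]
  }
}
```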





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r834160450



##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -50,25 +50,45 @@ rpc_port=6667
 ### Write Ahead Log Configuration
 ####################
 
-# Is insert ahead log enable
-# Datatype: boolean
-# enable_wal=true
+# Write mode of wal
+# The details of these three modes are as follows:
+# 1. DISABLE: the system will disable wal.
+# 2. SYNC: the system will submit wal synchronously, write request will not return until its wal is fsynced to the disk successfully.
+# 3. ASYNC: the system will submit wal asynchronously, write request will return immediately no matter its wal is fsynced to the disk successfully.
+# The write performance order is DISABLE > ASYNC > SYNC, but only SYNC mode can ensure data durability.
+# wal_mode=SYNC
+
+# Duration a wal flush operation will wait before calling fsync
+# A duration greater than 0 batches multiple wal fsync calls into one. This is useful when disks are slow or WAL write contention exists.
+# Datatype: long
+# sync_wal_delay_in_ms=0
 
-# Add a switch to drop ouf-of-order data
-# Out-of-order data will impact the aggregation query a lot. Users may not care about discarding some out-of-order data.
-# Datatype: boolean
-# enable_discard_out_of_order_data=false
+# Max number of wal nodes, each node corresponds to one wal directory
+# The default value 0 means the concurrent wal number will be 2 * 'number of wal dirs'.
+# Datatype: int
+# max_wal_num=0
 
-# When a certain amount of insert ahead log is reached, it will be flushed to disk
-# It is possible to lose at most flush_wal_threshold operations
+# Buffer size of each wal node
+# If it sets a value smaller than 0, use the default value 16777216 bytes (16MB).
 # Datatype: int
-# flush_wal_threshold=10000
+# wal_buffer_size_in_byte=16777216
 
-# The cycle when insert ahead log is periodically forced to be written to disk(in milliseconds)
-# If force_wal_period_in_ms = 0 it means force insert ahead log to be written to disk after each refreshment
-# Set this parameter to 0 may slow down the ingestion on slow disk.
-# Datatype: long
-# force_wal_period_in_ms=100
+# Buffer entry size of each wal buffer
+# If it sets a value smaller than 0, use the default value 16384 bytes (16KB).
+# Datatype: int
+# wal_buffer_entry_size_in_byte=16384
+
+# Max storage space for wal
+# The default value 0 means the storage space will not be controlled.
+# Notice: If this parameter is too small, the write performance may decline.
+# Datatype: int
+# wal_storage_space_in_mb=0

Review comment:
       The default value 0 means the storage space is managed by IoTDB itself; see the [delete method](https://github.com/apache/iotdb/pull/5320/files#diff-672f4b0027b7b2d89495f695447243e85d264fc61a37ee2dcbbcf01a3f59cf0aR166-R181) of WALManager. Besides, the functionality related to this parameter is still under development.





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r834162580



##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
##########
@@ -139,46 +142,39 @@
 
   /** When inserting rejected exceeds this, throw an exception. Unit: millisecond */
   private int maxWaitingTimeWhenInsertBlockedInMs = 10000;
-  /** Is the write ahead log enable. */
-  private boolean enableWal = true;
 
-  private volatile boolean readOnly = false;
+  /** this variable set timestamp precision as millisecond, microsecond or nanosecond */
+  private String timestampPrecision = "ms";
 
-  private boolean enableDiscardOutOfOrderData = false;
+  // region Write Ahead Log Configuration

Review comment:
       I only moved it out of the wal parameter region; you can still find it in [IoTDBConfig L777-L778](https://github.com/apache/iotdb/pull/5320/files#diff-4523da5aacb813d25cbf8487e53171d423aafaa0dd000c85047e103b9cd0b3afR777-R778).





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839349780



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.recover.file;
+
+import org.apache.iotdb.db.engine.flush.MemTableFlushTask;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
+import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.wal.buffer.WALEdit;
+import org.apache.iotdb.db.wal.exception.WALRecoverException;
+import org.apache.iotdb.db.wal.utils.listener.WALRecoverListener;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+/**
+ * This class is used to help recover all unsealed TsFiles at zero level. There are 3 main
+ * procedures: start recovery, redo logs, and end recovery, you must call them in order. Notice:
+ * This class doesn't guarantee concurrency safety.
+ */
+public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerformer {
+  private static final Logger logger =
+      LoggerFactory.getLogger(UnsealedTsFileRecoverPerformer.class);
+
+  /** sequence file or not */
+  private final boolean sequence;
+  /** add recovered TsFile back to virtual storage group */
+  private final Consumer<UnsealedTsFileRecoverPerformer> callbackAfterUnsealedTsFileRecovered;
+  /** redo wal log to recover TsFile */
+  private final TsFilePlanRedoer walRedoer;
+  /** trace result of this recovery */
+  private final WALRecoverListener recoverListener;
+
+  public UnsealedTsFileRecoverPerformer(
+      TsFileResource tsFileResource,
+      boolean sequence,
+      VirtualStorageGroupProcessor vsgProcessor,
+      Consumer<UnsealedTsFileRecoverPerformer> callbackAfterUnsealedTsFileRecovered) {
+    super(tsFileResource);
+    this.sequence = sequence;
+    this.callbackAfterUnsealedTsFileRecovered = callbackAfterUnsealedTsFileRecovered;
+    this.walRedoer = new TsFilePlanRedoer(tsFileResource, sequence, vsgProcessor);
+    this.recoverListener = new WALRecoverListener(tsFileResource.getTsFilePath());
+  }
+
+  /**
+   * Make preparation for recovery, including load .resource file (reconstruct when necessary) and
+   * truncate the file to remaining corrected data.
+   */
+  public void startRecovery() throws StorageGroupProcessorException, IOException {
+    super.recoverWithWriter();
+
+    if (hasCrashed()) {
+      // tsfile has crashed due to failure,
+      // the last ChunkGroup may contain the same data as the WALs,
+      // so the time map must be updated first to avoid duplicated insertion
+      loadResourceFromWriter();
+    }
+  }
+
+  private void loadResourceFromWriter() {

Review comment:
       Fixed.
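The Javadoc of `UnsealedTsFileRecoverPerformer` requires its three procedures (start recovery, redo logs, end recovery) to be called in order and states that the class is not concurrency safe. If one wanted that ordering checked at runtime, a small stage guard like the hypothetical sketch below would do. It does not reflect how, or whether, the PR enforces the order, and `redoLog`/`endRecovery` are illustrative names.

```java
/**
 * Hypothetical guard enforcing the documented call order:
 * startRecovery -> redoLog (any number of times) -> endRecovery. Recovery logic is omitted.
 */
final class RecoveryOrderSketch {
  private enum Stage { NEW, STARTED, ENDED }

  // Not thread-safe, matching the class's documented contract.
  private Stage stage = Stage.NEW;
  private int redoneEntries = 0;

  void startRecovery() {
    require(Stage.NEW, "startRecovery");
    stage = Stage.STARTED;
  }

  void redoLog(Object walEntry) {
    require(Stage.STARTED, "redoLog");
    redoneEntries++; // a real implementation would apply walEntry to the memtable here
  }

  int endRecovery() {
    require(Stage.STARTED, "endRecovery");
    stage = Stage.ENDED;
    return redoneEntries;
  }

  private void require(Stage expected, String method) {
    if (stage != expected) {
      throw new IllegalStateException(
          method + " called in stage " + stage + ", expected " + expected);
    }
  }

  public static void main(String[] args) {
    RecoveryOrderSketch performer = new RecoveryOrderSketch();
    performer.startRecovery();
    performer.redoLog("insert-1");
    performer.redoLog("insert-2");
    System.out.println(performer.endRecovery()); // prints 2
  }
}
```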

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
##########
@@ -34,136 +31,69 @@
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.db.writelog.io.ILogReader;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Supplier;
 
 /**
- * LogReplayer finds the logNode of the TsFile given by insertFilePath and logNodePrefix, reads the
- * WALs from the logNode and redoes them into a given MemTable and ModificationFile.
+ * This class helps redo wal logs into a TsFile. Notice: You should update time map in {@link
+ * TsFileResource} before using this class to avoid duplicated insertion and this class doesn't
+ * guarantee concurrency safety.
  */
-public class LogReplayer {
-
-  private Logger logger = LoggerFactory.getLogger(LogReplayer.class);
-  private String logNodePrefix;
-  private String insertFilePath;
-  private ModificationFile modFile;
-  private TsFileResource currentTsFileResource;
-  private IMemTable recoverMemTable;
-
-  // only unsequence file tolerates duplicated data
-  private boolean sequence;
-
-  private Map<String, Long> tempStartTimeMap = new HashMap<>();
-  private Map<String, Long> tempEndTimeMap = new HashMap<>();
-
-  public LogReplayer(
-      String logNodePrefix,
-      String insertFilePath,
-      ModificationFile modFile,
-      TsFileResource currentTsFileResource,
-      IMemTable memTable,
-      boolean sequence) {
-    this.logNodePrefix = logNodePrefix;
-    this.insertFilePath = insertFilePath;
-    this.modFile = modFile;
-    this.currentTsFileResource = currentTsFileResource;
-    this.recoverMemTable = memTable;
+public class TsFilePlanRedoer {
+  private static final Logger logger = LoggerFactory.getLogger(TsFilePlanRedoer.class);
+
+  private final TsFileResource tsFileResource;
+  /** only unsequence file tolerates duplicated data */
+  private final boolean sequence;
+  /** this TsFile's virtual storage group */
+  private final VirtualStorageGroupProcessor vsgProcessor;

Review comment:
       Fixed.
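The new `TsFilePlanRedoer` javadoc says that only unsequence files tolerate duplicated data, and that the time map must be updated before redo. The sketch below illustrates that rule at row level. It assumes each WAL row carries a device and a timestamp. `WalRow` and `RedoFilter` are hypothetical; the real redoer works on `InsertPlan`/`DeletePlan` objects. For a sequence file, rows at or before the persisted end time are skipped. For an unsequence file, every row is replayed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical WAL row: one timestamped value for one device. */
final class WalRow {
  final String device;
  final long time;

  WalRow(String device, long time) {
    this.device = device;
    this.time = time;
  }
}

/** Sketch of the redo decision; names are illustrative, not IoTDB's API. */
final class RedoFilter {
  private final boolean sequence;
  /** Max timestamp already persisted per device (must be updated before redo). */
  private final Map<String, Long> persistedEndTimes;

  RedoFilter(boolean sequence, Map<String, Long> persistedEndTimes) {
    this.sequence = sequence;
    this.persistedEndTimes = new HashMap<>(persistedEndTimes);
  }

  /** Returns the rows that should be re-inserted into the memtable. */
  List<WalRow> rowsToRedo(List<WalRow> walRows) {
    List<WalRow> result = new ArrayList<>();
    for (WalRow row : walRows) {
      long endTime = persistedEndTimes.getOrDefault(row.device, Long.MIN_VALUE);
      // a sequence file must not receive data that is already on disk
      if (sequence && row.time <= endTime) {
        continue;
      }
      result.add(row);
    }
    return result;
  }
}
```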

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -455,7 +332,7 @@ public void setReady(boolean ready) {
   }
 
   /** this class is used to store recovering context */
-  private class RecoveryContext {
+  private class VSGRecoveryContext {
     /** number of files to be recovered */
     private final long filesToRecoverNum;

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -467,7 +344,7 @@ public void setReady(boolean ready) {
     /** last recovery log files num */
     private long lastLogCheckFilesNum;
 
-    public RecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
+    public VSGRecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
##########
@@ -467,7 +344,7 @@ public void setReady(boolean ready) {
     /** last recovery log files num */
     private long lastLogCheckFilesNum;
 
-    public RecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
+    public VSGRecoveryContext(long filesToRecoverNum, long recoveredFilesNum) {
       this.filesToRecoverNum = filesToRecoverNum;
       this.recoveredFilesNum = recoveredFilesNum;

Review comment:
       Fixed.
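The renamed `VSGRecoveryContext` tracks `filesToRecoverNum`, `recoveredFilesNum`, and `lastLogCheckFilesNum`, which suggests periodic progress reporting during recovery. The sketch below is a minimal version of such a tracker. The class name and the "log roughly every 10% of files" policy are assumptions, not taken from the PR.

```java
/** Hypothetical recovery-progress tracker; the 10% logging step is an assumption. */
final class RecoveryProgress {
  private final long filesToRecoverNum;
  private final long logStep;
  private long recoveredFilesNum;
  private long lastLogCheckFilesNum;

  RecoveryProgress(long filesToRecoverNum, long recoveredFilesNum) {
    this.filesToRecoverNum = filesToRecoverNum;
    this.recoveredFilesNum = recoveredFilesNum;
    this.lastLogCheckFilesNum = recoveredFilesNum;
    // report roughly every 10% of the files, but never less than one file apart
    this.logStep = Math.max(1, filesToRecoverNum / 10);
  }

  /** Records one recovered file; returns true when a progress line should be logged. */
  boolean incrementAndShouldLog() {
    recoveredFilesNum++;
    if (recoveredFilesNum - lastLogCheckFilesNum >= logStep
        || recoveredFilesNum == filesToRecoverNum) {
      lastLogCheckFilesNum = recoveredFilesNum;
      return true;
    }
    return false;
  }

  String progressMessage() {
    return String.format("recovered %d/%d files", recoveredFilesNum, filesToRecoverNum);
  }
}
```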







[GitHub] [iotdb] qiaojialin commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r841042138



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEditValue.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+
+/** A class implements this interface can be written into .wal file. */
+public interface WALEditValue extends SerializedSize {

Review comment:
       yes
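The quoted `WALEditValue` javadoc says that any implementing class can be written into a .wal file, and the interface extends `SerializedSize`. The sketch below shows how the two contracts fit together. It assumes a `serializedSize()` method returning a byte count and a `serializeToWAL` method. `WalValue` and `PathTimeValue` are hypothetical, and a plain `ByteBuffer` stands in for `IWALByteBufferView`. Knowing the size up front lets a writer check or reserve space before serializing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Assumed shape: a value reports how many bytes it will occupy. */
interface SerializedSize {
  int serializedSize();
}

/** Assumed shape: a value that can be appended to a .wal buffer. */
interface WalValue extends SerializedSize {
  void serializeToWAL(ByteBuffer buffer);
}

/** Hypothetical value: a type byte, a length-prefixed UTF-8 path and a timestamp. */
final class PathTimeValue implements WalValue {
  private final byte[] path;
  private final long time;

  PathTimeValue(String path, long time) {
    this.path = path.getBytes(StandardCharsets.UTF_8);
    this.time = time;
  }

  @Override
  public int serializedSize() {
    return Byte.BYTES + Integer.BYTES + path.length + Long.BYTES;
  }

  @Override
  public void serializeToWAL(ByteBuffer buffer) {
    buffer.put((byte) 1); // hypothetical type tag
    buffer.putInt(path.length);
    buffer.put(path);
    buffer.putLong(time);
  }
}
```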







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r834175008



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees the concurrent safety and uses double buffers mechanism to accelerate
+ * writes and avoid waiting for buffer syncing to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  /** Maximum number of WALEdits in one serialize task */
+  public static final int BATCH_SIZE_LIMIT = 100;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long SYNC_WAL_DELAY_IN_MS = config.getSyncWalDelayInMs();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  /** notify serializeThread to stop */
+  private static final WALEdit CLOSE_SIGNAL = new WALEdit(-1, new DeletePlan());
+
+  /** whether close method is called */
+  private volatile boolean isClosed = false;
+  /** WALEdits */
+  private final BlockingQueue<WALEdit> walEdits = new ArrayBlockingQueue<>(BATCH_SIZE_LIMIT * 10);
+  /** two buffers switch between three statuses (there is always 1 buffer working) */
+  // buffer in working status, only updated by serializeThread
+  private volatile ByteBuffer workingBuffer;
+  // buffer in idle status
+  private volatile ByteBuffer idleBuffer;
+  // buffer in syncing status, serializeThread makes sure no more writes to syncingBuffer
+  private volatile ByteBuffer syncingBuffer;
+  /** lock to provide synchronization for double buffers mechanism, protecting buffers status */
+  private final Lock buffersLock = new ReentrantLock();
+  /** condition to guarantee correctness of switching buffers */
+  private final Condition idleBufferReadyCondition = buffersLock.newCondition();
+  /** single thread to serialize WALEdit to workingBuffer */
+  private final ExecutorService serializeThread;
+  /** single thread to sync syncingBuffer to disk */
+  private final ExecutorService syncBufferThread;
+
+  public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    super(identifier, logDirectory);
+    allocateBuffers();
+    serializeThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SERIALIZE.getName() + "(node-" + identifier + ")");
+    syncBufferThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SYNC.getName() + "(node-" + identifier + ")");
+    // start receiving serialize tasks
+    serializeThread.submit(new SerializeTask());
+  }
+
+  private void allocateBuffers() {
+    try {
+      workingBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+      idleBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+    } catch (OutOfMemoryError e) {
+      logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, e);
+      close();
+      throw e;
+    }
+  }
+
+  @Override
+  public void write(WALEdit edit) {
+    if (isClosed) {
+      logger.error(
+          "Fail to write WALEdit into wal node-{} because this node is closed.", identifier);
+      edit.getWalFlushListener().fail(new WALNodeClosedException(identifier));
+      return;
+    }
+    // only add this WALEdit to queue
+    try {
+      walEdits.put(edit);
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted when waiting for adding WalEdit to buffer.");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  // region Task of serializeThread
+  /** This task serializes WALEdit to workingBuffer and will call fsync at last. */
+  private class SerializeTask implements Runnable {
+    private final IWALByteBufferView byteBufferVew = new ByteBufferView();
+    private final List<WALFlushListener> fsyncListeners = new LinkedList<>();
+
+    @Override
+    public void run() {
+      try {
+        serialize();
+      } finally {
+        serializeThread.submit(new SerializeTask());
+      }
+    }
+
+    /** In order to control memory usage of blocking queue, get 1 and then serialize 1 */
+    private void serialize() {
+      // for better fsync performance, sleep a while to enlarge write batch
+      if (SYNC_WAL_DELAY_IN_MS > 0) {
+        try {
+          Thread.sleep(SYNC_WAL_DELAY_IN_MS);
+        } catch (InterruptedException e) {
+          logger.warn("Interrupted when sleeping a while to enlarge wal write batch.");
+          Thread.currentThread().interrupt();
+        }
+      }
+      // try to get first WALEdit with blocking interface
+      int batchSize = 0;
+      try {
+        WALEdit edit = walEdits.take();
+        try {
+          if (edit != CLOSE_SIGNAL) {
+            edit.serialize(byteBufferVew);
+            ++batchSize;
+            fsyncListeners.add(edit.getWalFlushListener());
+          }
+        } catch (Exception e) {
+          logger.error(
+              "Fail to serialize WALEdit to wal node-{}'s buffer, discard it.", identifier, e);
+          edit.getWalFlushListener().fail(e);
+        }
+      } catch (InterruptedException e) {
+        logger.warn(
+            "Interrupted when waiting for taking WALEdit from blocking queue to serialize.");
+        Thread.currentThread().interrupt();
+      }
+      // try to get more WALEdits with non-blocking interface to enlarge write batch
+      while (walEdits.peek() != null && batchSize < BATCH_SIZE_LIMIT) {
+        WALEdit edit = walEdits.poll();
+        if (edit == null || edit == CLOSE_SIGNAL) {
+          break;
+        } else {
+          try {
+            edit.serialize(byteBufferVew);
+          } catch (Exception e) {
+            logger.error(
+                "Fail to serialize WALEdit to wal node-{}'s buffer, discard it.", identifier, e);
+            edit.getWalFlushListener().fail(e);
+            continue;
+          }
+          ++batchSize;
+          fsyncListeners.add(edit.getWalFlushListener());
+        }
+      }
+      // call fsync at last and set fsyncListeners
+      if (batchSize > 0) {
+        fsyncWorkingBuffer(fsyncListeners);
+      }
+    }
+  }
+
+  /**
+   * This view uses workingBuffer lock-freely because workingBuffer is only updated by
+   * serializeThread and this class is only used by serializeThread.
+   */
+  private class ByteBufferView implements IWALByteBufferView {
+    private void ensureEnoughSpace(int bytesNum) {
+      if (workingBuffer.remaining() < bytesNum) {
+        rollBuffer();
+      }
+    }
+
+    private void rollBuffer() {
+      syncWorkingBuffer();
+    }
+
+    @Override
+    public void put(byte b) {
+      ensureEnoughSpace(Byte.BYTES);
+      workingBuffer.put(b);
+    }
+
+    @Override
+    public void put(byte[] src) {
+      int offset = 0;
+      while (true) {
+        int leftCapacity = workingBuffer.remaining();
+        int needCapacity = src.length - offset;
+        if (leftCapacity >= needCapacity) {
+          workingBuffer.put(src, offset, needCapacity);
+          break;
+        } else {
+          workingBuffer.put(src, offset, leftCapacity);
+          offset += leftCapacity;
+          rollBuffer();
+        }
+      }
+    }
+
+    @Override
+    public void putChar(char value) {
+      ensureEnoughSpace(Character.BYTES);
+      workingBuffer.putChar(value);
+    }
+
+    @Override
+    public void putShort(short value) {
+      ensureEnoughSpace(Short.BYTES);
+      workingBuffer.putShort(value);
+    }
+
+    @Override
+    public void putInt(int value) {
+      ensureEnoughSpace(Integer.BYTES);
+      workingBuffer.putInt(value);
+    }
+
+    @Override
+    public void putLong(long value) {
+      ensureEnoughSpace(Long.BYTES);
+      workingBuffer.putLong(value);
+    }
+
+    @Override
+    public void putFloat(float value) {
+      ensureEnoughSpace(Float.BYTES);
+      workingBuffer.putFloat(value);
+    }
+
+    @Override
+    public void putDouble(double value) {
+      ensureEnoughSpace(Double.BYTES);
+      workingBuffer.putDouble(value);
+    }
+  }
+
+  /** Notice: this method only called when buffer is exhausted by SerializeTask. */
+  private void syncWorkingBuffer() {
+    switchIdleBufferToWorking();
+    syncBufferThread.submit(new SyncBufferTask(false));
+  }
+
+  /** Notice: this method only called at the last of SerializeTask. */
+  private void fsyncWorkingBuffer(List<WALFlushListener> fsyncListeners) {
+    switchIdleBufferToWorking();
+    syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners));
+  }
+
+  // only called by serializeThread
+  private void switchIdleBufferToWorking() {
+    buffersLock.lock();
+    try {
+      while (idleBuffer == null) {
+        idleBufferReadyCondition.await();
+      }
+      syncingBuffer = workingBuffer;
+      workingBuffer = idleBuffer;
+      workingBuffer.clear();
+      idleBuffer = null;
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted When waiting for available working buffer.");
+      Thread.currentThread().interrupt();
+    } finally {
+      buffersLock.unlock();
+    }
+  }
+  // endregion

Review comment:
       The lock is acquired at the [beginning of this method](https://github.com/apache/iotdb/pull/5320/files#diff-867c4e44ff59fdd079abe7337e64b9774fd55bffb89ec743bb42db0e2f7fab77R283-R285), so it is unsafe to delete the lock-release code.
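The point about the lock can be seen in a minimal double-buffer sketch modelled on the quoted `switchIdleBufferToWorking`. The lock is taken before the `try`, `await()` releases it only while waiting, and the `finally` is the only place that releases it on both the normal and the interrupted path. The `returnSyncedBuffer` counterpart is an assumption, since the sync task's code is not quoted in this thread. Heap buffers replace the direct buffers for simplicity.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Minimal double-buffer sketch; returnSyncedBuffer is an assumed counterpart. */
final class DoubleBuffer {
  private final Lock buffersLock = new ReentrantLock();
  private final Condition idleBufferReady = buffersLock.newCondition();
  private volatile ByteBuffer workingBuffer;
  private volatile ByteBuffer idleBuffer;
  private volatile ByteBuffer syncingBuffer;

  DoubleBuffer(int capacity) {
    workingBuffer = ByteBuffer.allocate(capacity);
    idleBuffer = ByteBuffer.allocate(capacity);
  }

  /** Writer side: hand the working buffer to the syncer and take the idle one. */
  boolean switchIdleToWorking() {
    buffersLock.lock(); // acquired first ...
    try {
      while (idleBuffer == null) {
        idleBufferReady.await(); // releases the lock only while waiting
      }
      syncingBuffer = workingBuffer;
      workingBuffer = idleBuffer;
      workingBuffer.clear();
      idleBuffer = null;
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      buffersLock.unlock(); // ... so it must be released on every path
    }
  }

  /** Syncer side (assumed): after writing syncingBuffer to disk, make it idle again. */
  void returnSyncedBuffer() {
    buffersLock.lock();
    try {
      idleBuffer = syncingBuffer;
      syncingBuffer = null;
      idleBufferReady.signalAll();
    } finally {
      buffersLock.unlock();
    }
  }

  ByteBuffer working() {
    return workingBuffer;
  }

  ByteBuffer syncing() {
    return syncingBuffer;
  }
}
```

Removing the `unlock()` in `finally` would leave the lock held after the first switch. The syncer would then block forever in `returnSyncedBuffer`.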







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r836338739



##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
##########
@@ -280,6 +295,21 @@ public void serializeImpl(ByteBuffer buffer) {
     }
   }
 
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    int type = PhysicalPlanType.MULTI_BATCH_INSERT.ordinal();
+    buffer.put((byte) type);
+    buffer.putInt(insertTabletPlanList.size());
+    for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+      insertTabletPlan.subSerialize(buffer);
+    }
+
+    buffer.putInt(parentInsertTabletPlanIndexList.size());
+    for (Integer index : parentInsertTabletPlanIndexList) {
+      buffer.putInt(index);
+    }
+  }
+

Review comment:
       The code logic is the same, but IWALByteBufferView encapsulates some actions to handle BufferOverflowException.
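The difference between writing to a plain `ByteBuffer` and writing through the view is that the view never overflows. The quoted `ByteBufferView` rolls to a fresh buffer when the remaining space is too small, and it splits large byte arrays across buffers. The sketch below reproduces that logic in isolation. It assumes `capacity >= Long.BYTES`, and a list of rolled buffers stands in for "submitted to the sync thread".

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Sketch of a view that rolls to a new buffer instead of throwing BufferOverflowException. */
final class RollingBufferView {
  private final int capacity; // assumed to be at least Long.BYTES
  private final List<ByteBuffer> rolled = new ArrayList<>(); // stands in for the sync thread
  private ByteBuffer working;

  RollingBufferView(int capacity) {
    this.capacity = capacity;
    this.working = ByteBuffer.allocate(capacity);
  }

  private void rollBuffer() {
    working.flip();
    rolled.add(working);
    working = ByteBuffer.allocate(capacity);
  }

  /** Primitives are never split: roll first if they do not fit. */
  private void ensureEnoughSpace(int bytesNum) {
    if (working.remaining() < bytesNum) {
      rollBuffer();
    }
  }

  void putInt(int value) {
    ensureEnoughSpace(Integer.BYTES);
    working.putInt(value);
  }

  void putLong(long value) {
    ensureEnoughSpace(Long.BYTES);
    working.putLong(value);
  }

  /** Byte arrays are split across buffers, as in the quoted put(byte[]). */
  void put(byte[] src) {
    int offset = 0;
    while (true) {
      int left = working.remaining();
      int need = src.length - offset;
      if (left >= need) {
        working.put(src, offset, need);
        return;
      }
      working.put(src, offset, left);
      offset += left;
      rollBuffer();
    }
  }

  /** Total bytes written so far, across rolled and working buffers. */
  int totalBytes() {
    int total = working.position();
    for (ByteBuffer b : rolled) {
      total += b.remaining();
    }
    return total;
  }

  int rolledCount() {
    return rolled.size();
  }
}
```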







[GitHub] [iotdb] HTHou commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r836969022



##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
##########
@@ -280,6 +295,21 @@ public void serializeImpl(ByteBuffer buffer) {
     }
   }
 
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    int type = PhysicalPlanType.MULTI_BATCH_INSERT.ordinal();
+    buffer.put((byte) type);
+    buffer.putInt(insertTabletPlanList.size());
+    for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+      insertTabletPlan.subSerialize(buffer);
+    }
+
+    buffer.putInt(parentInsertTabletPlanIndexList.size());
+    for (Integer index : parentInsertTabletPlanIndexList) {
+      buffer.putInt(index);
+    }
+  }
+

Review comment:
       It would be better to add some javadoc for it.
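Following the suggestion to document the method, the sketch below shows what a javadoc'd `serializeToWAL` could look like for the multi-tablet layout quoted above: a type byte, the tablet count, each sub-plan, then the parent-index list. Several pieces are assumptions. `Tablet` is a hypothetical stand-in for `InsertTabletPlan.subSerialize`, and here each body is length-prefixed so that a reader can parse it. The type value `7` is also hypothetical; the real code writes `PhysicalPlanType.MULTI_BATCH_INSERT.ordinal()`.

```java
import java.nio.ByteBuffer;
import java.util.List;

/** Hypothetical stand-in for a serialized InsertTabletPlan body. */
final class Tablet {
  final byte[] body;

  Tablet(byte[] body) {
    this.body = body;
  }
}

final class MultiTabletEntry {
  /** Hypothetical type tag. */
  static final byte TYPE = 7;

  private final List<Tablet> tablets;
  private final List<Integer> parentIndexes;

  MultiTabletEntry(List<Tablet> tablets, List<Integer> parentIndexes) {
    this.tablets = tablets;
    this.parentIndexes = parentIndexes;
  }

  /** Returns the exact number of bytes written by {@link #serializeToWAL(ByteBuffer)}. */
  int serializedSize() {
    int size = Byte.BYTES + Integer.BYTES;
    for (Tablet t : tablets) {
      size += Integer.BYTES + t.body.length;
    }
    return size + Integer.BYTES + Integer.BYTES * parentIndexes.size();
  }

  /**
   * Serializes this entry for the WAL.
   *
   * <p>Layout: {@code [type:byte][tabletCount:int]([bodyLen:int][body])*[indexCount:int][index:int]*}
   *
   * @param buffer destination; must have at least {@link #serializedSize()} bytes remaining
   */
  void serializeToWAL(ByteBuffer buffer) {
    buffer.put(TYPE);
    buffer.putInt(tablets.size());
    for (Tablet t : tablets) {
      buffer.putInt(t.body.length);
      buffer.put(t.body);
    }
    buffer.putInt(parentIndexes.size());
    for (int index : parentIndexes) {
      buffer.putInt(index);
    }
  }
}
```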







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839344930



##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java
##########
@@ -23,15 +23,21 @@
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.wal.buffer.WALEditValue;
+import org.apache.iotdb.db.wal.utils.WALWriteUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class DeletePlan extends PhysicalPlan {
+public class DeletePlan extends PhysicalPlan implements WALEditValue {
+  private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Integer.BYTES + Long.BYTES * 3;

Review comment:
       Fixed.
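The quoted `FIXED_SERIALIZED_SIZE` in `DeletePlan` evaluates to 1 + 4 + 3 × 8 = 29 bytes. Presumably one type byte, one int, and three longs, though the field meanings are not shown in this thread. The sketch below computes a total entry size on top of that constant. It assumes a hypothetical variable part in which each path is an int length prefix followed by its UTF-8 bytes; the real `DeletePlan` layout may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

final class DeleteEntrySize {
  /** Same expression as the quoted constant: 1 + 4 + 3 * 8 = 29 bytes. */
  static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Integer.BYTES + Long.BYTES * 3;

  /** Hypothetical variable part: each path as an int length prefix plus UTF-8 bytes. */
  static int serializedSize(List<String> paths) {
    int size = FIXED_SERIALIZED_SIZE;
    for (String path : paths) {
      size += Integer.BYTES + path.getBytes(StandardCharsets.UTF_8).length;
    }
    return size;
  }
}
```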







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r834171945



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+import org.apache.iotdb.db.wal.io.WALWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractWALBuffer implements IWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(AbstractWALBuffer.class);
+  /** use size limit to control WALEdit number in each file */
+  protected static final long LOG_SIZE_LIMIT = 10 * 1024 * 1024;
+
+  /** WALNode identifier of this buffer */
+  protected final String identifier;
+  /** directory to store .wal files */
+  protected final String logDirectory;
+  /** current wal file version id */
+  protected final AtomicInteger currentLogVersion = new AtomicInteger();
+  /** current wal file log writer */
+  protected volatile ILogWriter currentLogWriter;
+
+  public AbstractWALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
+    }
+    currentLogWriter =
+        new WALWriter(

Review comment:
       Because the WAL segmented buffer, another type of buffer, can reuse this code; WALSegementedBuffer is still under development.
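The reason given for `AbstractWALBuffer` is code reuse across buffer types. The sketch below shows that pattern: shared state and file-version rolling live in the base class, and subclasses differ only in how they buffer. All class names are hypothetical. `SegmentedWalBuffer` only stands in for the segmented buffer mentioned as under development; its real behaviour is not described in this thread. `LOG_SIZE_LIMIT` reuses the quoted 10 MB value.

```java
/** Sketch: common state and file rolling in the base class. */
abstract class BaseWalBuffer {
  protected static final long LOG_SIZE_LIMIT = 10 * 1024 * 1024; // same value as quoted
  protected final String identifier;
  protected int currentLogVersion;
  protected long currentFileSize;

  BaseWalBuffer(String identifier) {
    this.identifier = identifier;
  }

  /** Shared by every subclass: start a new .wal file version once the size limit is reached. */
  protected final void onBytesFlushed(long bytes) {
    currentFileSize += bytes;
    if (currentFileSize >= LOG_SIZE_LIMIT) {
      currentLogVersion++;
      currentFileSize = 0;
    }
  }

  abstract void write(byte[] entry);

  int logVersion() {
    return currentLogVersion;
  }
}

/** Flushes every entry as soon as it arrives. */
final class ImmediateWalBuffer extends BaseWalBuffer {
  ImmediateWalBuffer(String identifier) {
    super(identifier);
  }

  @Override
  void write(byte[] entry) {
    onBytesFlushed(entry.length);
  }
}

/** Hypothetical: accumulates entries and flushes them in fixed-size segments. */
final class SegmentedWalBuffer extends BaseWalBuffer {
  private final int segmentSize;
  private long pending;

  SegmentedWalBuffer(String identifier, int segmentSize) {
    super(identifier);
    this.segmentSize = segmentSize;
  }

  @Override
  void write(byte[] entry) {
    pending += entry.length;
    while (pending >= segmentSize) {
      onBytesFlushed(segmentSize);
      pending -= segmentSize;
    }
  }
}
```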







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r834292710



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+import org.apache.iotdb.db.wal.io.WALWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractWALBuffer implements IWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(AbstractWALBuffer.class);
+  /** use size limit to control WALEdit number in each file */
+  protected static final long LOG_SIZE_LIMIT = 10 * 1024 * 1024;
+
+  /** WALNode identifier of this buffer */
+  protected final String identifier;
+  /** directory to store .wal files */
+  protected final String logDirectory;
+  /** current wal file version id */
+  protected final AtomicInteger currentLogVersion = new AtomicInteger();
+  /** current wal file log writer */
+  protected volatile ILogWriter currentLogWriter;
+
+  public AbstractWALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
+    }
+    currentLogWriter =
+        new WALWriter(

Review comment:
       Sorry, I misunderstood you. The superclass of WALWriter is LogWriter, not AbstractWALBuffer.
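The hierarchy clarified here separates two jobs: a writer that appends bytes to a .wal file, and a buffer that owns a writer. The sketch below shows a minimal writer pair under that assumption. `BaseLogWriter` and `WalFileWriter` are hypothetical names, not IoTDB's `LogWriter`/`WALWriter`. They append `ByteBuffer` contents to a `FileChannel` and optionally call `force` to fsync.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical generic log writer: append buffers to a file and optionally fsync. */
class BaseLogWriter implements AutoCloseable {
  private final FileChannel channel;
  private long size;

  BaseLogWriter(Path file) {
    try {
      channel = FileChannel.open(file, StandardOpenOption.CREATE,
          StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Writes all remaining bytes of the buffer; forces them to disk when requested. */
  void write(ByteBuffer buffer, boolean force) {
    try {
      while (buffer.hasRemaining()) {
        size += channel.write(buffer);
      }
      if (force) {
        channel.force(false);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  long size() {
    return size;
  }

  @Override
  public void close() {
    try {
      channel.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

/** Hypothetical WAL writer: extends the log writer, not any buffer class. */
final class WalFileWriter extends BaseLogWriter {
  WalFileWriter(Path file) {
    super(file);
  }
}
```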







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839356089



##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -65,25 +65,44 @@ config_nodes=127.0.0.1:22277
 ### Write Ahead Log Configuration
 ####################
 
-# Is insert ahead log enable
-# Datatype: boolean
-# enable_wal=true
+# Write mode of wal
+# The details of these three modes are as follows:
+# 1. DISABLE: the system will disable wal.
+# 2. SYNC: the system will submit wal synchronously, write request will not return until its wal is fsynced to the disk successfully.
+# 3. ASYNC: the system will submit wal asynchronously, write request will return immediately no matter its wal is fsynced to the disk successfully.
+# The write performance order is DISABLE > ASYNC > SYNC, but only SYNC mode can ensure data durability.
+# wal_mode=SYNC
+
+# Duration a wal flush operation will wait before calling fsync
+# A duration greater than 0 batches multiple wal fsync calls into one. This is useful when disks are slow or WAL write contention exists.
+# Datatype: long
+# fsync_wal_delay_in_ms=0
 
-# Add a switch to drop ouf-of-order data
-# Out-of-order data will impact the aggregation query a lot. Users may not care about discarding some out-of-order data.
-# Datatype: boolean
-# enable_discard_out_of_order_data=false
+# Max number of wal nodes, each node corresponds to one wal directory
+# The default value 0 means the concurrent wal number will be 2 * 'number of wal dirs'.
+# Datatype: int
+# max_wal_node_num=0
 
-# When a certain amount of insert ahead log is reached, it will be flushed to disk
-# It is possible to lose at most flush_wal_threshold operations
+# Buffer size of each wal node
+# If it sets a value smaller than 0, use the default value 16777216 bytes (16MB).
 # Datatype: int
-# flush_wal_threshold=10000
+# wal_buffer_size_in_byte=16777216
 
-# The cycle when insert ahead log is periodically forced to be written to disk(in milliseconds)
-# If force_wal_period_in_ms = 0 it means force insert ahead log to be written to disk after each refreshment
-# Set this parameter to 0 may slow down the ingestion on slow disk.
-# Datatype: long
-# force_wal_period_in_ms=100
+# Buffer entry size of each wal buffer
+# If it sets a value smaller than 0, use the default value 16384 bytes (16KB).
+# Datatype: int
+# wal_buffer_entry_size_in_byte=16384
+
+# Max storage space for each wal node
+# Notice: If this parameter is too small, the write performance may decline.
+# Datatype: int
+# wal_node_max_storage_space_in_mb=3072

Review comment:
       This parameter is used to control disk usage, not memory usage.
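Since `wal_node_max_storage_space_in_mb` bounds disk usage rather than memory, checking it means summing the sizes of a node's .wal files on disk. The sketch below does only that comparison. It assumes all of a node's .wal files sit directly in one directory, and the `_0.wal`-style names in the usage are hypothetical. What the system does once the limit is exceeded is not covered in this thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Sketch: compare the on-disk size of one WAL node's .wal files with a MB limit. */
final class WalDiskUsage {
  private WalDiskUsage() {}

  /** Sums the sizes of regular files ending in ".wal" directly under dir. */
  static long walBytes(Path dir) {
    try (Stream<Path> files = Files.list(dir)) {
      return files
          .filter(p -> Files.isRegularFile(p) && p.getFileName().toString().endsWith(".wal"))
          .mapToLong(p -> p.toFile().length())
          .sum();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** True when the node uses more disk than maxStorageSpaceInMb allows. */
  static boolean exceedsLimit(Path dir, long maxStorageSpaceInMb) {
    return walBytes(dir) > maxStorageSpaceInMb * 1024L * 1024L;
  }
}
```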







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839360129



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEdit.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.memtable.AbstractMemTable;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * WALEdit is the basic element of a .wal file, consisting of the type, memTable id, and specific
+ * value (physical plan or memTable snapshot).
+ */
+public class WALEdit implements SerializedSize {
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** wal edit type 1 byte, memTable id 4 bytes */
+  private static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Integer.BYTES;
+
+  /** type of value */
+  private final WALEditType type;
+  /** memTable id */
+  private final int memTableId;
+  /** value(physical plan or memTable snapshot) */
+  private final WALEditValue value;
+
+  /**
+   * tracks whether this WALEdit has been written to the filesystem; null iff this WALEdit was
+   * deserialized from a .wal file
+   */
+  private final WALFlushListener walFlushListener;
+
+  public WALEdit(int memTableId, WALEditValue value) {
+    this(memTableId, value, config.getWalMode() == WALMode.SYNC);
+  }
+
+  public WALEdit(int memTableId, WALEditValue value, boolean wait) {

Review comment:
       WALEdit also has another constructor that constructs a waiting WALResultListener.
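The WALEdit javadoc above describes a fixed 5-byte header (1-byte edit type, 4-byte memTable id, i.e. `FIXED_SERIALIZED_SIZE = Byte.BYTES + Integer.BYTES`) in front of the serialized value. A minimal sketch of writing and reading such a header with `java.nio.ByteBuffer` follows; `EditHeader` and the numeric type code used in the example are hypothetical, and the value bytes are treated as opaque rather than as a real physical plan.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of a WALEdit-style header: type (1 byte) + memTable id (4 bytes). */
final class EditHeader {
  /** Mirrors FIXED_SERIALIZED_SIZE = Byte.BYTES + Integer.BYTES in the quoted code. */
  static final int FIXED_SERIALIZED_SIZE = Byte.BYTES + Integer.BYTES;

  final byte type;
  final int memTableId;

  EditHeader(byte type, int memTableId) {
    this.type = type;
    this.memTableId = memTableId;
  }

  /** Writes the header followed by already-serialized value bytes, ready for reading. */
  static ByteBuffer serialize(EditHeader header, byte[] value) {
    ByteBuffer buffer = ByteBuffer.allocate(FIXED_SERIALIZED_SIZE + value.length);
    buffer.put(header.type);
    buffer.putInt(header.memTableId);
    buffer.put(value);
    buffer.flip(); // switch from writing to reading
    return buffer;
  }

  /** Reads the header; the buffer is left positioned at the first value byte. */
  static EditHeader deserialize(ByteBuffer buffer) {
    byte type = buffer.get();
    int memTableId = buffer.getInt();
    return new EditHeader(type, memTableId);
  }
}
```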





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r836337312



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -85,7 +88,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 @SuppressWarnings("java:S1135") // ignore todos
-public class TsFileProcessor {
+public class TsFileProcessor implements WALSubmitter {

Review comment:
       Already deleted.





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839345096



##########
File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
##########
@@ -270,4 +307,62 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(
     }
     return values;
   }
+
+  public static Object[] readValuesFromBuffer(

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
##########
@@ -207,6 +235,15 @@ public static TSQueryDataSet convertQueryDataSetByFetchSize(
     return readValuesFromBuffer(buffer, dataTypes, columns, size);
   }
 
+  public static Object[] readValuesFromBuffer(

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+
+/** Currently, there are 2 buffer types */

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/utils/listener/WALRecoverListener.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils.listener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class helps judge whether some TsFile is recovered. */
+public class WALRecoverListener implements IResultListener {
+  private static final Logger logger = LoggerFactory.getLogger(WALRecoverListener.class);
+
+  /** path of recovering TsFile */
+  private final String filePath;
+
+  private volatile Status status;
+  private volatile Exception cause;
+
+  public WALRecoverListener(String filePath) {
+    this.filePath = filePath;
+    status = Status.RUNNING;
+    cause = null;
+  }
+
+  public synchronized WALRecoverListener succeed() {
+    status = Status.SUCCESS;
+    this.notifyAll();
+    return this;
+  }
+
+  public synchronized WALRecoverListener fail(Exception e) {
+    status = Status.FAILURE;
+    cause = e;
+    this.notifyAll();
+    return this;
+  }
+
+  public synchronized WALRecoverListener.Status getResult() {

Review comment:
       Fixed.
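In WALRecoverListener, `succeed()` and `fail()` update the status and call `notifyAll()` while holding the object's monitor, which suggests that `getResult()` waits while the status is still RUNNING. The quoted hunk stops at the signature of `getResult()`, so the following is only a minimal sketch of that guarded-wait pattern under that assumption; `RecoverListenerSketch` is a hypothetical name.

```java
/** Hypothetical listener using the guarded-wait pattern suggested by WALRecoverListener. */
final class RecoverListenerSketch {
  enum Status { RUNNING, SUCCESS, FAILURE }

  // All access happens inside synchronized methods, so volatile is not required here.
  private Status status = Status.RUNNING;
  private Exception cause;

  synchronized RecoverListenerSketch succeed() {
    status = Status.SUCCESS;
    notifyAll(); // wake every thread blocked in getResult()
    return this;
  }

  synchronized RecoverListenerSketch fail(Exception e) {
    status = Status.FAILURE;
    cause = e;
    notifyAll();
    return this;
  }

  /** Blocks until succeed() or fail() is called; the loop guards against spurious wakeups. */
  synchronized Status getResult() throws InterruptedException {
    while (status == Status.RUNNING) {
      wait();
    }
    return status;
  }

  synchronized Exception getCause() {
    return cause;
  }
}
```

Waiting in a `while` loop rather than an `if` is the standard idiom for `Object.wait()`, since a thread can wake without the condition having changed.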





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839360788



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees concurrency safety and uses a double-buffer mechanism to accelerate
+ * writes and avoid waiting for buffers to be synced to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  private static final long FSYNC_WAL_DELAY_IN_MS = config.getFsyncWalDelayInMs();
+  /** default delay time of each serialize task when wal mode is async */
+  public static final long ASYNC_WAL_DELAY_IN_MS = 100;
+  /** Maximum number of WALEdits in one serialize task when wal mode is sync */
+  public static final int SYNC_BATCH_SIZE_LIMIT = 100;
+  /** Maximum number of WALEdits in the blocking queue */
+  public static final int SIZE_LIMIT = 10_000;
+  /** notify serializeThread to stop */
+  private static final WALEdit CLOSE_SIGNAL = new WALEdit(-1, new DeletePlan());
+
+  /** whether close method is called */
+  private volatile boolean isClosed = false;
+  /** WALEdits */
+  private final BlockingQueue<WALEdit> walEdits = new ArrayBlockingQueue<>(SIZE_LIMIT);
+  /** lock to provide synchronization for double buffers mechanism, protecting buffers status */
+  private final Lock buffersLock = new ReentrantLock();
+  /** condition to guarantee correctness of switching buffers */
+  private final Condition idleBufferReadyCondition = buffersLock.newCondition();
+  // region these variables should be protected by buffersLock
+  /** two buffers switch between three statuses (there is always 1 buffer working) */
+  // buffer in working status, only updated by serializeThread
+  private volatile ByteBuffer workingBuffer;
+  // buffer in idle status
+  private volatile ByteBuffer idleBuffer;
+  // buffer in syncing status, serializeThread makes sure no more writes to syncingBuffer
+  private volatile ByteBuffer syncingBuffer;
+  // endregion
+  /** single thread to serialize WALEdit to workingBuffer */
+  private final ExecutorService serializeThread;
+  /** single thread to sync syncingBuffer to disk */
+  private final ExecutorService syncBufferThread;
+
+  public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    super(identifier, logDirectory);
+    allocateBuffers();
+    serializeThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SERIALIZE.getName() + "(node-" + identifier + ")");
+    syncBufferThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SYNC.getName() + "(node-" + identifier + ")");
+    // start receiving serialize tasks
+    serializeThread.submit(new SerializeTask());
+  }
+
+  private void allocateBuffers() {
+    try {
+      workingBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+      idleBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+    } catch (OutOfMemoryError e) {
+      logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, e);
+      close();
+      throw e;
+    }
+  }
+
+  @Override
+  public void write(WALEdit edit) {
+    if (isClosed) {
+      logger.error(
+          "Fail to write WALEdit into wal node-{} because this node is closed.", identifier);
+      edit.getWalFlushListener().fail(new WALNodeClosedException(identifier));
+      return;
+    }
+    // only add this WALEdit to queue
+    try {
+      walEdits.put(edit);
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted when waiting for adding WalEdit to buffer.");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  // region Task of serializeThread
+  /** This task serializes WALEdit to workingBuffer and will call fsync at last. */
+  private class SerializeTask implements Runnable {
+    private final IWALByteBufferView byteBufferVew = new ByteBufferView();
+    private final List<WALFlushListener> fsyncListeners = new LinkedList<>();
+
+    @Override
+    public void run() {
+      try {
+        serialize();
+      } finally {
+        serializeThread.submit(new SerializeTask());
+      }
+    }
+
+    /** To bound the blocking queue's memory usage, take and serialize one WALEdit at a time */
+    private void serialize() {
+      // try to get first WALEdit with blocking interface
+      int batchSize = 0;
+      try {
+        WALEdit edit = walEdits.take();
+        try {
+          if (edit != CLOSE_SIGNAL) {
+            edit.serialize(byteBufferVew);
+            ++batchSize;
+            fsyncListeners.add(edit.getWalFlushListener());
+          }
+        } catch (Exception e) {
+          logger.error(
+              "Fail to serialize WALEdit to wal node-{}'s buffer, discard it.", identifier, e);
+          edit.getWalFlushListener().fail(e);
+        }
+      } catch (InterruptedException e) {
+        logger.warn(
+            "Interrupted when waiting for taking WALEdit from blocking queue to serialize.");
+        Thread.currentThread().interrupt();
+      }
+      // for better fsync performance, sleep a while to enlarge write batch
+      if (FSYNC_WAL_DELAY_IN_MS > 0 || config.getWalMode() == WALMode.ASYNC) {
+        long sleepTime = FSYNC_WAL_DELAY_IN_MS > 0 ? FSYNC_WAL_DELAY_IN_MS : ASYNC_WAL_DELAY_IN_MS;
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+          logger.warn("Interrupted when sleeping a while to enlarge wal write batch.");
+          Thread.currentThread().interrupt();
+        }
+      }
+      // try to get more WALEdits with non-blocking interface to enlarge write batch
+      // control batch size in sync mode to return quickly
+      int batchSizeLimit = config.getWalMode() == WALMode.SYNC ? SYNC_BATCH_SIZE_LIMIT : SIZE_LIMIT;
+      while (walEdits.peek() != null && batchSize < batchSizeLimit) {
+        WALEdit edit = walEdits.poll();
+        if (edit == null || edit == CLOSE_SIGNAL) {
+          break;
+        } else {
+          try {
+            edit.serialize(byteBufferVew);
+          } catch (Exception e) {
+            logger.error(
+                "Fail to serialize WALEdit to wal node-{}'s buffer, discard it.", identifier, e);
+            edit.getWalFlushListener().fail(e);
+            continue;
+          }
+          ++batchSize;
+          fsyncListeners.add(edit.getWalFlushListener());
+        }
+      }
+      // call fsync at last and set fsyncListeners
+      if (batchSize > 0) {
+        fsyncWorkingBuffer(fsyncListeners);
+      }
+    }
+  }
+
+  /**
+   * This view uses workingBuffer lock-freely because workingBuffer is only updated by
+   * serializeThread and this class is only used by serializeThread.
+   */
+  private class ByteBufferView implements IWALByteBufferView {
+    private void ensureEnoughSpace(int bytesNum) {
+      if (workingBuffer.remaining() < bytesNum) {
+        rollBuffer();
+      }
+    }
+
+    private void rollBuffer() {
+      syncWorkingBuffer();
+    }
+
+    @Override
+    public void put(byte b) {
+      ensureEnoughSpace(Byte.BYTES);
+      workingBuffer.put(b);
+    }
+
+    @Override
+    public void put(byte[] src) {
+      int offset = 0;
+      while (true) {
+        int leftCapacity = workingBuffer.remaining();
+        int needCapacity = src.length - offset;
+        if (leftCapacity >= needCapacity) {
+          workingBuffer.put(src, offset, needCapacity);
+          break;
+        } else {
+          workingBuffer.put(src, offset, leftCapacity);
+          offset += leftCapacity;
+          rollBuffer();
+        }
+      }
+    }
+
+    @Override
+    public void putChar(char value) {
+      ensureEnoughSpace(Character.BYTES);
+      workingBuffer.putChar(value);
+    }
+
+    @Override
+    public void putShort(short value) {
+      ensureEnoughSpace(Short.BYTES);
+      workingBuffer.putShort(value);
+    }
+
+    @Override
+    public void putInt(int value) {
+      ensureEnoughSpace(Integer.BYTES);
+      workingBuffer.putInt(value);
+    }
+
+    @Override
+    public void putLong(long value) {
+      ensureEnoughSpace(Long.BYTES);
+      workingBuffer.putLong(value);
+    }
+
+    @Override
+    public void putFloat(float value) {
+      ensureEnoughSpace(Float.BYTES);
+      workingBuffer.putFloat(value);
+    }
+
+    @Override
+    public void putDouble(double value) {
+      ensureEnoughSpace(Double.BYTES);
+      workingBuffer.putDouble(value);
+    }
+  }
+
+  /** Notice: this method is only called when SerializeTask has exhausted the working buffer. */
+  private void syncWorkingBuffer() {
+    switchIdleBufferToWorking();
+    syncBufferThread.submit(new SyncBufferTask(false));
+  }
+
+  /** Notice: this method is only called at the end of SerializeTask. */
+  private void fsyncWorkingBuffer(List<WALFlushListener> fsyncListeners) {
+    switchIdleBufferToWorking();
+    syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners));
+  }
+
+  // only called by serializeThread
+  private void switchIdleBufferToWorking() {

Review comment:
       Fixed.
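SerializeTask above takes the first WALEdit with the blocking `take()`, optionally sleeps to let more edits arrive, then pulls further edits with the non-blocking `poll()` until the queue is empty or a mode-dependent limit is reached (SYNC_BATCH_SIZE_LIMIT = 100 in sync mode, SIZE_LIMIT = 10,000 otherwise), and finally hands the whole batch to one fsync task. A minimal sketch of that take-then-drain pattern follows, with generic elements standing in for WALEdits and the sleep omitted; `BatchDrainer` and `nextBatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of the take-then-drain batching used by SerializeTask. */
final class BatchDrainer {
  private BatchDrainer() {}

  /**
   * Blocks for the first element, then collects more without blocking until the queue is
   * empty or batchSizeLimit elements (including the first) have been gathered.
   */
  static <T> List<T> nextBatch(BlockingQueue<T> queue, int batchSizeLimit)
      throws InterruptedException {
    List<T> batch = new ArrayList<>();
    batch.add(queue.take()); // blocking: wait for at least one element
    while (batch.size() < batchSizeLimit) {
      T next = queue.poll(); // non-blocking: returns null when the queue is empty
      if (next == null) {
        break;
      }
      batch.add(next);
    }
    return batch;
  }
}
```

The standard library's `BlockingQueue.drainTo(Collection, int)` is an alternative for the non-blocking part; the explicit loop is shown because SerializeTask serializes each edit as it is polled.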

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,435 @@
+public class WALBuffer extends AbstractWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  private static final long FSYNC_WAL_DELAY_IN_MS = config.getFsyncWalDelayInMs();
+  /** default delay time of each serialize task when wal mode is async */
+  public static final long ASYNC_WAL_DELAY_IN_MS = 100;
+  /** Maximum number of WALEdits in one serialize task when wal mode is sync */
+  public static final int SYNC_BATCH_SIZE_LIMIT = 100;
+  /** Maximum number of WALEdits in the blocking queue */
+  public static final int SIZE_LIMIT = 10_000;

Review comment:
       Fixed.
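`write()` enqueues with `ArrayBlockingQueue.put`, and the queue is created with capacity SIZE_LIMIT, so once 10,000 edits are pending, writers block until the serialize thread catches up. The small demonstration below makes that back-pressure observable without hanging by using `offer` with a timeout instead of `put`; the tiny capacities and the `BoundedQueueDemo` name are illustrative only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of back-pressure from a bounded queue like WALBuffer's walEdits. */
final class BoundedQueueDemo {
  private BoundedQueueDemo() {}

  /**
   * Offers {@code count} items to a queue of the given capacity that has no consumer,
   * waiting at most {@code timeoutMs} per item; returns how many were accepted.
   */
  static int acceptedWithoutConsumer(int capacity, int count, long timeoutMs)
      throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
    int accepted = 0;
    for (int i = 0; i < count; i++) {
      // put() would block forever here; offer() with a timeout gives up instead
      if (!queue.offer(i, timeoutMs, TimeUnit.MILLISECONDS)) {
        break;
      }
      accepted++;
    }
    return accepted;
  }
}
```

In WALBuffer the blocking `put` is the intended behavior: it caps the memory held by pending edits at SIZE_LIMIT entries, at the cost of stalling writers when serialization falls behind.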





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839357502



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.node;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.db.wal.buffer.IWALBuffer;
+import org.apache.iotdb.db.wal.buffer.WALBuffer;
+import org.apache.iotdb.db.wal.buffer.WALEdit;
+import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
+import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.utils.TsFilePathUtils;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. */
+public class WALNode implements IWALNode {
+  public static final Pattern WAL_NODE_FOLDER_PATTERN = Pattern.compile("(?<nodeIdentifier>\\d+)");
+
+  private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long MAX_STORAGE_SPACE_IN_BYTE =
+      config.getWalNodeMaxStorageSpaceInMb() * 1024L * 1024;
+  private static final long MEM_TABLE_SNAPSHOT_THRESHOLD_IN_BYTE =
+      config.getWalMemTableSnapshotThreshold();
+
+  /** unique identifier of this WALNode */
+  private final String identifier;
+  /** directory to store this node's files */
+  private final String logDirectory;
+  /** wal buffer */
+  private final IWALBuffer buffer;
+  /** manage checkpoints */
+  private final CheckpointManager checkpointManager;
+
+  public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal node-{}.", logDirectory, identifier);
+    }
+    this.buffer = new WALBuffer(identifier, logDirectory);
+    this.checkpointManager = new CheckpointManager(identifier, logDirectory);
+  }
+
+  /** Returns true when this folder is a wal node folder. */
+  public static boolean walNodeFolderNameFilter(File dir, String name) {
+    return WAL_NODE_FOLDER_PATTERN.matcher(name).find();
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, InsertPlan insertPlan) {
+    WALEdit walEdit = new WALEdit(memTableId, insertPlan);
+    return log(walEdit);
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, DeletePlan deletePlan) {
+    WALEdit walEdit = new WALEdit(memTableId, deletePlan);
+    return log(walEdit);
+  }
+
+  private WALFlushListener log(WALEdit walEdit) {
+    buffer.write(walEdit);
+    return walEdit.getWalFlushListener();
+  }
+
+  @Override
+  public void onFlushStart(IMemTable memTable) {
+    // do nothing
+  }
+
+  @Override
+  public void onFlushEnd(IMemTable memTable) {

Review comment:
       Fixed.
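`walNodeFolderNameFilter` uses `Matcher.find()`, which succeeds if any substring of the name matches `\d+`, so names such as `wal-1` or `1.bak` also pass the filter. If only purely numeric folder names are intended (an assumption; the folder naming scheme is not shown in this hunk), `Matcher.matches()` is the stricter check. A minimal comparison follows; `FolderFilterDemo` is a hypothetical name.

```java
import java.util.regex.Pattern;

/** Compares find() and matches() for the WAL node folder pattern quoted above. */
final class FolderFilterDemo {
  static final Pattern WAL_NODE_FOLDER_PATTERN = Pattern.compile("(?<nodeIdentifier>\\d+)");

  private FolderFilterDemo() {}

  /** Same check as the quoted filter: a digit run anywhere in the name is accepted. */
  static boolean lenient(String name) {
    return WAL_NODE_FOLDER_PATTERN.matcher(name).find();
  }

  /** Stricter variant: the whole name must consist of digits. */
  static boolean strict(String name) {
    return WAL_NODE_FOLDER_PATTERN.matcher(name).matches();
  }
}
```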

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
##########
@@ -0,0 +1,244 @@
+public class WALNode implements IWALNode {
+  public static final Pattern WAL_NODE_FOLDER_PATTERN = Pattern.compile("(?<nodeIdentifier>\\d+)");
+
+  private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long MAX_STORAGE_SPACE_IN_BYTE =
+      config.getWalNodeMaxStorageSpaceInMb() * 1024L * 1024;
+  private static final long MEM_TABLE_SNAPSHOT_THRESHOLD_IN_BYTE =
+      config.getWalMemTableSnapshotThreshold();
+
+  /** unique identifier of this WALNode */
+  private final String identifier;
+  /** directory to store this node's files */
+  private final String logDirectory;
+  /** wal buffer */
+  private final IWALBuffer buffer;
+  /** manage checkpoints */
+  private final CheckpointManager checkpointManager;
+
+  public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal node-{}.", logDirectory, identifier);
+    }
+    this.buffer = new WALBuffer(identifier, logDirectory);
+    this.checkpointManager = new CheckpointManager(identifier, logDirectory);
+  }
+
+  /** Return true when this folder wal node folder */
+  public static boolean walNodeFolderNameFilter(File dir, String name) {
+    return WAL_NODE_FOLDER_PATTERN.matcher(name).find();
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, InsertPlan insertPlan) {
+    WALEdit walEdit = new WALEdit(memTableId, insertPlan);
+    return log(walEdit);
+  }
+
+  @Override
+  public WALFlushListener log(int memTableId, DeletePlan deletePlan) {
+    WALEdit walEdit = new WALEdit(memTableId, deletePlan);
+    return log(walEdit);
+  }
+
+  private WALFlushListener log(WALEdit walEdit) {
+    buffer.write(walEdit);
+    return walEdit.getWalFlushListener();
+  }
+
+  @Override
+  public void onFlushStart(IMemTable memTable) {

Review comment:
       Fixed.
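
The WALNode hunk above filters wal node directories with `WAL_NODE_FOLDER_PATTERN.matcher(name).find()`. Below is a minimal standalone sketch (the class name `WalFolderFilterSketch` is hypothetical and not part of IoTDB) contrasting `find()`, which accepts any name that merely contains a digit run, with `matches()`, which requires the whole name to be digits. The diff does not say which behaviour the wal module intends, so this is only an illustration.

```java
import java.io.File;
import java.util.regex.Pattern;

/** Hypothetical standalone sketch; not part of the IoTDB code base. */
class WalFolderFilterSketch {
  // Same pattern as WAL_NODE_FOLDER_PATTERN in the quoted WALNode hunk.
  static final Pattern WAL_NODE_FOLDER_PATTERN = Pattern.compile("(?<nodeIdentifier>\\d+)");

  /** Mirrors the quoted filter: true if the name contains at least one digit. */
  static boolean containsDigits(File dir, String name) {
    return WAL_NODE_FOLDER_PATTERN.matcher(name).find();
  }

  /** Stricter alternative: true only if the entire name is a digit run. */
  static boolean isAllDigits(File dir, String name) {
    return WAL_NODE_FOLDER_PATTERN.matcher(name).matches();
  }

  public static void main(String[] args) {
    for (String name : new String[] {"0", "12", "tmp1", "backup"}) {
      System.out.println(
          name + " find=" + containsDigits(null, name) + " matches=" + isAllDigits(null, name));
    }
  }
}
```

Both methods have the `(File, String)` shape of `java.io.FilenameFilter`, so either can be passed as `new File(walDir).list(WalFolderFilterSketch::isAllDigits)`.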

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;

Review comment:
       Added.
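
The WALManager hunk defines `FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200` and `DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000` (600000 ms, i.e. 10 minutes) and imports `ScheduledExecutorService`, but the hunk is cut off before showing how the tasks are scheduled. A sketch, assuming the two jobs run periodically with a fixed delay; it uses plain `java.util.concurrent.Executors` because the signatures of `IoTDBThreadPoolFactory` are not visible here, and the class name and task bodies are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of periodic WAL maintenance; the task bodies are supplied by the caller. */
class WalMaintenanceSketch {
  static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
  static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000; // 600000 ms = 10 minutes

  // One thread for both jobs: a slow deletion run delays the next checkpoint fsync.
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

  /** Fixed delay: the next run starts only after the previous one finished, so runs never overlap. */
  void start(Runnable fsyncCheckpoints, Runnable deleteOutdatedWalFiles) {
    scheduler.scheduleWithFixedDelay(
        fsyncCheckpoints,
        FSYNC_CHECKPOINT_FILE_DELAY_IN_MS,
        FSYNC_CHECKPOINT_FILE_DELAY_IN_MS,
        TimeUnit.MILLISECONDS);
    scheduler.scheduleWithFixedDelay(
        deleteOutdatedWalFiles,
        DELETE_WAL_FILES_DELAY_IN_MS,
        DELETE_WAL_FILES_DELAY_IN_MS,
        TimeUnit.MILLISECONDS);
  }

  void stop() {
    scheduler.shutdownNow();
    try {
      scheduler.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Starts the sketch, waits until the fsync task ran {@code times} times, then stops it. */
  static boolean fsyncTaskRuns(int times) {
    CountDownLatch runs = new CountDownLatch(times);
    WalMaintenanceSketch sketch = new WalMaintenanceSketch();
    sketch.start(runs::countDown, () -> {});
    try {
      return runs.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      sketch.stop();
    }
  }

  public static void main(String[] args) {
    System.out.println("fsync task ran 3 times: " + fsyncTaskRuns(3));
  }
}
```

Giving each job its own thread would decouple them; the single-thread choice here just keeps the sketch small.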

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.CheckpointWriter;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage checkpoints of one wal node */
+public class CheckpointManager implements AutoCloseable {
+  /** use size limit to control WALEdit number in each file */
+  public static final long LOG_SIZE_LIMIT = 3 * 1024 * 1024;
+
+  private static final Logger logger = LoggerFactory.getLogger(CheckpointManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** WALNode identifier of this checkpoint manager */
+  protected final String identifier;
+  /** directory to store .checkpoint file */
+  protected final String logDirectory;
+  /**
+   * protect concurrent safety of checkpoint info, including memTableId2Info, cachedByteBuffer,
+   * currentLogVersion and currentLogWriter
+   */
+  private final Lock infoLock = new ReentrantLock();
+  // region these variables should be protected by infoLock
+  /** memTable id -> memTable info */
+  private final Map<Integer, MemTableInfo> memTableId2Info = new HashMap<>();
+  /** cache the biggest byte buffer to serialize checkpoint */
+  private volatile ByteBuffer cachedByteBuffer;
+  /** current checkpoint file version id, only updated by fsyncAndDeleteThread */
+  private int currentLogVersion = 0;
+  /** current checkpoint file log writer, only updated by fsyncAndDeleteThread */
+  private ILogWriter currentLogWriter;
+  // endregion
+
+  public CheckpointManager(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
+    }
+    currentLogWriter =
+        new CheckpointWriter(
+            SystemFileFactory.INSTANCE.getFile(
+                logDirectory, CheckpointWriter.getLogFileName(currentLogVersion)));
+    makeGlobalInfoCP();
+    fsyncCheckpointFile();
+  }
+
+  /**
+   * make checkpoint for global memTables' info, this checkpoint only exists in the beginning of
+   * each checkpoint file
+   */
+  private void makeGlobalInfoCP() {
+    infoLock.lock();
+    try {
+      Checkpoint checkpoint =
+          new Checkpoint(
+              CheckpointType.GLOBAL_MEMORY_TABLE_INFO, new ArrayList<>(memTableId2Info.values()));
+      logByCachedByteBuffer(checkpoint);
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  /** make checkpoint for create memTable info */
+  public void makeCreateMemTableCP(MemTableInfo memTableInfo) {
+    infoLock.lock();
+    try {
+      memTableId2Info.put(memTableInfo.getMemTableId(), memTableInfo);
+      Checkpoint checkpoint =
+          new Checkpoint(
+              CheckpointType.CREATE_MEMORY_TABLE, Collections.singletonList(memTableInfo));
+      logByCachedByteBuffer(checkpoint);
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  /** make checkpoint for flush memTable info */
+  public void makeFlushMemTableCP(int memTableId) {
+    infoLock.lock();
+    try {
+      MemTableInfo memTableInfo = memTableId2Info.remove(memTableId);
+      if (memTableInfo == null) {
+        return;
+      }
+      Checkpoint checkpoint =
+          new Checkpoint(
+              CheckpointType.FLUSH_MEMORY_TABLE, Collections.singletonList(memTableInfo));
+      logByCachedByteBuffer(checkpoint);
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  private void logByCachedByteBuffer(Checkpoint checkpoint) {
+    // make sure cached ByteBuffer has enough capacity
+    int estimateSize = checkpoint.serializedSize();
+    if (cachedByteBuffer == null || estimateSize > cachedByteBuffer.capacity()) {
+      cachedByteBuffer = ByteBuffer.allocate(estimateSize);
+    }
+    checkpoint.serialize(cachedByteBuffer);
+    try {
+      currentLogWriter.write(cachedByteBuffer);
+    } catch (IOException e) {
+      logger.error("Fail to make checkpoint: {}", checkpoint, e);
+    } finally {
+      cachedByteBuffer.clear();
+    }
+  }
+
+  // region Task to fsync checkpoint file
+  /** Fsync checkpoints to the disk */
+  public void fsyncCheckpointFile() {
+    infoLock.lock();
+    try {
+      try {
+        currentLogWriter.force();
+      } catch (IOException e) {
+        logger.error(
+            "Fail to fsync wal node-{}'s checkpoint writer, change system mode to read-only.",
+            identifier,
+            e);
+        config.setReadOnly(true);
+      }
+
+      try {
+        if (tryRollingLogWriter()) {
+          // first log global memTables' info, then delete old checkpoint file
+          makeGlobalInfoCP();
+          currentLogWriter.force();
+          File oldFile =
+              SystemFileFactory.INSTANCE.getFile(
+                  logDirectory, CheckpointWriter.getLogFileName(currentLogVersion - 1));
+          oldFile.delete();
+        }
+      } catch (IOException e) {
+        logger.error(
+            "Fail to roll wal node-{}'s checkpoint writer, change system mode to read-only.",
+            identifier,
+            e);
+        config.setReadOnly(true);
+      }
+    } finally {
+      infoLock.unlock();
+    }
+  }
+
+  private boolean tryRollingLogWriter() throws IOException {
+    if (currentLogWriter.size() < LOG_SIZE_LIMIT) {
+      return false;
+    }
+    currentLogWriter.close();
+    currentLogVersion++;
+    File nextLogFile =
+        SystemFileFactory.INSTANCE.getFile(
+            logDirectory, CheckpointWriter.getLogFileName(currentLogVersion));
+    currentLogWriter = new CheckpointWriter(nextLogFile);
+    return true;
+  }
+  // endregion
+
+  public MemTableInfo getOldestMemTableInfo() {
+    // find oldest memTable
+    List<MemTableInfo> memTableInfos;
+    infoLock.lock();
+    try {
+      memTableInfos = new ArrayList<>(memTableId2Info.values());
+    } finally {
+      infoLock.unlock();
+    }
+    if (memTableInfos.isEmpty()) {
+      return null;
+    }
+    MemTableInfo oldestMemTableInfo = memTableInfos.get(0);
+    for (MemTableInfo memTableInfo : memTableInfos) {
+      if (oldestMemTableInfo.getFirstFileVersionId() > memTableInfo.getFirstFileVersionId()) {
+        oldestMemTableInfo = memTableInfo;
+      }
+    }
+    return oldestMemTableInfo;
+  }
+
+  /**
+   * Get version id of first valid .wal file
+   *
+   * @return Return {@link Integer#MIN_VALUE} if no file is valid
+   */
+  public int getFirstValidVersionId() {

Review comment:
       Fixed.
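
`getOldestMemTableInfo()` in the CheckpointManager hunk copies `memTableId2Info.values()` under `infoLock` and then, outside the lock, picks the entry with the smallest `getFirstFileVersionId()`, returning `null` when there are none. A minimal sketch of the same snapshot-then-scan idea with `Stream.min` and an `Optional` result; the class and its `Info` type are hypothetical stand-ins, not IoTDB's `MemTableInfo`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the oldest-memTable lookup. */
class OldestMemTableSketch {
  static final class Info {
    final int memTableId;
    final int firstFileVersionId; // version id of the .wal file holding this memTable's first entry

    Info(int memTableId, int firstFileVersionId) {
      this.memTableId = memTableId;
      this.firstFileVersionId = firstFileVersionId;
    }
  }

  private final Lock infoLock = new ReentrantLock();
  private final Map<Integer, Info> memTableId2Info = new HashMap<>();

  void put(Info info) {
    infoLock.lock();
    try {
      memTableId2Info.put(info.memTableId, info);
    } finally {
      infoLock.unlock();
    }
  }

  /** Copies the values under the lock, then scans the copy without holding it. */
  Optional<Info> oldest() {
    List<Info> snapshot;
    infoLock.lock();
    try {
      snapshot = new ArrayList<>(memTableId2Info.values());
    } finally {
      infoLock.unlock();
    }
    return snapshot.stream().min(Comparator.comparingInt((Info info) -> info.firstFileVersionId));
  }
}
```

Returning `Optional` instead of `null` is a design alternative, not what the quoted code does.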

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.CheckpointWriter;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage checkpoints of one wal node */
+public class CheckpointManager implements AutoCloseable {
+  /** use size limit to control WALEdit number in each file */
+  public static final long LOG_SIZE_LIMIT = 3 * 1024 * 1024;
+
+  private static final Logger logger = LoggerFactory.getLogger(CheckpointManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** WALNode identifier of this checkpoint manager */
+  protected final String identifier;
+  /** directory to store .checkpoint file */
+  protected final String logDirectory;
+  /**
+   * protect concurrent safety of checkpoint info, including memTableId2Info, cachedByteBuffer,
+   * currentLogVersion and currentLogWriter
+   */
+  private final Lock infoLock = new ReentrantLock();
+  // region these variables should be protected by infoLock
+  /** memTable id -> memTable info */
+  private final Map<Integer, MemTableInfo> memTableId2Info = new HashMap<>();
+  /** cache the biggest byte buffer to serialize checkpoint */
+  private volatile ByteBuffer cachedByteBuffer;
+  /** current checkpoint file version id, only updated by fsyncAndDeleteThread */
+  private int currentLogVersion = 0;

Review comment:
       Fixed.
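
In the quoted `fsyncCheckpointFile()`, once the current `.checkpoint` file reaches `LOG_SIZE_LIMIT` the writer is rolled (`currentLogVersion++`), a `GLOBAL_MEMORY_TABLE_INFO` checkpoint is logged first and forced, and only then is the previous file deleted. A small in-memory model of that ordering, assuming the global checkpoint restates every memTable that has not been flushed yet. Files, sizes, and the string records are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical in-memory model of checkpoint-file rolling. "Files" are lists of checkpoint
 * strings keyed by version id, and the size limit is counted in characters.
 */
class CheckpointRollingSketch {
  private final long sizeLimit;
  private final TreeMap<Integer, List<String>> files = new TreeMap<>();
  private final Set<Integer> liveMemTables = new TreeSet<>();
  private int currentVersion = 0;
  private long currentSize = 0;

  CheckpointRollingSketch(long sizeLimit) {
    this.sizeLimit = sizeLimit;
    files.put(currentVersion, new ArrayList<>());
    logGlobalInfo(); // every file starts with a snapshot of the live memTables
  }

  void create(int memTableId) {
    liveMemTables.add(memTableId);
    append("CREATE " + memTableId);
  }

  void flush(int memTableId) {
    if (liveMemTables.remove(memTableId)) { // unknown ids are ignored, as in makeFlushMemTableCP
      append("FLUSH " + memTableId);
    }
  }

  /** Same order as the quoted code: roll, log global info, then delete the old file. */
  void maybeRoll() {
    if (currentSize < sizeLimit) {
      return;
    }
    currentVersion++;
    currentSize = 0;
    files.put(currentVersion, new ArrayList<>());
    logGlobalInfo();
    // Safe only now: the CREATE records of unflushed memTables are restated in the new file.
    files.remove(currentVersion - 1);
  }

  TreeMap<Integer, List<String>> files() {
    return files;
  }

  private void logGlobalInfo() {
    append("GLOBAL " + liveMemTables);
  }

  private void append(String checkpoint) {
    files.get(currentVersion).add(checkpoint);
    currentSize += checkpoint.length();
  }
}
```

Deleting the old file before the global snapshot is durable would lose the CREATE records of memTables that are still unflushed, which is why the order matters.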

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees the concurrent safety and uses double buffers mechanism to accelerate
+ * writes and avoid waiting for buffer syncing to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  private static final long FSYNC_WAL_DELAY_IN_MS = config.getFsyncWalDelayInMs();
+  /** default delay time of each serialize task when wal mode is async */
+  public static final long ASYNC_WAL_DELAY_IN_MS = 100;
+  /** Maximum number of WALEdits in one serialize task when wal mode is sync */
+  public static final int SYNC_BATCH_SIZE_LIMIT = 100;

Review comment:
       Added.
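
The WALBuffer javadoc describes a "double buffers mechanism" so that writes do not wait for the buffer to be synced to disk. The serialize/sync threads, conditions, and mmap handling are not shown in this hunk, so the following is only a minimal sketch of the swap itself, assuming a single sync thread calls `swapAndDrain()`; the class name is hypothetical and the drained bytes are returned instead of being written to a file.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical double-buffer sketch: writers fill the working buffer while the previously filled
 * buffer is drained. Assumes exactly one sync thread calls swapAndDrain().
 */
class DoubleBufferSketch {
  private final Lock bufferLock = new ReentrantLock();
  private ByteBuffer workingBuffer;
  private ByteBuffer syncingBuffer;

  DoubleBufferSketch(int capacity) {
    workingBuffer = ByteBuffer.allocate(capacity);
    syncingBuffer = ByteBuffer.allocate(capacity);
  }

  /** Appends bytes; returns false when they do not fit (the caller should trigger a swap). */
  boolean write(byte[] data) {
    bufferLock.lock();
    try {
      if (workingBuffer.remaining() < data.length) {
        return false;
      }
      workingBuffer.put(data);
      return true;
    } finally {
      bufferLock.unlock();
    }
  }

  /** Swaps the buffers under the lock, then drains the old working buffer outside it. */
  byte[] swapAndDrain() {
    ByteBuffer toSync;
    bufferLock.lock();
    try {
      toSync = workingBuffer;
      workingBuffer = syncingBuffer; // already cleared by the previous drain
      syncingBuffer = toSync;
    } finally {
      bufferLock.unlock();
    }
    // Writers keep appending to the new working buffer while this copy (or a disk write) runs.
    toSync.flip();
    byte[] out = new byte[toSync.remaining()];
    toSync.get(out);
    toSync.clear();
    return out;
  }
}
```

With more than one sync thread, a swap could hand writers a buffer that is still being drained; a real implementation would need to wait on a condition until the syncing buffer is idle.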

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -224,19 +222,20 @@ public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
       }
     }
 
-    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-      try {
-        getLogNode().write(insertRowPlan);
-      } catch (Exception e) {
-        if (enableMemControl && memIncrements != null) {
-          rollbackMemoryInfo(memIncrements);
-        }
-        throw new WriteProcessException(
-            String.format(
-                "%s: %s write WAL failed",
-                storageGroupName, tsFileResource.getTsFile().getAbsolutePath()),
-            e);
+    try {
+      WALFlushListener walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowPlan);
+      if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+        throw walFlushListener.getCause();
+      }
+    } catch (Exception e) {
+      if (enableMemControl && memIncrements != null) {

Review comment:
       Fixed.
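
In the new TsFileProcessor code the writer calls `walNode.log(...)`, blocks on `waitForResult()`, and rethrows `getCause()` when the status is `FAILURE`, rolling back memory info in the catch block. The real `WALFlushListener` is not shown in this excerpt; the sketch below models one possible implementation with a `CountDownLatch` (class and method names are hypothetical). Note that if a listener can report `FAILURE` with a `null` cause, `throw listener.getCause()` raises a `NullPointerException` instead.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical listener modelled on the waitForResult()/getCause() calls in the hunk. */
class FlushResultSketch {
  enum Status {
    SUCCESS,
    FAILURE
  }

  private final CountDownLatch done = new CountDownLatch(1);
  private volatile Status status;
  private volatile Exception cause;

  /** Called by the WAL flush thread once the entry has been fsynced. */
  void succeed() {
    status = Status.SUCCESS;
    done.countDown();
  }

  /** Called by the WAL flush thread when writing or fsync failed. */
  void fail(Exception e) {
    cause = e;
    status = Status.FAILURE;
    done.countDown();
  }

  /** Blocks the writer until the flush thread reports; an interrupt counts as failure. */
  Status waitForResult() {
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      cause = e;
      return Status.FAILURE;
    }
    return status;
  }

  Exception getCause() {
    return cause;
  }

  /** Writer side, as in the hunk: wait, then surface the flush thread's exception. */
  static void logAndWait(FlushResultSketch listener) throws Exception {
    if (listener.waitForResult() == Status.FAILURE) {
      throw listener.getCause();
    }
  }
}
```

The latch gives the happens-before edge between the flush thread's writes and the waiting writer, so the status and cause it reads are the ones set before `countDown()`.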







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839351520



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/node/IWALNode.java
##########
@@ -16,35 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.wal.node;
 
-package org.apache.iotdb.db.writelog;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.flush.FlushListener;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
-
-import java.io.IOException;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
 
-public class WALFlushListener implements FlushListener {
+/** This interface provides uniform interface for writing wal and making checkpoints. */
+public interface IWALNode extends FlushListener, AutoCloseable {
+  /** Log InsertPlan */
+  WALFlushListener log(int memTableId, InsertPlan insertPlan);
 
-  private TsFileProcessor processor;
+  /** Log DeletePlan */
+  WALFlushListener log(int memTableId, DeletePlan deletePlan);

Review comment:
       It's unsuitable to use PhysicalPlan here, because only InsertPlan and DeletePlan can be serialized into the wal module.
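
The reply argues for one `log` overload per loggable plan type instead of a single `PhysicalPlan` parameter. A sketch of the trade-off, using hypothetical stand-in classes rather than the real plans in `org.apache.iotdb.db.qp.physical.crud`: with overloads, passing any other plan type fails at compile time, whereas a method taking the common supertype has to reject other types at run time.

```java
/** Hypothetical stand-ins; the real plan classes are not used here. */
abstract class PlanSketch {}

final class InsertPlanSketch extends PlanSketch {}

final class DeletePlanSketch extends PlanSketch {}

final class OtherPlanSketch extends PlanSketch {}

class WalNodeApiSketch {
  // Overloads: only the two loggable plan types compile as arguments.
  String log(int memTableId, InsertPlanSketch plan) {
    return "INSERT@" + memTableId;
  }

  String log(int memTableId, DeletePlanSketch plan) {
    return "DELETE@" + memTableId;
  }

  // Alternative with the wider type: every other plan has to be rejected at run time.
  String logAny(int memTableId, PlanSketch plan) {
    if (plan instanceof InsertPlanSketch) {
      return log(memTableId, (InsertPlanSketch) plan);
    }
    if (plan instanceof DeletePlanSketch) {
      return log(memTableId, (DeletePlanSketch) plan);
    }
    throw new IllegalArgumentException(
        "plan type cannot be written to WAL: " + plan.getClass().getSimpleName());
  }
}
```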







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839338199



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
##########
@@ -504,4 +514,15 @@ public long getCreatedTime() {
   private IDeviceID getDeviceID(PartialPath deviceId) {
     return DeviceIDFactory.getInstance().getDeviceID(deviceId);
   }
+
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    // TODO

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/utils/WALSubmitter.java
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+/** Mark which classes will submit wal. */
+public interface WALSubmitter {}

Review comment:
       Fixed.







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r840309062



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEditValue.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+
+/** A class implements this interface can be written into .wal file. */
+public interface WALEditValue extends SerializedSize {

Review comment:
       If we rename WALEditValue to WALEntryValue, should we also rename WALEdit to WALEntry? 







[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r839351909



##########
File path: server/src/main/java/org/apache/iotdb/db/wal/utils/SerializedSize.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+/** Implementations should calculate their accurate serialized size in bytes. */

Review comment:
       Moved to org.apache.iotdb.db.utils

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/utils/TsFilePathUtils.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+
+import java.io.File;
+
+public class TsFilePathUtils {

Review comment:
       Fixed.

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/checkpoint/MemTableInfo.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.checkpoint;
+
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.wal.buffer.WALEdit;
+import org.apache.iotdb.db.wal.utils.SerializedSize;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * MemTableInfo records brief info of one memtable, including memTable id, tsFile path, and .wal
+ * file version id of its first {@link WALEdit}.
+ */
+public class MemTableInfo implements SerializedSize {
+  /** memTable id 4 bytes, tsFile path length 4 bytes, first version id 4 bytes */
+  private static final int FIXED_SERIALIZED_SIZE = Integer.BYTES * 2;

Review comment:
       Fixed.
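
The quoted MemTableInfo comment lists three 4-byte fields (memTable id, tsFile path length, first version id), while the quoted constant is `Integer.BYTES * 2`, which covers only two of them. A sketch of a size calculation that matches that comment, assuming the path is written as an int length followed by its UTF-8 bytes (the exact encoding used by `ReadWriteIOUtils` is not shown here). The class is hypothetical and checks that `serializedSize()` equals the bytes actually written.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical layout: memTableId int, path length int, path UTF-8 bytes, firstFileVersionId int. */
class MemTableInfoSizeSketch {
  /** memTable id 4 bytes + tsFile path length 4 bytes + first version id 4 bytes */
  static final int FIXED_SERIALIZED_SIZE = Integer.BYTES * 3;

  final int memTableId;
  final String tsFilePath;
  final int firstFileVersionId;

  MemTableInfoSizeSketch(int memTableId, String tsFilePath, int firstFileVersionId) {
    this.memTableId = memTableId;
    this.tsFilePath = tsFilePath;
    this.firstFileVersionId = firstFileVersionId;
  }

  /** Uses the encoded byte length, not String.length(), so non-ASCII paths are sized correctly. */
  int serializedSize() {
    return FIXED_SERIALIZED_SIZE + tsFilePath.getBytes(StandardCharsets.UTF_8).length;
  }

  void serialize(ByteBuffer buffer) {
    byte[] path = tsFilePath.getBytes(StandardCharsets.UTF_8);
    buffer.putInt(memTableId);
    buffer.putInt(path.length);
    buffer.put(path);
    buffer.putInt(firstFileVersionId);
  }

  static MemTableInfoSizeSketch deserialize(ByteBuffer buffer) {
    int memTableId = buffer.getInt();
    byte[] path = new byte[buffer.getInt()];
    buffer.get(path);
    int firstFileVersionId = buffer.getInt();
    return new MemTableInfoSizeSketch(
        memTableId, new String(path, StandardCharsets.UTF_8), firstFileVersionId);
  }
}
```

Allocating exactly `serializedSize()` bytes and asserting the buffer is full after `serialize` is a cheap way to catch a constant that drifts from the real layout.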







[GitHub] [iotdb] wangchao316 commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r834060850



##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
##########
@@ -139,46 +142,39 @@
 
   /** When inserting rejected exceeds this, throw an exception. Unit: millisecond */
   private int maxWaitingTimeWhenInsertBlockedInMs = 10000;
-  /** Is the write ahead log enable. */
-  private boolean enableWal = true;
 
-  private volatile boolean readOnly = false;
+  /** this variable set timestamp precision as millisecond, microsecond or nanosecond */
+  private String timestampPrecision = "ms";
 
-  private boolean enableDiscardOutOfOrderData = false;
+  // region Write Ahead Log Configuration

Review comment:
       this context? 

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -50,25 +50,45 @@ rpc_port=6667
 ### Write Ahead Log Configuration
 ####################
 
-# Is insert ahead log enable
-# Datatype: boolean
-# enable_wal=true
+# Write mode of wal
+# The details of these three modes are as follows:
+# 1. DISABLE: the system will disable wal.
+# 2. SYNC: the system will submit wal synchronously, write request will not return until its wal is fsynced to the disk successfully.
+# 3. ASYNC: the system will submit wal asynchronously, write request will return immediately no matter its wal is fsynced to the disk successfully.
+# The write performance order is DISABLE > ASYNC > SYNC, but only SYNC mode can ensure data durability.
+# wal_mode=SYNC
+
+# Duration a wal flush operation will wait before calling fsync
+# A duration greater than 0 batches multiple wal fsync calls into one. This is useful when disks are slow or WAL write contention exists.
+# Datatype: long
+# sync_wal_delay_in_ms=0
 
-# Add a switch to drop ouf-of-order data
-# Out-of-order data will impact the aggregation query a lot. Users may not care about discarding some out-of-order data.
-# Datatype: boolean
-# enable_discard_out_of_order_data=false
+# Max number of wal nodes, each node corresponds to one wal directory
+# The default value 0 means the concurrent wal number will be 2 * 'number of wal dirs'.
+# Datatype: int
+# max_wal_num=0
 
-# When a certain amount of insert ahead log is reached, it will be flushed to disk
-# It is possible to lose at most flush_wal_threshold operations
+# Buffer size of each wal node
+# If it sets a value smaller than 0, use the default value 16777216 bytes (16MB).
 # Datatype: int
-# flush_wal_threshold=10000
+# wal_buffer_size_in_byte=16777216
 
-# The cycle when insert ahead log is periodically forced to be written to disk(in milliseconds)
-# If force_wal_period_in_ms = 0 it means force insert ahead log to be written to disk after each refreshment
-# Set this parameter to 0 may slow down the ingestion on slow disk.
-# Datatype: long
-# force_wal_period_in_ms=100
+# Buffer entry size of each wal buffer
+# If it sets a value smaller than 0, use the default value 16384 bytes (16KB).
+# Datatype: int
+# wal_buffer_entry_size_in_byte=16384
+
+# Max storage space for wal
+# The default value 0 means the storage space will not be controlled.
+# Notice: If this parameter is too small, the write performance may decline.
+# Datatype: int
+# wal_storage_space_in_mb=0

Review comment:
       Is the default value of this parameter really 0?
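The sync_wal_delay_in_ms option above describes a group commit: rather than calling fsync once per write, the serializer waits a little, drains whatever edits have queued up (up to a batch limit), and calls fsync once for the whole batch. Below is a minimal Java sketch of that pattern, loosely modeled on the SerializeTask in WALBuffer quoted later in this thread. The class name GroupCommitDemo, the String edits and the fsync stand-in are illustrative assumptions, not IoTDB code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class GroupCommitDemo {
  // The PR uses 100; a small limit here makes the batching visible.
  static final int BATCH_SIZE_LIMIT = 3;

  private final BlockingQueue<String> walEdits = new ArrayBlockingQueue<>(BATCH_SIZE_LIMIT * 10);
  final List<Integer> fsyncedBatchSizes = new ArrayList<>();

  void submit(String edit) {
    try {
      walEdits.put(edit); // blocks only if the bounded queue is full
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** One round: optionally wait, block for the first edit, drain more without blocking, fsync once. */
  void serializeOneBatch(long syncDelayMs) {
    List<String> batch = new ArrayList<>();
    try {
      if (syncDelayMs > 0) {
        Thread.sleep(syncDelayMs); // let more edits accumulate, like sync_wal_delay_in_ms
      }
      batch.add(walEdits.take());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt, as the PR code does
      return;
    }
    String next;
    while (batch.size() < BATCH_SIZE_LIMIT && (next = walEdits.poll()) != null) {
      batch.add(next);
    }
    fsync(batch);
  }

  private void fsync(List<String> batch) {
    // Stand-in for writing the buffer and calling FileChannel.force(); it only records the batch size.
    fsyncedBatchSizes.add(batch.size());
  }

  public static void main(String[] args) {
    GroupCommitDemo demo = new GroupCommitDemo();
    for (int i = 0; i < 5; i++) {
      demo.submit("edit-" + i);
    }
    demo.serializeOneBatch(0);
    demo.serializeOneBatch(0);
    System.out.println(demo.fsyncedBatchSizes); // [3, 2]: two fsyncs for five edits
  }
}
```

With five queued edits and a batch limit of 3, two fsync calls cover all five writes. A larger delay trades per-write latency for fewer fsync calls, which is why the property comment recommends it for slow disks.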

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.wal.io.ILogWriter;
+import org.apache.iotdb.db.wal.io.WALWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AbstractWALBuffer implements IWALBuffer {
+  private static final Logger logger = LoggerFactory.getLogger(AbstractWALBuffer.class);
+  /** use size limit to control WALEdit number in each file */
+  protected static final long LOG_SIZE_LIMIT = 10 * 1024 * 1024;
+
+  /** WALNode identifier of this buffer */
+  protected final String identifier;
+  /** directory to store .wal files */
+  protected final String logDirectory;
+  /** current wal file version id */
+  protected final AtomicInteger currentLogVersion = new AtomicInteger();
+  /** current wal file log writer */
+  protected volatile ILogWriter currentLogWriter;
+
+  public AbstractWALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    this.identifier = identifier;
+    this.logDirectory = logDirectory;
+    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
+    if (!logDirFile.exists() && logDirFile.mkdirs()) {
+      logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
+    }
+    currentLogWriter =
+        new WALWriter(

Review comment:
       The superclass of WALWriter is AbstractWALBuffer, so why is a WALWriter instantiated inside AbstractWALBuffer?
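One way to read the design being questioned: the buffer *has* a log writer rather than *being* one. AbstractWALBuffer keeps `currentLogWriter` and `currentLogVersion`, and the `LOG_SIZE_LIMIT` comment suggests the writer is replaced by a new, versioned file once the current file grows too large. The sketch below illustrates that composition under those assumptions. InMemoryLogWriter, the `_<version>.wal` file names and the roll-on-limit rule are hypothetical, and whether WALWriter really extends AbstractWALBuffer is not visible in the quoted code.

```java
import java.util.ArrayList;
import java.util.List;

public class WalBufferRollDemo {
  /** Hypothetical stand-in for ILogWriter: it only counts the bytes it has "written". */
  static final class InMemoryLogWriter {
    final String fileName;
    long size;

    InMemoryLogWriter(String fileName) {
      this.fileName = fileName;
    }

    void write(byte[] data) {
      size += data.length;
    }
  }

  private final long sizeLimit;
  private int version = 0;
  // The buffer owns (has-a) its current writer instead of extending a writer class.
  private InMemoryLogWriter currentWriter;
  final List<String> closedFiles = new ArrayList<>();

  WalBufferRollDemo(long sizeLimit) {
    this.sizeLimit = sizeLimit;
    this.currentWriter = newWriter();
  }

  private InMemoryLogWriter newWriter() {
    // Hypothetical naming scheme: one file per version id.
    return new InMemoryLogWriter("_" + version + ".wal");
  }

  void append(byte[] data) {
    currentWriter.write(data);
    // Roll to a new file once the current one reaches the size limit.
    if (currentWriter.size >= sizeLimit) {
      closedFiles.add(currentWriter.fileName);
      version++;
      currentWriter = newWriter();
    }
  }

  String currentFile() {
    return currentWriter.fileName;
  }

  public static void main(String[] args) {
    WalBufferRollDemo buffer = new WalBufferRollDemo(10);
    for (int i = 0; i < 5; i++) {
      buffer.append(new byte[4]);
    }
    System.out.println(buffer.closedFiles + " current=" + buffer.currentFile());
  }
}
```

Because the buffer only holds a reference, rolling to a new file is a field reassignment; an inheritance-based design would need a new buffer object for every file.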

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees the concurrent safety and uses double buffers mechanism to accelerate
+ * writes and avoid waiting for buffer syncing to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  /** Maximum number of WALEdits in one serialize task */
+  public static final int BATCH_SIZE_LIMIT = 100;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long SYNC_WAL_DELAY_IN_MS = config.getSyncWalDelayInMs();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  /** notify serializeThread to stop */
+  private static final WALEdit CLOSE_SIGNAL = new WALEdit(-1, new DeletePlan());
+
+  /** whether close method is called */
+  private volatile boolean isClosed = false;
+  /** WALEdits */
+  private final BlockingQueue<WALEdit> walEdits = new ArrayBlockingQueue<>(BATCH_SIZE_LIMIT * 10);
+  /** two buffers switch between three statuses (there is always 1 buffer working) */
+  // buffer in working status, only updated by serializeThread
+  private volatile ByteBuffer workingBuffer;
+  // buffer in idle status
+  private volatile ByteBuffer idleBuffer;
+  // buffer in syncing status, serializeThread makes sure no more writes to syncingBuffer
+  private volatile ByteBuffer syncingBuffer;
+  /** lock to provide synchronization for double buffers mechanism, protecting buffers status */
+  private final Lock buffersLock = new ReentrantLock();
+  /** condition to guarantee correctness of switching buffers */
+  private final Condition idleBufferReadyCondition = buffersLock.newCondition();
+  /** single thread to serialize WALEdit to workingBuffer */
+  private final ExecutorService serializeThread;
+  /** single thread to sync syncingBuffer to disk */
+  private final ExecutorService syncBufferThread;
+
+  public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    super(identifier, logDirectory);
+    allocateBuffers();
+    serializeThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SERIALIZE.getName() + "(node-" + identifier + ")");
+    syncBufferThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SYNC.getName() + "(node-" + identifier + ")");
+    // start receiving serialize tasks
+    serializeThread.submit(new SerializeTask());
+  }
+
+  private void allocateBuffers() {
+    try {
+      workingBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);

Review comment:
       Can the number of ByteBuffer.allocateDirect calls be bounded? If the number of allocations is unbounded, could direct memory be exhausted?
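Based on the code quoted in this thread, each WALBuffer calls allocateDirect exactly twice (working and idle buffers, WAL_BUFFER_SIZE / 2 each), and switchIdleBufferToWorking only swaps references, so direct memory per node is WAL_BUFFER_SIZE. If each WAL node owns one buffer and the node count is capped at MAX_WAL_NUM as the WALManager comment states, the total is bounded by MAX_WAL_NUM * WAL_BUFFER_SIZE. A small calculator for that estimate, assuming the one-buffer-per-node relationship (the class and method names are illustrative):

```java
public class WalDirectMemoryEstimate {
  /** Same fallback rule as MAX_WAL_NUM in WALManager: 0 means 2 * number of wal dirs. */
  static int maxWalNum(int configuredMaxWalNum, int walDirCount) {
    return configuredMaxWalNum > 0 ? configuredMaxWalNum : walDirCount * 2;
  }

  /** Each WALBuffer allocates two direct buffers of walBufferSize / 2 bytes. */
  static long directBytesPerNode(int walBufferSize) {
    return 2L * (walBufferSize / 2);
  }

  /** Upper bound, assuming one WALBuffer per WAL node. */
  static long maxDirectBytes(int configuredMaxWalNum, int walDirCount, int walBufferSize) {
    return (long) maxWalNum(configuredMaxWalNum, walDirCount) * directBytesPerNode(walBufferSize);
  }

  public static void main(String[] args) {
    // Defaults from iotdb-engine.properties: max_wal_num=0, wal_buffer_size_in_byte=16777216
    long bytes = maxDirectBytes(0, 1, 16_777_216);
    System.out.println(bytes / (1024 * 1024) + " MB"); // 32 MB with one wal dir
  }
}
```

With the defaults and a single wal directory this gives 2 nodes * 16 MB = 32 MB; the bound grows linearly with max_wal_num and wal_buffer_size_in_byte.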

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
##########
@@ -496,6 +501,11 @@ void updatePlanIndexes(long index) {
     minPlanIndex = Math.min(index, minPlanIndex);
   }
 
+  @Override
+  public int getMemTableId() {
+    return memTableId;

Review comment:
       memTableId appears to be a fixed value, since memTableIdCounter.getAndIncrement() == 0.
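Whether memTableId is always 0 depends on where memTableIdCounter lives. If the counter is a static field shared by all instances and each instance captures getAndIncrement() once at construction, ids are unique and increasing; if it were a per-instance counter, every memtable would indeed get 0. A minimal sketch of the static-counter variant, assuming that is how AbstractMemTable initializes the field (the nested MemTable class is illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class MemTableIdDemo {
  static class MemTable {
    // static: one counter shared by every instance (assumed to match AbstractMemTable)
    private static final AtomicInteger memTableIdCounter = new AtomicInteger(0);
    // instance field: evaluated once per object, at construction time
    private final int memTableId = memTableIdCounter.getAndIncrement();

    int getMemTableId() {
      return memTableId;
    }
  }

  public static void main(String[] args) {
    MemTable a = new MemTable();
    MemTable b = new MemTable();
    MemTable c = new MemTable();
    // In a fresh JVM this prints "0 1 2"
    System.out.println(a.getMemTableId() + " " + b.getMemTableId() + " " + c.getMemTableId());
  }
}
```

AtomicInteger keeps the ids unique even when memtables are created concurrently by different storage groups.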

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/** This class is used to manage all wal nodes */
+public class WALManager implements IService {
+  public static final long FSYNC_CHECKPOINT_FILE_DELAY_IN_MS = 200;
+  public static final long DELETE_WAL_FILES_DELAY_IN_MS = 10 * 60 * 1000;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final int MAX_WAL_NUM =
+      config.getMaxWalNum() > 0 ? config.getMaxWalNum() : config.getWalDirs().length * 2;
+
+  /** manage wal folders */
+  private FolderManager folderManager;
+  /** protect concurrent safety of walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
+  private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NUM);
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;

Review comment:
       nodeIdCounter should be an AtomicLong.
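Per the javadoc on nodesLock, nodeIdCounter is protected by that lock. If every read and write happens while holding nodesLock, a plain long is already safe; AtomicLong only becomes necessary if the counter is touched outside the lock. The sketch below contrasts the two approaches under concurrent increments; the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class NodeIdCounterDemo {
  private final Lock nodesLock = new ReentrantLock();
  private long lockedCounter = -1; // guarded by nodesLock, like nodeIdCounter in WALManager
  private final AtomicLong atomicCounter = new AtomicLong(-1);

  long nextIdWithLock() {
    nodesLock.lock();
    try {
      return ++lockedCounter;
    } finally {
      nodesLock.unlock();
    }
  }

  long nextIdAtomic() {
    return atomicCounter.incrementAndGet();
  }

  long lastLockedId() {
    nodesLock.lock(); // reads must also hold the lock to be safe
    try {
      return lockedCounter;
    } finally {
      nodesLock.unlock();
    }
  }

  long lastAtomicId() {
    return atomicCounter.get();
  }

  static NodeIdCounterDemo runConcurrently(int increments) {
    NodeIdCounterDemo demo = new NodeIdCounterDemo();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < increments; i++) {
      pool.execute(demo::nextIdWithLock);
      pool.execute(demo::nextIdAtomic);
    }
    pool.shutdown();
    try {
      pool.awaitTermination(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return demo;
  }

  public static void main(String[] args) {
    NodeIdCounterDemo demo = runConcurrently(10_000);
    // Both counters start at -1, so after 10,000 increments each ends at 9999
    System.out.println(demo.lastLockedId() + " " + demo.lastAtomicId());
  }
}
```

Since WALManager already takes nodesLock to update walNodes and nodeCursor, keeping nodeIdCounter under the same lock keeps all three consistent with each other; an AtomicLong alone would not give that.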

##########
File path: server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.buffer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This buffer guarantees the concurrent safety and uses double buffers mechanism to accelerate
+ * writes and avoid waiting for buffer syncing to disk.
+ */
+public class WALBuffer extends AbstractWALBuffer {
+  /** Maximum number of WALEdits in one serialize task */
+  public static final int BATCH_SIZE_LIMIT = 100;
+
+  private static final Logger logger = LoggerFactory.getLogger(WALBuffer.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long SYNC_WAL_DELAY_IN_MS = config.getSyncWalDelayInMs();
+  private static final int WAL_BUFFER_SIZE = config.getWalBufferSize();
+  /** notify serializeThread to stop */
+  private static final WALEdit CLOSE_SIGNAL = new WALEdit(-1, new DeletePlan());
+
+  /** whether close method is called */
+  private volatile boolean isClosed = false;
+  /** WALEdits */
+  private final BlockingQueue<WALEdit> walEdits = new ArrayBlockingQueue<>(BATCH_SIZE_LIMIT * 10);
+  /** two buffers switch between three statuses (there is always 1 buffer working) */
+  // buffer in working status, only updated by serializeThread
+  private volatile ByteBuffer workingBuffer;
+  // buffer in idle status
+  private volatile ByteBuffer idleBuffer;
+  // buffer in syncing status, serializeThread makes sure no more writes to syncingBuffer
+  private volatile ByteBuffer syncingBuffer;
+  /** lock to provide synchronization for double buffers mechanism, protecting buffers status */
+  private final Lock buffersLock = new ReentrantLock();
+  /** condition to guarantee correctness of switching buffers */
+  private final Condition idleBufferReadyCondition = buffersLock.newCondition();
+  /** single thread to serialize WALEdit to workingBuffer */
+  private final ExecutorService serializeThread;
+  /** single thread to sync syncingBuffer to disk */
+  private final ExecutorService syncBufferThread;
+
+  public WALBuffer(String identifier, String logDirectory) throws FileNotFoundException {
+    super(identifier, logDirectory);
+    allocateBuffers();
+    serializeThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SERIALIZE.getName() + "(node-" + identifier + ")");
+    syncBufferThread =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(
+            ThreadName.WAL_SYNC.getName() + "(node-" + identifier + ")");
+    // start receiving serialize tasks
+    serializeThread.submit(new SerializeTask());
+  }
+
+  private void allocateBuffers() {
+    try {
+      workingBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+      idleBuffer = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE / 2);
+    } catch (OutOfMemoryError e) {
+      logger.error("Fail to allocate wal node-{}'s buffer because out of memory.", identifier, e);
+      close();
+      throw e;
+    }
+  }
+
+  @Override
+  public void write(WALEdit edit) {
+    if (isClosed) {
+      logger.error(
+          "Fail to write WALEdit into wal node-{} because this node is closed.", identifier);
+      edit.getWalFlushListener().fail(new WALNodeClosedException(identifier));
+      return;
+    }
+    // only add this WALEdit to queue
+    try {
+      walEdits.put(edit);
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted when waiting for adding WalEdit to buffer.");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  // region Task of serializeThread
+  /** This task serializes WALEdit to workingBuffer and will call fsync at last. */
+  private class SerializeTask implements Runnable {
+    private final IWALByteBufferView byteBufferVew = new ByteBufferView();
+    private final List<WALFlushListener> fsyncListeners = new LinkedList<>();
+
+    @Override
+    public void run() {
+      try {
+        serialize();
+      } finally {
+        serializeThread.submit(new SerializeTask());
+      }
+    }
+
+    /** In order to control memory usage of blocking queue, get 1 and then serialize 1 */
+    private void serialize() {
+      // for better fsync performance, sleep a while to enlarge write batch
+      if (SYNC_WAL_DELAY_IN_MS > 0) {
+        try {
+          Thread.sleep(SYNC_WAL_DELAY_IN_MS);
+        } catch (InterruptedException e) {
+          logger.warn("Interrupted when sleeping a while to enlarge wal write batch.");
+          Thread.currentThread().interrupt();
+        }
+      }
+      // try to get first WALEdit with blocking interface
+      int batchSize = 0;
+      try {
+        WALEdit edit = walEdits.take();
+        try {
+          if (edit != CLOSE_SIGNAL) {
+            edit.serialize(byteBufferVew);
+            ++batchSize;
+            fsyncListeners.add(edit.getWalFlushListener());
+          }
+        } catch (Exception e) {
+          logger.error(
+              "Fail to serialize WALEdit to wal node-{}'s buffer, discard it.", identifier, e);
+          edit.getWalFlushListener().fail(e);
+        }
+      } catch (InterruptedException e) {
+        logger.warn(
+            "Interrupted when waiting for taking WALEdit from blocking queue to serialize.");
+        Thread.currentThread().interrupt();
+      }
+      // try to get more WALEdits with non-blocking interface to enlarge write batch
+      while (walEdits.peek() != null && batchSize < BATCH_SIZE_LIMIT) {
+        WALEdit edit = walEdits.poll();
+        if (edit == null || edit == CLOSE_SIGNAL) {
+          break;
+        } else {
+          try {
+            edit.serialize(byteBufferVew);
+          } catch (Exception e) {
+            logger.error(
+                "Fail to serialize WALEdit to wal node-{}'s buffer, discard it.", identifier, e);
+            edit.getWalFlushListener().fail(e);
+            continue;
+          }
+          ++batchSize;
+          fsyncListeners.add(edit.getWalFlushListener());
+        }
+      }
+      // call fsync at last and set fsyncListeners
+      if (batchSize > 0) {
+        fsyncWorkingBuffer(fsyncListeners);
+      }
+    }
+  }
+
+  /**
+   * This view uses workingBuffer lock-freely because workingBuffer is only updated by
+   * serializeThread and this class is only used by serializeThread.
+   */
+  private class ByteBufferView implements IWALByteBufferView {
+    private void ensureEnoughSpace(int bytesNum) {
+      if (workingBuffer.remaining() < bytesNum) {
+        rollBuffer();
+      }
+    }
+
+    private void rollBuffer() {
+      syncWorkingBuffer();
+    }
+
+    @Override
+    public void put(byte b) {
+      ensureEnoughSpace(Byte.BYTES);
+      workingBuffer.put(b);
+    }
+
+    @Override
+    public void put(byte[] src) {
+      int offset = 0;
+      while (true) {
+        int leftCapacity = workingBuffer.remaining();
+        int needCapacity = src.length - offset;
+        if (leftCapacity >= needCapacity) {
+          workingBuffer.put(src, offset, needCapacity);
+          break;
+        } else {
+          workingBuffer.put(src, offset, leftCapacity);
+          offset += leftCapacity;
+          rollBuffer();
+        }
+      }
+    }
+
+    @Override
+    public void putChar(char value) {
+      ensureEnoughSpace(Character.BYTES);
+      workingBuffer.putChar(value);
+    }
+
+    @Override
+    public void putShort(short value) {
+      ensureEnoughSpace(Short.BYTES);
+      workingBuffer.putShort(value);
+    }
+
+    @Override
+    public void putInt(int value) {
+      ensureEnoughSpace(Integer.BYTES);
+      workingBuffer.putInt(value);
+    }
+
+    @Override
+    public void putLong(long value) {
+      ensureEnoughSpace(Long.BYTES);
+      workingBuffer.putLong(value);
+    }
+
+    @Override
+    public void putFloat(float value) {
+      ensureEnoughSpace(Float.BYTES);
+      workingBuffer.putFloat(value);
+    }
+
+    @Override
+    public void putDouble(double value) {
+      ensureEnoughSpace(Double.BYTES);
+      workingBuffer.putDouble(value);
+    }
+  }
+
+  /** Notice: this method is only called when the buffer is exhausted by SerializeTask. */
+  private void syncWorkingBuffer() {
+    switchIdleBufferToWorking();
+    syncBufferThread.submit(new SyncBufferTask(false));
+  }
+
+  /** Notice: this method is only called at the end of SerializeTask. */
+  private void fsyncWorkingBuffer(List<WALFlushListener> fsyncListeners) {
+    switchIdleBufferToWorking();
+    syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners));
+  }
+
+  // only called by serializeThread
+  private void switchIdleBufferToWorking() {
+    buffersLock.lock();
+    try {
+      while (idleBuffer == null) {
+        idleBufferReadyCondition.await();
+      }
+      syncingBuffer = workingBuffer;
+      workingBuffer = idleBuffer;
+      workingBuffer.clear();
+      idleBuffer = null;
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted When waiting for available working buffer.");
+      Thread.currentThread().interrupt();
+    } finally {
+      buffersLock.unlock();
+    }
+  }
+  // endregion

Review comment:
       delete 
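The class javadoc describes two buffers rotating through working, idle and syncing states. The writer-side step, switchIdleBufferToWorking, is shown above; the sync side that hands a buffer back to idle and signals idleBufferReadyCondition is not part of this excerpt, so markSynced below is an assumption about how that half works. A minimal single-threaded walk-through using heap buffers (class and method names are illustrative):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DoubleBufferDemo {
  private final Lock buffersLock = new ReentrantLock();
  private final Condition idleBufferReady = buffersLock.newCondition();
  private ByteBuffer workingBuffer;
  private ByteBuffer idleBuffer;
  private ByteBuffer syncingBuffer;

  DoubleBufferDemo(int capacity) {
    workingBuffer = ByteBuffer.allocate(capacity);
    idleBuffer = ByteBuffer.allocate(capacity);
  }

  /** Writer side: hand the filled working buffer to the syncer and take over the idle one. */
  ByteBuffer switchIdleToWorking() {
    buffersLock.lock();
    try {
      while (idleBuffer == null) {
        idleBufferReady.await(); // the previous sync has not finished yet
      }
      syncingBuffer = workingBuffer;
      workingBuffer = idleBuffer;
      workingBuffer.clear();
      idleBuffer = null;
      return syncingBuffer;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null; // switch abandoned; the current working buffer is kept
    } finally {
      buffersLock.unlock();
    }
  }

  /** Syncer side (assumed): after writing syncingBuffer to disk, recycle it as the idle buffer. */
  void markSynced() {
    buffersLock.lock();
    try {
      idleBuffer = syncingBuffer;
      syncingBuffer = null;
      idleBufferReady.signalAll();
    } finally {
      buffersLock.unlock();
    }
  }

  ByteBuffer working() {
    return workingBuffer;
  }

  public static void main(String[] args) {
    DoubleBufferDemo buffers = new DoubleBufferDemo(16);
    ByteBuffer first = buffers.working();
    first.putInt(42);
    ByteBuffer toSync = buffers.switchIdleToWorking();
    System.out.println(toSync == first); // true: the filled buffer goes to the syncer
    buffers.markSynced(); // syncer finished, so that buffer becomes idle again
    buffers.switchIdleToWorking();
    System.out.println(buffers.working() == first); // true: the recycled buffer is reused
  }
}
```

In the real buffer the sync side runs on syncBufferThread, so the serializer only blocks in await() when it fills a buffer before the previous one has been synced.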





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r836340603



##########
File path: server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
##########
@@ -33,7 +34,7 @@
 import java.util.List;
 import java.util.Set;
 
-public abstract class InsertPlan extends PhysicalPlan {
+public abstract class InsertPlan extends PhysicalPlan implements WALEditValue {

Review comment:
       WALEditValue is used to manage InsertPlan, DeletePlan and IMemTable uniformly; see the WALEdit class for more details.
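A rough illustration of the idea: WALEdit wraps any WALEditValue, so the buffer can serialize different kinds of payload through one code path. The interface methods below (typeTag, serializedSize, serializeTo), the field layouts, and the reading of WALEdit's first constructor argument as a memtable id (CLOSE_SIGNAL passes -1) are assumptions for illustration only; the real WALEditValue is not shown in this thread, and the memtable case is omitted.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class WalEditValueDemo {
  /** Hypothetical common payload type for WAL edits. */
  interface WalEditValue {
    byte typeTag();

    int serializedSize();

    void serializeTo(ByteBuffer buffer);
  }

  static final class InsertEdit implements WalEditValue {
    final long time;
    final double value;

    InsertEdit(long time, double value) {
      this.time = time;
      this.value = value;
    }

    public byte typeTag() { return 1; }

    public int serializedSize() { return Byte.BYTES + Long.BYTES + Double.BYTES; }

    public void serializeTo(ByteBuffer buffer) {
      buffer.put(typeTag()).putLong(time).putDouble(value);
    }
  }

  static final class DeleteEdit implements WalEditValue {
    final long startTime;
    final long endTime;

    DeleteEdit(long startTime, long endTime) {
      this.startTime = startTime;
      this.endTime = endTime;
    }

    public byte typeTag() { return 2; }

    public int serializedSize() { return Byte.BYTES + Long.BYTES + Long.BYTES; }

    public void serializeTo(ByteBuffer buffer) {
      buffer.put(typeTag()).putLong(startTime).putLong(endTime);
    }
  }

  /** Each edit is written as (memTableId, value); the caller never needs to know the concrete type. */
  static ByteBuffer serializeAll(int memTableId, List<WalEditValue> values) {
    int total = 0;
    for (WalEditValue v : values) {
      total += Integer.BYTES + v.serializedSize();
    }
    ByteBuffer buffer = ByteBuffer.allocate(total);
    for (WalEditValue v : values) {
      buffer.putInt(memTableId);
      v.serializeTo(buffer);
    }
    buffer.flip();
    return buffer;
  }

  public static void main(String[] args) {
    ByteBuffer bytes = serializeAll(7, Arrays.asList(new InsertEdit(1L, 2.5), new DeleteEdit(0L, 10L)));
    System.out.println(bytes.remaining() + " bytes"); // 2 * (4 + 17) = 42 bytes
  }
}
```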





[GitHub] [iotdb] HeimingZ commented on a change in pull request #5320: [IOTDB-1614] New WAL

Posted by GitBox <gi...@apache.org>.
HeimingZ commented on a change in pull request #5320:
URL: https://github.com/apache/iotdb/pull/5320#discussion_r834164957



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
##########
@@ -496,6 +501,11 @@ void updatePlanIndexes(long index) {
     minPlanIndex = Math.min(index, minPlanIndex);
   }
 
+  @Override
+  public int getMemTableId() {
+    return memTableId;

Review comment:
       Each memtable has its own id; this value is initialized at [AbstractMemTable L87-L88](https://github.com/apache/iotdb/pull/5320/files#diff-9b46aad29da3fa0719055cf485e20ed8c654b8faa799fc03c32ccd9df63f8509R87-R88).



