You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/16 01:52:01 UTC

[iotdb] branch native_raft updated: change log serialization

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 646b482153 change log serialization
646b482153 is described below

commit 646b4821534c815f5b72f17d4ecf6bc5c9e6cf86
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Mar 16 09:53:47 2023 +0800

    change log serialization
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  2 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  5 +-
 .../iotdb/consensus/config/ConsensusConfig.java    |  2 +-
 .../iotdb/consensus/natraft/protocol/PeerInfo.java | 27 ++-----
 .../consensus/natraft/protocol/RaftConfig.java     | 90 ++++++++++------------
 .../consensus/natraft/protocol/RaftMember.java     |  2 +-
 .../protocol/log/applier/AsyncLogApplier.java      |  2 +-
 .../protocol/log/dispatch/LogDispatcher.java       |  1 -
 .../protocol/log/manager/RaftLogManager.java       | 39 ++++------
 .../log/manager/serialization/LogManagerMeta.java  | 15 ----
 .../manager/serialization/StableEntryManager.java  |  3 +-
 .../serialization/SyncLogDequeSerializer.java      |  6 +-
 .../iotdb/consensus/natraft/utils/Timer.java       |  2 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  5 +-
 .../execution/executor/RegionWriteExecutor.java    | 10 ++-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  4 +-
 16 files changed, 91 insertions(+), 124 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index d850139696..28057779e0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.confignode.conf;
 
-import java.util.Properties;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -30,6 +29,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 
 import java.io.File;
+import java.util.Properties;
 
 public class ConfigNodeConfig {
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index e01bb7ebf5..d427b1f7c6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -193,7 +193,10 @@ public class ConfigNodeProcedureEnv {
 
   public boolean verifySucceed(TSStatus... status) {
     return Arrays.stream(status)
-        .allMatch(tsStatus -> tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode());
+        .allMatch(
+            tsStatus ->
+                tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+                    || tsStatus.getCode() == TSStatusCode.WEAKLY_ACCEPTED.getStatusCode());
   }
 
   public boolean doubleCheckReplica(TDataNodeLocation removedDatanode) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index 2c561db6a2..fed43984fe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -19,10 +19,10 @@
 
 package org.apache.iotdb.consensus.config;
 
-import java.util.Properties;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 
 import java.util.Optional;
+import java.util.Properties;
 
 public class ConsensusConfig {
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/PeerInfo.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/PeerInfo.java
index c82a0722f6..b7b53cc79a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/PeerInfo.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/PeerInfo.java
@@ -20,35 +20,24 @@
 package org.apache.iotdb.consensus.natraft.protocol;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class PeerInfo {
-  private long nextIndex;
-  private long matchIndex;
+  private AtomicLong matchIndex;
   private AtomicInteger inconsistentHeartbeatNum = new AtomicInteger();
   // lastLogIndex from the last heartbeat
   private long lastHeartBeatIndex;
 
-  public PeerInfo(long nextIndex) {
-    this.nextIndex = nextIndex;
-    this.matchIndex = -1;
+  public PeerInfo() {
+    this.matchIndex = new AtomicLong(-1);
   }
 
-  public synchronized long getNextIndex() {
-    return nextIndex;
+  public long getMatchIndex() {
+    return matchIndex.get();
   }
 
-  public synchronized void setNextIndex(long nextIndex) {
-    this.nextIndex = nextIndex;
-  }
-
-  public synchronized long getMatchIndex() {
-    return matchIndex;
-  }
-
-  public synchronized void setMatchIndex(long matchIndex) {
-    this.matchIndex = matchIndex;
-    this.setNextIndex(Math.max(nextIndex, matchIndex + 1));
-    this.notifyAll();
+  public void setMatchIndex(long matchIndex) {
+    this.matchIndex.set(matchIndex);
   }
 
   public int incInconsistentHeartbeatNum() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 560e89d303..91ca23a15f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -1,38 +1,41 @@
 /*
 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
-
-
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+
+
+*/
 
 package org.apache.iotdb.consensus.natraft.protocol;
 
-import java.util.Properties;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RPCConfig;
 import org.apache.iotdb.consensus.natraft.protocol.consistency.ConsistencyLevel;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
-import java.util.concurrent.TimeUnit;
+import java.util.Properties;
 
 public class RaftConfig {
 
+  private static final Logger logger = LoggerFactory.getLogger(RaftConfig.class);
   private boolean enableWeakAcceptance = false;
   private int maxNumOfLogsInMem = 10000;
   private int minNumOfLogsInMem = 1000;
@@ -44,7 +47,6 @@ public class RaftConfig {
   private int uncommittedRaftLogNumForRejectThreshold = 10000;
   private int heartbeatIntervalMs = 1000;
   private int electionTimeoutMs = 20_000;
-  private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
   private boolean enableUsePersistLogOnDiskToCatchUp;
   private long writeOperationTimeoutMS = 20_000L;
   private int logNumInBatch = 100;
@@ -171,17 +173,12 @@ public class RaftConfig {
     return rpcConfig.getRpcMaxConcurrentClientNum();
   }
 
-
   public int getMaxIdleClientPerNode() {
     return rpcConfig.getRpcMinConcurrentClientNum();
   }
 
   public int getConnectionTimeoutInMS() {
-    return connectionTimeoutInMS;
-  }
-
-  public void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
-    this.connectionTimeoutInMS = connectionTimeoutInMS;
+    return rpcConfig.getConnectionTimeoutInMs();
   }
 
   public boolean isEnableUsePersistLogOnDiskToCatchUp() {
@@ -385,13 +382,8 @@ public class RaftConfig {
     this.dispatchingCompressionType = dispatchingCompressionType;
   }
 
-
   public void loadProperties(Properties properties) {
-
-    this.setConnectionTimeoutInMS(
-        Integer.parseInt(
-            properties.getProperty(
-                "connection_timeout_ms", String.valueOf(this.getConnectionTimeoutInMS()))));
+    logger.debug("Loading properties: {}", properties);
 
     this.setHeartbeatIntervalMs(
         Integer.parseInt(
@@ -416,13 +408,12 @@ public class RaftConfig {
     this.setWriteOperationTimeoutMS(
         Integer.parseInt(
             properties.getProperty(
-                "write_operation_timeout_ms",
-                String.valueOf(this.getWriteOperationTimeoutMS()))));
+                "write_operation_timeout_ms", String.valueOf(this.getWriteOperationTimeoutMS()))));
 
-    this.setSyncLeaderMaxWaitMs(Integer.parseInt(
-        properties.getProperty(
-            "sync_leader_max_wait",
-            String.valueOf(this.getSyncLeaderMaxWaitMs()))));
+    this.setSyncLeaderMaxWaitMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "sync_leader_max_wait", String.valueOf(this.getSyncLeaderMaxWaitMs()))));
 
     this.setMinNumOfLogsInMem(
         Integer.parseInt(
@@ -442,7 +433,8 @@ public class RaftConfig {
     this.setMaxWaitingTimeWhenInsertBlocked(
         Long.parseLong(
             properties.getProperty(
-                "max_insert_block_time_ms", String.valueOf(this.getMaxWaitingTimeWhenInsertBlocked()))));
+                "max_insert_block_time_ms",
+                String.valueOf(this.getMaxWaitingTimeWhenInsertBlocked()))));
 
     this.setLogDeleteCheckIntervalSecond(
         Integer.parseInt(
@@ -468,8 +460,7 @@ public class RaftConfig {
 
     this.setLogNumInBatch(
         Integer.parseInt(
-            properties.getProperty(
-                "log_batch_num", String.valueOf(this.getLogNumInBatch()))));
+            properties.getProperty("log_batch_num", String.valueOf(this.getLogNumInBatch()))));
 
     this.setMaxRaftLogIndexSizeInMemory(
         Integer.parseInt(
@@ -520,8 +511,7 @@ public class RaftConfig {
     this.setUseFollowerSlidingWindow(
         Boolean.parseBoolean(
             properties.getProperty(
-                "use_follower_sliding_window",
-                String.valueOf(this.isUseFollowerSlidingWindow()))));
+                "use_follower_sliding_window", String.valueOf(this.isUseFollowerSlidingWindow()))));
 
     this.setEnableWeakAcceptance(
         Boolean.parseBoolean(
@@ -566,12 +556,14 @@ public class RaftConfig {
     this.setEnableCompressedDispatching(
         Boolean.parseBoolean(
             properties.getProperty(
-                "use_compressed_dispatching", String.valueOf(this.isEnableCompressedDispatching()))));
+                "use_compressed_dispatching",
+                String.valueOf(this.isEnableCompressedDispatching()))));
 
     this.setDispatchingCompressionType(
-        CompressionType.valueOf(CompressionType.class, properties.getProperty(
-            "default_boolean_encoding", this.getDispatchingCompressionType().toString()))
-    );
+        CompressionType.valueOf(
+            CompressionType.class,
+            properties.getProperty(
+                "default_boolean_encoding", this.getDispatchingCompressionType().toString())));
 
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 8d96d73179..bbbc5a11c4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -359,7 +359,7 @@ public class RaftMember {
   public void initPeerMap() {
     status.peerMap = new ConcurrentHashMap<>();
     for (Peer peer : allNodes) {
-      status.peerMap.computeIfAbsent(peer, k -> new PeerInfo(logManager.getLastLogIndex()));
+      status.peerMap.computeIfAbsent(peer, k -> new PeerInfo());
     }
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
index 14745a510d..4378e5b5f4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
@@ -38,7 +38,7 @@ import java.util.function.Consumer;
 public class AsyncLogApplier implements LogApplier {
 
   private static final Logger logger = LoggerFactory.getLogger(AsyncLogApplier.class);
-  private static final int CONCURRENT_CONSUMER_NUM = 4;
+  private static final int CONCURRENT_CONSUMER_NUM = 16;
   private LogApplier embeddedApplier;
   private DataLogConsumer[] consumers;
   private ExecutorService consumerPool;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index bf1c9768fa..5b5995064c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -450,5 +450,4 @@ public class LogDispatcher {
       }
     }
   }
-
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 804328305d..2c3ffb488e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -402,6 +402,13 @@ public abstract class RaftLogManager {
       }
       entries.addAll(appendingEntries);
     }
+    if (config.isEnableRaftLogPersistence()) {
+      try {
+        getStableEntryManager().append(appendingEntries, commitIndex, appliedIndex);
+      } catch (IOException e) {
+        logger.error("Cannot persist entries", e);
+      }
+    }
 
     Object logUpdateCondition =
         getLogUpdateCondition(entries.get(entries.size() - 1).getCurrLogIndex());
@@ -574,31 +581,17 @@ public abstract class RaftLogManager {
     }
   }
 
-  private void commitEntries(List<Entry> entries) throws LogExecutionException {
-    try {
-      // Operations here are so simple that the execution could be thought
-      // success or fail together approximately.
-      Entry lastLog = entries.get(entries.size() - 1);
-      commitIndex = lastLog.getCurrLogIndex();
+  private void commitEntries(List<Entry> entries) {
+    // Operations here are so simple that the execution could be thought
+    // success or fail together approximately.
+    Entry lastLog = entries.get(entries.size() - 1);
+    commitIndex = lastLog.getCurrLogIndex();
 
-      if (config.isEnableRaftLogPersistence()) {
-        // Cluster could continue provide service when exception is thrown here
-        getStableEntryManager().append(entries, appliedIndex);
-      }
-      for (Entry entry : entries) {
-        if (entry.createTime != 0) {
-          entry.committedTime = System.nanoTime();
-          Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(
-              entry.committedTime - entry.createTime);
-        }
+    for (Entry entry : entries) {
+      if (entry.createTime != 0) {
+        entry.committedTime = System.nanoTime();
+        Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(entry.committedTime - entry.createTime);
       }
-    } catch (IOException e) {
-      // The exception will block the raft service continue accept log.
-      // TODO: Notify user that the persisted logs before these entries(include) are corrupted.
-      // TODO: An idea is that we can degrade the service by disable raft log persistent for
-      // TODO: the group. It needs fine-grained control for the config of Raft log persistence.
-      logger.error("{}: persistent raft log error:", name, e);
-      throw new LogExecutionException(e);
     }
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java
index d342bd1a54..bf5751c2c2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java
@@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
 
 public class LogManagerMeta {
 
-  private long commitLogTerm = -1;
   private long commitLogIndex = -1;
   private long lastLogIndex = -1;
   private long lastLogTerm = -1;
@@ -36,7 +35,6 @@ public class LogManagerMeta {
 
   public static LogManagerMeta deserialize(ByteBuffer buffer) {
     LogManagerMeta res = new LogManagerMeta();
-    res.commitLogTerm = ReadWriteIOUtils.readLong(buffer);
     res.commitLogIndex = ReadWriteIOUtils.readLong(buffer);
     res.lastLogIndex = ReadWriteIOUtils.readLong(buffer);
     res.lastLogTerm = ReadWriteIOUtils.readLong(buffer);
@@ -57,7 +55,6 @@ public class LogManagerMeta {
   public ByteBuffer serialize() {
     // 5 is the number of attributes in class LogManagerMeta
     ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES * 6);
-    byteBuffer.putLong(commitLogTerm);
     byteBuffer.putLong(commitLogIndex);
     byteBuffer.putLong(lastLogIndex);
     byteBuffer.putLong(lastLogTerm);
@@ -71,8 +68,6 @@ public class LogManagerMeta {
   @Override
   public String toString() {
     return "LogManagerMeta{"
-        + " commitLogTerm="
-        + commitLogTerm
         + ", commitLogIndex="
         + commitLogIndex
         + ", lastLogIndex="
@@ -100,10 +95,6 @@ public class LogManagerMeta {
     this.lastLogTerm = lastLogTerm;
   }
 
-  public void setCommitLogTerm(long commitLogTerm) {
-    this.commitLogTerm = commitLogTerm;
-  }
-
   public long getLastAppliedIndex() {
     return lastAppliedIndex;
   }
@@ -116,10 +107,6 @@ public class LogManagerMeta {
     return lastAppliedTerm;
   }
 
-  public void setLastAppliedTerm(long lastAppliedTerm) {
-    this.lastAppliedTerm = lastAppliedTerm;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -136,7 +123,6 @@ public class LogManagerMeta {
         .append(commitLogIndex, that.commitLogIndex)
         .append(lastLogIndex, that.lastLogIndex)
         .append(lastLogTerm, that.lastLogTerm)
-        .append(commitLogTerm, that.commitLogTerm)
         .append(lastAppliedIndex, that.lastAppliedIndex)
         .append(lastAppliedTerm, that.lastAppliedTerm)
         .isEquals();
@@ -148,7 +134,6 @@ public class LogManagerMeta {
         .append(commitLogIndex)
         .append(lastLogIndex)
         .append(lastLogTerm)
-        .append(commitLogTerm)
         .append(lastAppliedIndex)
         .toHashCode();
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index 130ea8866f..b1768e5a24 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -31,7 +31,8 @@ public interface StableEntryManager {
 
   List<Entry> getAllEntriesAfterCommittedIndex();
 
-  void append(List<Entry> entries, long maxHaveAppliedCommitIndex) throws IOException;
+  void append(List<Entry> entries, long commitIndex, long maxHaveAppliedCommitIndex)
+      throws IOException;
 
   void flushLogBuffer();
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 4df1d4faf2..d330f4421e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -250,13 +250,13 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   }
 
   @Override
-  public void append(List<Entry> entries, long maxHaveAppliedCommitIndex) throws IOException {
+  public void append(List<Entry> entries, long commitIndex, long maxHaveAppliedCommitIndex)
+      throws IOException {
     lock.lock();
     try {
       putLogs(entries);
       Entry entry = entries.get(entries.size() - 1);
-      meta.setCommitLogIndex(entry.getCurrLogIndex());
-      meta.setCommitLogTerm(entry.getCurrLogTerm());
+      meta.setCommitLogIndex(commitIndex);
       meta.setLastLogIndex(entry.getCurrLogIndex());
       meta.setLastLogTerm(entry.getCurrLogTerm());
       meta.setLastAppliedIndex(maxHaveAppliedCommitIndex);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index df654f5bfd..619bde93d6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -292,7 +292,7 @@ public class Timer {
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_FROM_CREATE_TO_APPLIED(
         LOG_DISPATCHER,
-        "from create to OK",
+        "from create to applied",
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6b176c26fd..147fb6b4e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -84,7 +84,7 @@ public class IoTDBDescriptor {
       logger.info("Will reload properties from {} ", loader.getClass().getName());
       Properties properties = loader.loadProperties();
       loadProperties(properties);
-      conf.setCustomizedProperties(loader.getCustomizedProperties());
+      conf.getCustomizedProperties().putAll(loader.getCustomizedProperties());
       TSFileDescriptor.getInstance().overwriteConfigByCustomSettings(properties);
       TSFileDescriptor.getInstance()
           .getConfig()
@@ -985,6 +985,7 @@ public class IoTDBDescriptor {
     conf.setTimePartitionInterval(
         DateTimeUtils.convertMilliTimeWithPrecision(
             conf.getTimePartitionInterval(), conf.getTimestampPrecision()));
+    conf.getCustomizedProperties().putAll(properties);
   }
 
   private void loadAuthorCache(Properties properties) {
@@ -1454,8 +1455,6 @@ public class IoTDBDescriptor {
       if (prevDeleteWalFilesPeriodInMs != conf.getDeleteWalFilesPeriodInMs()) {
         WALManager.getInstance().rebootWALDeleteThread();
       }
-
-      conf.getCustomizedProperties().putAll(properties);
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index 8aae71cb30..7e7f02b081 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -150,7 +150,9 @@ public class RegionWriteExecutor {
       // TODO need consider more status
       if (writeResponse.getStatus() != null) {
         response.setAccepted(
-            TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode());
+            TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode()
+                || TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
+                    == writeResponse.getStatus().getCode());
         response.setMessage(writeResponse.getStatus().message);
         response.setStatus(writeResponse.getStatus());
       } else {
@@ -244,8 +246,10 @@ public class RegionWriteExecutor {
         if (writeResponse.getStatus() != null) {
           response.setAccepted(
               !hasFailedMeasurement
-                  && TSStatusCode.SUCCESS_STATUS.getStatusCode()
-                      == writeResponse.getStatus().getCode());
+                  && (TSStatusCode.SUCCESS_STATUS.getStatusCode()
+                          == writeResponse.getStatus().getCode()
+                      || TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
+                          == writeResponse.getStatus().getCode()));
           if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != writeResponse.getStatus().getCode()) {
             response.setMessage(writeResponse.getStatus().message);
             response.setStatus(writeResponse.getStatus());
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 1c2d64e092..b90f75f41a 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -92,7 +92,8 @@ public class RpcUtils {
     if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
       return;
     }
-    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.code != TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()) {
       throw new StatementExecutionException(status);
     }
   }
@@ -141,6 +142,7 @@ public class RpcUtils {
         new StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": ");
     for (TSStatus status : statuses) {
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && status.getCode() != TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
           && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
         errMsgs.append(status.getMessage()).append("; ");
       }