You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/16 01:52:01 UTC
[iotdb] branch native_raft updated: change log serialization
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 646b482153 change log serialization
646b482153 is described below
commit 646b4821534c815f5b72f17d4ecf6bc5c9e6cf86
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Mar 16 09:53:47 2023 +0800
change log serialization
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 5 +-
.../iotdb/consensus/config/ConsensusConfig.java | 2 +-
.../iotdb/consensus/natraft/protocol/PeerInfo.java | 27 ++-----
.../consensus/natraft/protocol/RaftConfig.java | 90 ++++++++++------------
.../consensus/natraft/protocol/RaftMember.java | 2 +-
.../protocol/log/applier/AsyncLogApplier.java | 2 +-
.../protocol/log/dispatch/LogDispatcher.java | 1 -
.../protocol/log/manager/RaftLogManager.java | 39 ++++------
.../log/manager/serialization/LogManagerMeta.java | 15 ----
.../manager/serialization/StableEntryManager.java | 3 +-
.../serialization/SyncLogDequeSerializer.java | 6 +-
.../iotdb/consensus/natraft/utils/Timer.java | 2 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +-
.../execution/executor/RegionWriteExecutor.java | 10 ++-
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 4 +-
16 files changed, 91 insertions(+), 124 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index d850139696..28057779e0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.confignode.conf;
-import java.util.Properties;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -30,6 +29,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.rpc.RpcUtils;
import java.io.File;
+import java.util.Properties;
public class ConfigNodeConfig {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index e01bb7ebf5..d427b1f7c6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -193,7 +193,10 @@ public class ConfigNodeProcedureEnv {
public boolean verifySucceed(TSStatus... status) {
return Arrays.stream(status)
- .allMatch(tsStatus -> tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ .allMatch(
+ tsStatus ->
+ tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || tsStatus.getCode() == TSStatusCode.WEAKLY_ACCEPTED.getStatusCode());
}
public boolean doubleCheckReplica(TDataNodeLocation removedDatanode) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index 2c561db6a2..fed43984fe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.consensus.config;
-import java.util.Properties;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import java.util.Optional;
+import java.util.Properties;
public class ConsensusConfig {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/PeerInfo.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/PeerInfo.java
index c82a0722f6..b7b53cc79a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/PeerInfo.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/PeerInfo.java
@@ -20,35 +20,24 @@
package org.apache.iotdb.consensus.natraft.protocol;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class PeerInfo {
- private long nextIndex;
- private long matchIndex;
+ private AtomicLong matchIndex;
private AtomicInteger inconsistentHeartbeatNum = new AtomicInteger();
// lastLogIndex from the last heartbeat
private long lastHeartBeatIndex;
- public PeerInfo(long nextIndex) {
- this.nextIndex = nextIndex;
- this.matchIndex = -1;
+ public PeerInfo() {
+ this.matchIndex = new AtomicLong(-1);
}
- public synchronized long getNextIndex() {
- return nextIndex;
+ public long getMatchIndex() {
+ return matchIndex.get();
}
- public synchronized void setNextIndex(long nextIndex) {
- this.nextIndex = nextIndex;
- }
-
- public synchronized long getMatchIndex() {
- return matchIndex;
- }
-
- public synchronized void setMatchIndex(long matchIndex) {
- this.matchIndex = matchIndex;
- this.setNextIndex(Math.max(nextIndex, matchIndex + 1));
- this.notifyAll();
+ public void setMatchIndex(long matchIndex) {
+ this.matchIndex.set(matchIndex);
}
public int incInconsistentHeartbeatNum() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 560e89d303..91ca23a15f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -1,38 +1,41 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
-
-
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+
+
+*/
package org.apache.iotdb.consensus.natraft.protocol;
-import java.util.Properties;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.RPCConfig;
import org.apache.iotdb.consensus.natraft.protocol.consistency.ConsistencyLevel;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.File;
-import java.util.concurrent.TimeUnit;
+import java.util.Properties;
public class RaftConfig {
+ private static final Logger logger = LoggerFactory.getLogger(RaftConfig.class);
private boolean enableWeakAcceptance = false;
private int maxNumOfLogsInMem = 10000;
private int minNumOfLogsInMem = 1000;
@@ -44,7 +47,6 @@ public class RaftConfig {
private int uncommittedRaftLogNumForRejectThreshold = 10000;
private int heartbeatIntervalMs = 1000;
private int electionTimeoutMs = 20_000;
- private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
private boolean enableUsePersistLogOnDiskToCatchUp;
private long writeOperationTimeoutMS = 20_000L;
private int logNumInBatch = 100;
@@ -171,17 +173,12 @@ public class RaftConfig {
return rpcConfig.getRpcMaxConcurrentClientNum();
}
-
public int getMaxIdleClientPerNode() {
return rpcConfig.getRpcMinConcurrentClientNum();
}
public int getConnectionTimeoutInMS() {
- return connectionTimeoutInMS;
- }
-
- public void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
- this.connectionTimeoutInMS = connectionTimeoutInMS;
+ return rpcConfig.getConnectionTimeoutInMs();
}
public boolean isEnableUsePersistLogOnDiskToCatchUp() {
@@ -385,13 +382,8 @@ public class RaftConfig {
this.dispatchingCompressionType = dispatchingCompressionType;
}
-
public void loadProperties(Properties properties) {
-
- this.setConnectionTimeoutInMS(
- Integer.parseInt(
- properties.getProperty(
- "connection_timeout_ms", String.valueOf(this.getConnectionTimeoutInMS()))));
+ logger.debug("Loading properties: {}", properties);
this.setHeartbeatIntervalMs(
Integer.parseInt(
@@ -416,13 +408,12 @@ public class RaftConfig {
this.setWriteOperationTimeoutMS(
Integer.parseInt(
properties.getProperty(
- "write_operation_timeout_ms",
- String.valueOf(this.getWriteOperationTimeoutMS()))));
+ "write_operation_timeout_ms", String.valueOf(this.getWriteOperationTimeoutMS()))));
- this.setSyncLeaderMaxWaitMs(Integer.parseInt(
- properties.getProperty(
- "sync_leader_max_wait",
- String.valueOf(this.getSyncLeaderMaxWaitMs()))));
+ this.setSyncLeaderMaxWaitMs(
+ Integer.parseInt(
+ properties.getProperty(
+ "sync_leader_max_wait", String.valueOf(this.getSyncLeaderMaxWaitMs()))));
this.setMinNumOfLogsInMem(
Integer.parseInt(
@@ -442,7 +433,8 @@ public class RaftConfig {
this.setMaxWaitingTimeWhenInsertBlocked(
Long.parseLong(
properties.getProperty(
- "max_insert_block_time_ms", String.valueOf(this.getMaxWaitingTimeWhenInsertBlocked()))));
+ "max_insert_block_time_ms",
+ String.valueOf(this.getMaxWaitingTimeWhenInsertBlocked()))));
this.setLogDeleteCheckIntervalSecond(
Integer.parseInt(
@@ -468,8 +460,7 @@ public class RaftConfig {
this.setLogNumInBatch(
Integer.parseInt(
- properties.getProperty(
- "log_batch_num", String.valueOf(this.getLogNumInBatch()))));
+ properties.getProperty("log_batch_num", String.valueOf(this.getLogNumInBatch()))));
this.setMaxRaftLogIndexSizeInMemory(
Integer.parseInt(
@@ -520,8 +511,7 @@ public class RaftConfig {
this.setUseFollowerSlidingWindow(
Boolean.parseBoolean(
properties.getProperty(
- "use_follower_sliding_window",
- String.valueOf(this.isUseFollowerSlidingWindow()))));
+ "use_follower_sliding_window", String.valueOf(this.isUseFollowerSlidingWindow()))));
this.setEnableWeakAcceptance(
Boolean.parseBoolean(
@@ -566,12 +556,14 @@ public class RaftConfig {
this.setEnableCompressedDispatching(
Boolean.parseBoolean(
properties.getProperty(
- "use_compressed_dispatching", String.valueOf(this.isEnableCompressedDispatching()))));
+ "use_compressed_dispatching",
+ String.valueOf(this.isEnableCompressedDispatching()))));
this.setDispatchingCompressionType(
- CompressionType.valueOf(CompressionType.class, properties.getProperty(
- "default_boolean_encoding", this.getDispatchingCompressionType().toString()))
- );
+ CompressionType.valueOf(
+ CompressionType.class,
+ properties.getProperty(
+ "default_boolean_encoding", this.getDispatchingCompressionType().toString())));
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 8d96d73179..bbbc5a11c4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -359,7 +359,7 @@ public class RaftMember {
public void initPeerMap() {
status.peerMap = new ConcurrentHashMap<>();
for (Peer peer : allNodes) {
- status.peerMap.computeIfAbsent(peer, k -> new PeerInfo(logManager.getLastLogIndex()));
+ status.peerMap.computeIfAbsent(peer, k -> new PeerInfo());
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
index 14745a510d..4378e5b5f4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
@@ -38,7 +38,7 @@ import java.util.function.Consumer;
public class AsyncLogApplier implements LogApplier {
private static final Logger logger = LoggerFactory.getLogger(AsyncLogApplier.class);
- private static final int CONCURRENT_CONSUMER_NUM = 4;
+ private static final int CONCURRENT_CONSUMER_NUM = 16;
private LogApplier embeddedApplier;
private DataLogConsumer[] consumers;
private ExecutorService consumerPool;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index bf1c9768fa..5b5995064c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -450,5 +450,4 @@ public class LogDispatcher {
}
}
}
-
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 804328305d..2c3ffb488e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -402,6 +402,13 @@ public abstract class RaftLogManager {
}
entries.addAll(appendingEntries);
}
+ if (config.isEnableRaftLogPersistence()) {
+ try {
+ getStableEntryManager().append(appendingEntries, commitIndex, appliedIndex);
+ } catch (IOException e) {
+ logger.error("Cannot persist entries", e);
+ }
+ }
Object logUpdateCondition =
getLogUpdateCondition(entries.get(entries.size() - 1).getCurrLogIndex());
@@ -574,31 +581,17 @@ public abstract class RaftLogManager {
}
}
- private void commitEntries(List<Entry> entries) throws LogExecutionException {
- try {
- // Operations here are so simple that the execution could be thought
- // success or fail together approximately.
- Entry lastLog = entries.get(entries.size() - 1);
- commitIndex = lastLog.getCurrLogIndex();
+ private void commitEntries(List<Entry> entries) {
+ // Operations here are so simple that the execution could be thought
+ // success or fail together approximately.
+ Entry lastLog = entries.get(entries.size() - 1);
+ commitIndex = lastLog.getCurrLogIndex();
- if (config.isEnableRaftLogPersistence()) {
- // Cluster could continue provide service when exception is thrown here
- getStableEntryManager().append(entries, appliedIndex);
- }
- for (Entry entry : entries) {
- if (entry.createTime != 0) {
- entry.committedTime = System.nanoTime();
- Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(
- entry.committedTime - entry.createTime);
- }
+ for (Entry entry : entries) {
+ if (entry.createTime != 0) {
+ entry.committedTime = System.nanoTime();
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(entry.committedTime - entry.createTime);
}
- } catch (IOException e) {
- // The exception will block the raft service continue accept log.
- // TODO: Notify user that the persisted logs before these entries(include) are corrupted.
- // TODO: An idea is that we can degrade the service by disable raft log persistent for
- // TODO: the group. It needs fine-grained control for the config of Raft log persistence.
- logger.error("{}: persistent raft log error:", name, e);
- throw new LogExecutionException(e);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java
index d342bd1a54..bf5751c2c2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java
@@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
public class LogManagerMeta {
- private long commitLogTerm = -1;
private long commitLogIndex = -1;
private long lastLogIndex = -1;
private long lastLogTerm = -1;
@@ -36,7 +35,6 @@ public class LogManagerMeta {
public static LogManagerMeta deserialize(ByteBuffer buffer) {
LogManagerMeta res = new LogManagerMeta();
- res.commitLogTerm = ReadWriteIOUtils.readLong(buffer);
res.commitLogIndex = ReadWriteIOUtils.readLong(buffer);
res.lastLogIndex = ReadWriteIOUtils.readLong(buffer);
res.lastLogTerm = ReadWriteIOUtils.readLong(buffer);
@@ -57,7 +55,6 @@ public class LogManagerMeta {
public ByteBuffer serialize() {
// 5 is the number of attributes in class LogManagerMeta
ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES * 6);
- byteBuffer.putLong(commitLogTerm);
byteBuffer.putLong(commitLogIndex);
byteBuffer.putLong(lastLogIndex);
byteBuffer.putLong(lastLogTerm);
@@ -71,8 +68,6 @@ public class LogManagerMeta {
@Override
public String toString() {
return "LogManagerMeta{"
- + " commitLogTerm="
- + commitLogTerm
+ ", commitLogIndex="
+ commitLogIndex
+ ", lastLogIndex="
@@ -100,10 +95,6 @@ public class LogManagerMeta {
this.lastLogTerm = lastLogTerm;
}
- public void setCommitLogTerm(long commitLogTerm) {
- this.commitLogTerm = commitLogTerm;
- }
-
public long getLastAppliedIndex() {
return lastAppliedIndex;
}
@@ -116,10 +107,6 @@ public class LogManagerMeta {
return lastAppliedTerm;
}
- public void setLastAppliedTerm(long lastAppliedTerm) {
- this.lastAppliedTerm = lastAppliedTerm;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -136,7 +123,6 @@ public class LogManagerMeta {
.append(commitLogIndex, that.commitLogIndex)
.append(lastLogIndex, that.lastLogIndex)
.append(lastLogTerm, that.lastLogTerm)
- .append(commitLogTerm, that.commitLogTerm)
.append(lastAppliedIndex, that.lastAppliedIndex)
.append(lastAppliedTerm, that.lastAppliedTerm)
.isEquals();
@@ -148,7 +134,6 @@ public class LogManagerMeta {
.append(commitLogIndex)
.append(lastLogIndex)
.append(lastLogTerm)
- .append(commitLogTerm)
.append(lastAppliedIndex)
.toHashCode();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index 130ea8866f..b1768e5a24 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -31,7 +31,8 @@ public interface StableEntryManager {
List<Entry> getAllEntriesAfterCommittedIndex();
- void append(List<Entry> entries, long maxHaveAppliedCommitIndex) throws IOException;
+ void append(List<Entry> entries, long commitIndex, long maxHaveAppliedCommitIndex)
+ throws IOException;
void flushLogBuffer();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 4df1d4faf2..d330f4421e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -250,13 +250,13 @@ public class SyncLogDequeSerializer implements StableEntryManager {
}
@Override
- public void append(List<Entry> entries, long maxHaveAppliedCommitIndex) throws IOException {
+ public void append(List<Entry> entries, long commitIndex, long maxHaveAppliedCommitIndex)
+ throws IOException {
lock.lock();
try {
putLogs(entries);
Entry entry = entries.get(entries.size() - 1);
- meta.setCommitLogIndex(entry.getCurrLogIndex());
- meta.setCommitLogTerm(entry.getCurrLogTerm());
+ meta.setCommitLogIndex(commitIndex);
meta.setLastLogIndex(entry.getCurrLogIndex());
meta.setLastLogTerm(entry.getCurrLogTerm());
meta.setLastAppliedIndex(maxHaveAppliedCommitIndex);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index df654f5bfd..619bde93d6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -292,7 +292,7 @@ public class Timer {
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_FROM_CREATE_TO_APPLIED(
LOG_DISPATCHER,
- "from create to OK",
+ "from create to applied",
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6b176c26fd..147fb6b4e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -84,7 +84,7 @@ public class IoTDBDescriptor {
logger.info("Will reload properties from {} ", loader.getClass().getName());
Properties properties = loader.loadProperties();
loadProperties(properties);
- conf.setCustomizedProperties(loader.getCustomizedProperties());
+ conf.getCustomizedProperties().putAll(loader.getCustomizedProperties());
TSFileDescriptor.getInstance().overwriteConfigByCustomSettings(properties);
TSFileDescriptor.getInstance()
.getConfig()
@@ -985,6 +985,7 @@ public class IoTDBDescriptor {
conf.setTimePartitionInterval(
DateTimeUtils.convertMilliTimeWithPrecision(
conf.getTimePartitionInterval(), conf.getTimestampPrecision()));
+ conf.getCustomizedProperties().putAll(properties);
}
private void loadAuthorCache(Properties properties) {
@@ -1454,8 +1455,6 @@ public class IoTDBDescriptor {
if (prevDeleteWalFilesPeriodInMs != conf.getDeleteWalFilesPeriodInMs()) {
WALManager.getInstance().rebootWALDeleteThread();
}
-
- conf.getCustomizedProperties().putAll(properties);
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index 8aae71cb30..7e7f02b081 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -150,7 +150,9 @@ public class RegionWriteExecutor {
// TODO need consider more status
if (writeResponse.getStatus() != null) {
response.setAccepted(
- TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode());
+ TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode()
+ || TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
+ == writeResponse.getStatus().getCode());
response.setMessage(writeResponse.getStatus().message);
response.setStatus(writeResponse.getStatus());
} else {
@@ -244,8 +246,10 @@ public class RegionWriteExecutor {
if (writeResponse.getStatus() != null) {
response.setAccepted(
!hasFailedMeasurement
- && TSStatusCode.SUCCESS_STATUS.getStatusCode()
- == writeResponse.getStatus().getCode());
+ && (TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ == writeResponse.getStatus().getCode()
+ || TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
+ == writeResponse.getStatus().getCode()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != writeResponse.getStatus().getCode()) {
response.setMessage(writeResponse.getStatus().message);
response.setStatus(writeResponse.getStatus());
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 1c2d64e092..b90f75f41a 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -92,7 +92,8 @@ public class RpcUtils {
if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return;
}
- if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.code != TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()) {
throw new StatementExecutionException(status);
}
}
@@ -141,6 +142,7 @@ public class RpcUtils {
new StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": ");
for (TSStatus status : statuses) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.getCode() != TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
errMsgs.append(status.getMessage()).append("; ");
}