You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/17 12:25:21 UTC
[iotdb] branch master updated: Optimize ml WAL limitation for MultiLeader (#7012)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d8ac4a33b6 Optimize ml WAL limitation for MultiLeader (#7012)
d8ac4a33b6 is described below
commit d8ac4a33b661d77da1167d7b23925ec00aa9de19
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Wed Aug 17 20:25:15 2022 +0800
Optimize ml WAL limitation for MultiLeader (#7012)
---
.../confignode1conf/iotdb-confignode.properties | 1 +
.../iotdb/consensus/config/MultiLeaderConfig.java | 32 +++++------------
.../multileader/MultiLeaderServerImpl.java | 40 +++++++++++++++++-----
.../multileader/logdispatcher/LogDispatcher.java | 6 ++--
.../resources/conf/iotdb-datanode.properties | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++--
.../db/consensus/DataRegionConsensusImpl.java | 2 +-
8 files changed, 53 insertions(+), 46 deletions(-)
diff --git a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
index 85572adee2..17f3017d90 100644
--- a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
@@ -1,3 +1,4 @@
+
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 14157aea78..1c12022dc5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -202,8 +202,7 @@ public class MultiLeaderConfig {
private final int maxWaitingTimeForAccumulatingBatchInMs;
private final long basicRetryWaitTimeMs;
private final long maxRetryWaitTimeMs;
- private final long throttleDownThreshold;
- private final long throttleUpThreshold;
+ private final long walThrottleThreshold;
private final long throttleTimeOutMs;
private Replication(
@@ -213,8 +212,7 @@ public class MultiLeaderConfig {
int maxWaitingTimeForAccumulatingBatchInMs,
long basicRetryWaitTimeMs,
long maxRetryWaitTimeMs,
- long throttleDownThreshold,
- long throttleUpThreshold,
+ long walThrottleThreshold,
long throttleTimeOutMs) {
this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
this.maxRequestPerBatch = maxRequestPerBatch;
@@ -222,8 +220,7 @@ public class MultiLeaderConfig {
this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
- this.throttleDownThreshold = throttleDownThreshold;
- this.throttleUpThreshold = throttleUpThreshold;
+ this.walThrottleThreshold = walThrottleThreshold;
this.throttleTimeOutMs = throttleTimeOutMs;
}
@@ -251,12 +248,8 @@ public class MultiLeaderConfig {
return maxRetryWaitTimeMs;
}
- public long getThrottleDownThreshold() {
- return throttleDownThreshold;
- }
-
- public long getThrottleUpThreshold() {
- return throttleUpThreshold;
+ public long getWalThrottleThreshold() {
+ return walThrottleThreshold;
}
public long getThrottleTimeOutMs() {
@@ -276,8 +269,7 @@ public class MultiLeaderConfig {
private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
- private long throttleDownThreshold = 50 * 1024 * 1024 * 1024L;
- private long throttleUpThreshold = 1024 * 1024 * 1024L;
+ private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
public Replication.Builder setMaxPendingRequestNumPerNode(int maxPendingRequestNumPerNode) {
@@ -311,13 +303,8 @@ public class MultiLeaderConfig {
return this;
}
- public Replication.Builder setThrottleDownThreshold(long throttleDownThreshold) {
- this.throttleDownThreshold = throttleDownThreshold;
- return this;
- }
-
- public Replication.Builder setThrottleUpThreshold(long throttleUpThreshold) {
- this.throttleUpThreshold = throttleUpThreshold;
+ public Replication.Builder setWalThrottleThreshold(long walThrottleThreshold) {
+ this.walThrottleThreshold = walThrottleThreshold;
return this;
}
@@ -334,8 +321,7 @@ public class MultiLeaderConfig {
maxWaitingTimeForAccumulatingBatchInMs,
basicRetryWaitTimeMs,
maxRetryWaitTimeMs,
- throttleDownThreshold,
- throttleUpThreshold,
+ walThrottleThreshold,
throttleTimeOutMs);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index b106bca0c6..619e9fb32d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClie
import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -46,7 +47,11 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
public class MultiLeaderServerImpl {
@@ -56,6 +61,8 @@ public class MultiLeaderServerImpl {
private final Peer thisNode;
private final IStateMachine stateMachine;
+ private final Lock stateMachineLock = new ReentrantLock();
+ private final Condition stateMachineCondition = stateMachineLock.newCondition();
private final String storageDir;
private final List<Peer> configuration;
private final AtomicLong index;
@@ -108,14 +115,20 @@ public class MultiLeaderServerImpl {
* records the index of the log and writes locally, and then asynchronous replication is performed
*/
public TSStatus write(IConsensusRequest request) {
- synchronized (stateMachine) {
- if (needToThrottleDown()) {
+ stateMachineLock.lock();
+ try {
+ if (needBlockWrite()) {
logger.info(
"[Throttle Down] index:{}, safeIndex:{}",
getIndex(),
getCurrentSafelyDeletedSearchIndex());
try {
- stateMachine.wait(config.getReplication().getThrottleTimeOutMs());
+ boolean timeout =
+ !stateMachineCondition.await(
+ config.getReplication().getThrottleTimeOutMs(), TimeUnit.MILLISECONDS);
+ if (timeout) {
+ return RpcUtils.getStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR);
+ }
} catch (InterruptedException e) {
logger.error("Failed to throttle down because ", e);
Thread.currentThread().interrupt();
@@ -151,6 +164,8 @@ public class MultiLeaderServerImpl {
result.getCode());
}
return result;
+ } finally {
+ stateMachineLock.unlock();
}
}
@@ -237,14 +252,21 @@ public class MultiLeaderServerImpl {
return config;
}
- public boolean needToThrottleDown() {
- return reader.getTotalSize() > config.getReplication().getThrottleDownThreshold();
+ public boolean needBlockWrite() {
+ return reader.getTotalSize() > config.getReplication().getWalThrottleThreshold();
+ }
+
+ public boolean unblockWrite() {
+ return reader.getTotalSize() < config.getReplication().getWalThrottleThreshold();
}
- public boolean needToThrottleUp() {
- return reader.getTotalSize()
- < config.getReplication().getThrottleDownThreshold()
- - config.getReplication().getThrottleUpThreshold();
+ public void signal() {
+ stateMachineLock.lock();
+ try {
+ stateMachineCondition.signalAll();
+ } finally {
+ stateMachineLock.unlock();
+ }
}
public AtomicLong getIndexObject() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 7ee776acda..e3539752be 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -223,10 +223,8 @@ public class LogDispatcher {
// safely
reader.setSafelyDeletedSearchIndex(impl.getCurrentSafelyDeletedSearchIndex());
// notify
- if (impl.needToThrottleUp()) {
- synchronized (impl.getStateMachine()) {
- impl.getStateMachine().notifyAll();
- }
+ if (impl.unblockWrite()) {
+ impl.signal();
}
}
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index e24b1d9ed8..85dc0ce5ff 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -181,7 +181,7 @@ target_config_nodes=127.0.0.1:22277
# The minimum size of wal files when throttle down in MultiLeader consensus
# If it's a value smaller than 0, use the default value 50 * 1024 * 1024 * 1024 bytes (50GB).
# Datatype: long
-# multi_leader_throttle_down_threshold_in_byte=53687091200
+# multi_leader_throttle_threshold_in_byte=53687091200
####################
### Directory Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 39bbe10647..416a2a300e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -953,7 +953,7 @@ public class IoTDBConfig {
private int driverTaskExecutionTimeSliceInMs = 100;
/** Maximum size of wal buffer used in MultiLeader consensus. Unit: byte */
- private long throttleDownThreshold = 50 * 1024 * 1024 * 1024L;
+ private long throttleThreshold = 50 * 1024 * 1024 * 1024L;
IoTDBConfig() {}
@@ -3059,12 +3059,12 @@ public class IoTDBConfig {
this.driverTaskExecutionTimeSliceInMs = driverTaskExecutionTimeSliceInMs;
}
- public long getThrottleDownThreshold() {
- return throttleDownThreshold;
+ public long getThrottleThreshold() {
+ return throttleThreshold;
}
- public void setThrottleDownThreshold(long throttleDownThreshold) {
- this.throttleDownThreshold = throttleDownThreshold;
+ public void setThrottleThreshold(long throttleThreshold) {
+ this.throttleThreshold = throttleThreshold;
}
public String getConfigMessage() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index dc0db70e5d..3d9e81624f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1078,10 +1078,10 @@ public class IoTDBDescriptor {
long throttleDownThresholdInByte =
Long.parseLong(
properties.getProperty(
- "multi_leader_throttle_down_threshold_in_byte",
- Long.toString(conf.getThrottleDownThreshold())));
+ "multi_leader_throttle_threshold_in_byte",
+ Long.toString(conf.getThrottleThreshold())));
if (throttleDownThresholdInByte > 0) {
- conf.setThrottleDownThreshold(throttleDownThresholdInByte);
+ conf.setThrottleThreshold(throttleDownThresholdInByte);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index c642f3b2e1..f2e9aac920 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -79,7 +79,7 @@ public class DataRegionConsensusImpl {
.build())
.setReplication(
MultiLeaderConfig.Replication.newBuilder()
- .setThrottleDownThreshold(conf.getThrottleDownThreshold())
+ .setWalThrottleThreshold(conf.getThrottleThreshold())
.build())
.build())
.setRatisConfig(