You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/17 12:25:21 UTC

[iotdb] branch master updated: Optimize ml WAL limitation for MultiLeader (#7012)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d8ac4a33b6 Optimize ml WAL limitation for MultiLeader (#7012)
d8ac4a33b6 is described below

commit d8ac4a33b661d77da1167d7b23925ec00aa9de19
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Wed Aug 17 20:25:15 2022 +0800

    Optimize ml WAL limitation for MultiLeader (#7012)
---
 .../confignode1conf/iotdb-confignode.properties    |  1 +
 .../iotdb/consensus/config/MultiLeaderConfig.java  | 32 +++++------------
 .../multileader/MultiLeaderServerImpl.java         | 40 +++++++++++++++++-----
 .../multileader/logdispatcher/LogDispatcher.java   |  6 ++--
 .../resources/conf/iotdb-datanode.properties       |  2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++--
 .../db/consensus/DataRegionConsensusImpl.java      |  2 +-
 8 files changed, 53 insertions(+), 46 deletions(-)

diff --git a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
index 85572adee2..17f3017d90 100644
--- a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
@@ -1,3 +1,4 @@
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 14157aea78..1c12022dc5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -202,8 +202,7 @@ public class MultiLeaderConfig {
     private final int maxWaitingTimeForAccumulatingBatchInMs;
     private final long basicRetryWaitTimeMs;
     private final long maxRetryWaitTimeMs;
-    private final long throttleDownThreshold;
-    private final long throttleUpThreshold;
+    private final long walThrottleThreshold;
     private final long throttleTimeOutMs;
 
     private Replication(
@@ -213,8 +212,7 @@ public class MultiLeaderConfig {
         int maxWaitingTimeForAccumulatingBatchInMs,
         long basicRetryWaitTimeMs,
         long maxRetryWaitTimeMs,
-        long throttleDownThreshold,
-        long throttleUpThreshold,
+        long walThrottleThreshold,
         long throttleTimeOutMs) {
       this.maxPendingRequestNumPerNode = maxPendingRequestNumPerNode;
       this.maxRequestPerBatch = maxRequestPerBatch;
@@ -222,8 +220,7 @@ public class MultiLeaderConfig {
       this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
       this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
       this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
-      this.throttleDownThreshold = throttleDownThreshold;
-      this.throttleUpThreshold = throttleUpThreshold;
+      this.walThrottleThreshold = walThrottleThreshold;
       this.throttleTimeOutMs = throttleTimeOutMs;
     }
 
@@ -251,12 +248,8 @@ public class MultiLeaderConfig {
       return maxRetryWaitTimeMs;
     }
 
-    public long getThrottleDownThreshold() {
-      return throttleDownThreshold;
-    }
-
-    public long getThrottleUpThreshold() {
-      return throttleUpThreshold;
+    public long getWalThrottleThreshold() {
+      return walThrottleThreshold;
     }
 
     public long getThrottleTimeOutMs() {
@@ -276,8 +269,7 @@ public class MultiLeaderConfig {
       private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
-      private long throttleDownThreshold = 50 * 1024 * 1024 * 1024L;
-      private long throttleUpThreshold = 1024 * 1024 * 1024L;
+      private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
       private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
 
       public Replication.Builder setMaxPendingRequestNumPerNode(int maxPendingRequestNumPerNode) {
@@ -311,13 +303,8 @@ public class MultiLeaderConfig {
         return this;
       }
 
-      public Replication.Builder setThrottleDownThreshold(long throttleDownThreshold) {
-        this.throttleDownThreshold = throttleDownThreshold;
-        return this;
-      }
-
-      public Replication.Builder setThrottleUpThreshold(long throttleUpThreshold) {
-        this.throttleUpThreshold = throttleUpThreshold;
+      public Replication.Builder setWalThrottleThreshold(long walThrottleThreshold) {
+        this.walThrottleThreshold = walThrottleThreshold;
         return this;
       }
 
@@ -334,8 +321,7 @@ public class MultiLeaderConfig {
             maxWaitingTimeForAccumulatingBatchInMs,
             basicRetryWaitTimeMs,
             maxRetryWaitTimeMs,
-            throttleDownThreshold,
-            throttleUpThreshold,
+            walThrottleThreshold,
             throttleTimeOutMs);
       }
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index b106bca0c6..619e9fb32d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClie
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
@@ -46,7 +47,11 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class MultiLeaderServerImpl {
 
@@ -56,6 +61,8 @@ public class MultiLeaderServerImpl {
 
   private final Peer thisNode;
   private final IStateMachine stateMachine;
+  private final Lock stateMachineLock = new ReentrantLock();
+  private final Condition stateMachineCondition = stateMachineLock.newCondition();
   private final String storageDir;
   private final List<Peer> configuration;
   private final AtomicLong index;
@@ -108,14 +115,20 @@ public class MultiLeaderServerImpl {
    * records the index of the log and writes locally, and then asynchronous replication is performed
    */
   public TSStatus write(IConsensusRequest request) {
-    synchronized (stateMachine) {
-      if (needToThrottleDown()) {
+    stateMachineLock.lock();
+    try {
+      if (needBlockWrite()) {
         logger.info(
             "[Throttle Down] index:{}, safeIndex:{}",
             getIndex(),
             getCurrentSafelyDeletedSearchIndex());
         try {
-          stateMachine.wait(config.getReplication().getThrottleTimeOutMs());
+          boolean timeout =
+              !stateMachineCondition.await(
+                  config.getReplication().getThrottleTimeOutMs(), TimeUnit.MILLISECONDS);
+          if (timeout) {
+            return RpcUtils.getStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR);
+          }
         } catch (InterruptedException e) {
           logger.error("Failed to throttle down because ", e);
           Thread.currentThread().interrupt();
@@ -151,6 +164,8 @@ public class MultiLeaderServerImpl {
             result.getCode());
       }
       return result;
+    } finally {
+      stateMachineLock.unlock();
     }
   }
 
@@ -237,14 +252,21 @@ public class MultiLeaderServerImpl {
     return config;
   }
 
-  public boolean needToThrottleDown() {
-    return reader.getTotalSize() > config.getReplication().getThrottleDownThreshold();
+  public boolean needBlockWrite() {
+    return reader.getTotalSize() > config.getReplication().getWalThrottleThreshold();
+  }
+
+  public boolean unblockWrite() {
+    return reader.getTotalSize() < config.getReplication().getWalThrottleThreshold();
   }
 
-  public boolean needToThrottleUp() {
-    return reader.getTotalSize()
-        < config.getReplication().getThrottleDownThreshold()
-            - config.getReplication().getThrottleUpThreshold();
+  public void signal() {
+    stateMachineLock.lock();
+    try {
+      stateMachineCondition.signalAll();
+    } finally {
+      stateMachineLock.unlock();
+    }
   }
 
   public AtomicLong getIndexObject() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 7ee776acda..e3539752be 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -223,10 +223,8 @@ public class LogDispatcher {
       // safely
       reader.setSafelyDeletedSearchIndex(impl.getCurrentSafelyDeletedSearchIndex());
       // notify
-      if (impl.needToThrottleUp()) {
-        synchronized (impl.getStateMachine()) {
-          impl.getStateMachine().notifyAll();
-        }
+      if (impl.unblockWrite()) {
+        impl.signal();
       }
     }
 
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index e24b1d9ed8..85dc0ce5ff 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -181,7 +181,7 @@ target_config_nodes=127.0.0.1:22277
 # The minimum size of wal files when throttle down in MultiLeader consensus
 # If it's a value smaller than 0, use the default value 50 * 1024 * 1024 * 1024 bytes (50GB).
 # Datatype: long
-# multi_leader_throttle_down_threshold_in_byte=53687091200
+# multi_leader_throttle_threshold_in_byte=53687091200
 
 ####################
 ### Directory Configuration
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 39bbe10647..416a2a300e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -953,7 +953,7 @@ public class IoTDBConfig {
   private int driverTaskExecutionTimeSliceInMs = 100;
 
   /** Maximum size of wal buffer used in MultiLeader consensus. Unit: byte */
-  private long throttleDownThreshold = 50 * 1024 * 1024 * 1024L;
+  private long throttleThreshold = 50 * 1024 * 1024 * 1024L;
 
   IoTDBConfig() {}
 
@@ -3059,12 +3059,12 @@ public class IoTDBConfig {
     this.driverTaskExecutionTimeSliceInMs = driverTaskExecutionTimeSliceInMs;
   }
 
-  public long getThrottleDownThreshold() {
-    return throttleDownThreshold;
+  public long getThrottleThreshold() {
+    return throttleThreshold;
   }
 
-  public void setThrottleDownThreshold(long throttleDownThreshold) {
-    this.throttleDownThreshold = throttleDownThreshold;
+  public void setThrottleThreshold(long throttleThreshold) {
+    this.throttleThreshold = throttleThreshold;
   }
 
   public String getConfigMessage() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index dc0db70e5d..3d9e81624f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1078,10 +1078,10 @@ public class IoTDBDescriptor {
     long throttleDownThresholdInByte =
         Long.parseLong(
             properties.getProperty(
-                "multi_leader_throttle_down_threshold_in_byte",
-                Long.toString(conf.getThrottleDownThreshold())));
+                "multi_leader_throttle_threshold_in_byte",
+                Long.toString(conf.getThrottleThreshold())));
     if (throttleDownThresholdInByte > 0) {
-      conf.setThrottleDownThreshold(throttleDownThresholdInByte);
+      conf.setThrottleThreshold(throttleDownThresholdInByte);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index c642f3b2e1..f2e9aac920 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -79,7 +79,7 @@ public class DataRegionConsensusImpl {
                                       .build())
                               .setReplication(
                                   MultiLeaderConfig.Replication.newBuilder()
-                                      .setThrottleDownThreshold(conf.getThrottleDownThreshold())
+                                      .setWalThrottleThreshold(conf.getThrottleThreshold())
                                       .build())
                               .build())
                       .setRatisConfig(