You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/05/06 09:09:21 UTC

[iotdb] branch jira5841 created (now d9c54a26b64)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira5841
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at d9c54a26b64 improve

This branch includes the following new commits:

     new d9c54a26b64 improve

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: improve

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5841
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d9c54a26b646294b959d8cdd42e671914cea49ad
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Sat May 6 17:08:52 2023 +0800

    improve
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../iotdb/consensus/config/IoTConsensusConfig.java | 36 ++++++++++------------
 .../consensus/iot/logdispatcher/LogDispatcher.java |  8 ++---
 .../logdispatcher/LogDispatcherThreadMetrics.java  | 22 +++++++++++++
 .../consensus/iot/logdispatcher/SyncStatus.java    |  2 --
 4 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index ff2d2d48641..a4300d68b5a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -253,8 +253,9 @@ public class IoTConsensusConfig {
     private final int maxLogEntriesNumPerBatch;
     private final int maxSizePerBatch;
     private final int maxPendingBatchesNum;
+
+    private final int maxQueueLength;
     private final long maxWaitingTimeForWaitBatchInMs;
-    private final int maxWaitingTimeForAccumulatingBatchInMs;
     private final long basicRetryWaitTimeMs;
     private final long maxRetryWaitTimeMs;
     private final long walThrottleThreshold;
@@ -267,8 +268,8 @@ public class IoTConsensusConfig {
         int maxLogEntriesNumPerBatch,
         int maxSizePerBatch,
         int maxPendingBatchesNum,
+        int maxQueueLength,
         long maxWaitingTimeForWaitBatchInMs,
-        int maxWaitingTimeForAccumulatingBatchInMs,
         long basicRetryWaitTimeMs,
         long maxRetryWaitTimeMs,
         long walThrottleThreshold,
@@ -279,8 +280,8 @@ public class IoTConsensusConfig {
       this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
       this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatchesNum = maxPendingBatchesNum;
+      this.maxQueueLength = maxQueueLength;
       this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
-      this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
       this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
       this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
       this.walThrottleThreshold = walThrottleThreshold;
@@ -302,12 +303,12 @@ public class IoTConsensusConfig {
       return maxPendingBatchesNum;
     }
 
-    public long getMaxWaitingTimeForWaitBatchInMs() {
-      return maxWaitingTimeForWaitBatchInMs;
+    public int getMaxQueueLength() {
+      return maxQueueLength;
     }
 
-    public int getMaxWaitingTimeForAccumulatingBatchInMs() {
-      return maxWaitingTimeForAccumulatingBatchInMs;
+    public long getMaxWaitingTimeForWaitBatchInMs() {
+      return maxWaitingTimeForWaitBatchInMs;
     }
 
     public long getBasicRetryWaitTimeMs() {
@@ -344,13 +345,11 @@ public class IoTConsensusConfig {
 
     public static class Builder {
 
-      private int maxLogEntriesNumPerBatch = 30;
+      private int maxLogEntriesNumPerBatch = 1024;
       private int maxSizePerBatch = 16 * 1024 * 1024;
-      // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
-      // in DataRegionStateMachine
-      private int maxPendingBatchesNum = 5;
+      private int maxPendingBatchesNum = 12;
+      private int maxQueueLength = 4096;
       private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L;
-      private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
       private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
@@ -374,15 +373,14 @@ public class IoTConsensusConfig {
         return this;
       }
 
-      public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
-          long maxWaitingTimeForWaitBatchInMs) {
-        this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
+      public Builder setMaxQueueLength(int maxQueueLength) {
+        this.maxQueueLength = maxQueueLength;
         return this;
       }
 
-      public Replication.Builder setMaxWaitingTimeForAccumulatingBatchInMs(
-          int maxWaitingTimeForAccumulatingBatchInMs) {
-        this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
+      public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
+          long maxWaitingTimeForWaitBatchInMs) {
+        this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
         return this;
       }
 
@@ -426,8 +424,8 @@ public class IoTConsensusConfig {
             maxLogEntriesNumPerBatch,
             maxSizePerBatch,
             maxPendingBatchesNum,
+            maxQueueLength,
             maxWaitingTimeForWaitBatchInMs,
-            maxWaitingTimeForAccumulatingBatchInMs,
             basicRetryWaitTimeMs,
             maxRetryWaitTimeMs,
             walThrottleThreshold,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index f6da0efd52a..cd70efe655b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,9 +46,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -214,7 +214,7 @@ public class LogDispatcher {
     public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
       this.peer = peer;
       this.config = config;
-      this.pendingEntries = new LinkedBlockingQueue<>();
+      this.pendingEntries = new ArrayBlockingQueue<>(config.getReplication().getMaxQueueLength());
       this.controller =
           new IndexController(
               impl.getStorageDir(),
@@ -314,10 +314,6 @@ public class LogDispatcher {
                 pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
             if (request != null) {
               bufferedEntries.add(request);
-              // If write pressure is low, we simply sleep a little to reduce the number of RPC
-              if (pendingEntries.size() <= config.getReplication().getMaxLogEntriesNumPerBatch()) {
-                Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
-              }
             }
           }
           MetricService.getInstance()
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
index 2ca5b91275d..24d0960dc0e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
@@ -48,6 +48,18 @@ public class LogDispatcherThreadMetrics implements IMetricSet {
             logDispatcherThread.getPeer().getGroupId().toString(),
             Tag.TYPE.toString(),
             "currentSyncIndex");
+    MetricService.getInstance()
+        .createAutoGauge(
+            Metric.IOT_CONSENSUS.toString(),
+            MetricLevel.IMPORTANT,
+            logDispatcherThread,
+            x -> x.getSyncStatus().getPendingBatches().size(),
+            Tag.NAME.toString(),
+            formatName(),
+            Tag.REGION.toString(),
+            logDispatcherThread.getPeer().getGroupId().toString(),
+            Tag.TYPE.toString(),
+            "pipelineNum");
     MetricService.getInstance()
         .createAutoGauge(
             Metric.IOT_CONSENSUS.toString(),
@@ -74,6 +86,16 @@ public class LogDispatcherThreadMetrics implements IMetricSet {
             logDispatcherThread.getPeer().getGroupId().toString(),
             Tag.TYPE.toString(),
             "currentSyncIndex");
+    MetricService.getInstance()
+        .remove(
+            MetricType.AUTO_GAUGE,
+            Metric.IOT_CONSENSUS.toString(),
+            Tag.NAME.toString(),
+            formatName(),
+            Tag.REGION.toString(),
+            logDispatcherThread.getPeer().getGroupId().toString(),
+            Tag.TYPE.toString(),
+            "pipelineNum");
     MetricService.getInstance()
         .remove(
             MetricType.AUTO_GAUGE,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index de6d0691dc4..8e1f40db8f0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 
 import java.util.Iterator;
@@ -102,7 +101,6 @@ public class SyncStatus {
     }
   }
 
-  @TestOnly
   public List<Batch> getPendingBatches() {
     return pendingBatches;
   }