You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/05/05 06:23:47 UTC

[iotdb] branch 55_wal_accumulation updated (5baa4b524e6 -> 92940d62e5f)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch 55_wal_accumulation
in repository https://gitbox.apache.org/repos/asf/iotdb.git


 discard 5baa4b524e6 optimize iotconsensus
     add 197890bba81 [To rel/1.1][IOTDB-5719] Move DataNode's checking of SchemaQuota above the consensus layer (#9626)
     add 21338fe7741 [To rel/1.1][IOTDB-5776]Update memory estimation of cross space compaction (#9628)
     add 8c31ae2b8ad [To rel/1.1][IOTDB-5691] The first level-0 tsfile too large to trigger "Unsequence InnerSpaceComapction" and "CrossSpaceCompaction" (#9504)
     add b50469b39aa [To rel/1.1] Fix potential Npe of ShuffleSinkHandle
     add a8e4e158646 [To rel/1.1][IOTDB-5798] Fix bug caused by acknowledgeTsBlock
     add e68572aec11 [IOTDB-5762][Metric] add do nothing level(#9615) (#9632)
     add 5e98877b5d6 [To rel/1.1] Add a cache to avoid initialize duplicated device id object in write process (#9647)
     add 037a9bdcb20 [To rel/1.1][IOTDB-5786] Fix potential deadlock in DriverScheduler
     add 177578964c3 [IOTDB-5784] Incorrect result when querying with offset push-down and time filter
     add 818f8e37385 [To rel/1.1] remove check of closed state in ISink
     add fb1d4266306 [To rel/1.1] Fix concurrent state change bug in QueryStateMachine
     add dedc9087eb8 [To rel/1.1]Remove unecessary synchronized on decrementCnt of SinkListener
     add c144e84589b [To rel/1.1][IOTDB-5820] Exception when use PIPE if sender has more than one data dir on different disks (#9699)
     add 4e5f0977cf6 [To rel/1.1][IOTDB-5183] Use default snapshot installation path for confignode & schema region
     add 2624c5a3660 [To rel/1.1][IOTDB-5807]Fix decompression error for aligned series in fast compaction
     add 521d33b27df [To rel/1.1][IOTDB-5735] Fix result of some UDF with align by device is incorrect
     add 8b980b1a7cf [IOTDB-5812] Reduce useless create of PartialPath in auth module (#9691) (#9704)
     add 1096fe45908 [IOTDB-5721][To rel/1.1] Streaming query DataPartition and Schema while loading TsFile (#9697)
     add b7789c66d39 [To rel/1.1] Correct ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL name
     add 313aa143a6d [To rel/1.1][IOTDB-5815] Fix Npe when UDF spilling data to disk
     add b5901698ffa [To rel/1.1] [IOTDB-5803] Improve query performance by reducing cpu consuming
     add f60cf9f35f9 [To rel/1.1][IOTDB-5825]Fix error in aligned empty value chunk in fast compaction (#9720)
     add 02a9b00d50d Change default multi_dir_strategy to SequenceStrategy and fix original bug (#9718) (#9726)
     add 35fc9c68c92 [To rel/1.1] Sort the result of AlignedUpdateLastCacheOperator for LastQueryMergeOperator
     add a8a836c49e2 [To rel/1.1] Forbid the query executor thread interrupted by other threads
     add f909d676354 [To rel/1.1][IOTDB-5835] Fix wal accumulation caused by datanode restart
     add 34754f1e578 [To rel/1.1][IOTDB-5832] Fix Bug: The size of readyQueue is negative incorrectly
     add 7b091099e47 [To rel/1.1] [IOTDB-5828] Optimize the performance of some parts in metrics, and correcting the metrics count of temporal file size in inner space compaction (#9743)
     new 92940d62e5f optimize iotconsensus

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (5baa4b524e6)
            \
             N -- N -- N   refs/heads/55_wal_accumulation (92940d62e5f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../consensus/request/auth/AuthorPlan.java         |   29 +-
 .../iotdb/confignode/manager/ConfigManager.java    |    2 +-
 .../apache/iotdb/confignode/manager/IManager.java  |    2 +-
 .../confignode/manager/PermissionManager.java      |    3 +-
 .../iotdb/confignode/persistence/AuthorInfo.java   |   34 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   10 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |    8 +-
 .../confignode/persistence/AuthorInfoTest.java     |   30 +-
 .../consensus/iot/IoTConsensusServerImpl.java      |   13 +-
 .../iotdb/consensus/ratis/SnapshotStorage.java     |    8 +-
 docs/UserGuide/Reference/Common-Config-Manual.md   |   11 +-
 .../zh/UserGuide/Reference/Common-Config-Manual.md |    9 +
 .../confignode/it/IoTDBClusterAuthorityIT.java     |   88 +-
 .../IoTDBLastQueryLastCache2IT.java}               |   25 +-
 .../db/it/last/IoTDBLastQueryLastCacheIT.java      |  145 +++
 .../iotdb/db/it/query/IoTDBPaginationIT.java       |   59 +-
 .../db/it/udf/IoTDBUDTFBuiltinFunctionIT.java      |  500 ++++++++-
 .../iotdb/libudf/it/dprofile/DProfileIT.java       |   17 +
 .../metrics/metricsets/jvm/JvmThreadMetrics.java   |   31 +-
 .../apache/iotdb/metrics/utils/MetricLevel.java    |    3 +-
 .../resources/conf/iotdb-common.properties         |    9 +
 .../commons/auth/authorizer/BasicAuthorizer.java   |   35 +-
 .../iotdb/commons/auth/authorizer/IAuthorizer.java |   33 +-
 .../commons/auth/authorizer/OpenIdAuthorizer.java  |    3 +-
 .../iotdb/commons/auth/entity/PathPrivilege.java   |   29 +-
 .../org/apache/iotdb/commons/auth/entity/Role.java |   13 +-
 .../org/apache/iotdb/commons/auth/entity/User.java |   13 +-
 .../iotdb/commons/auth/role/BasicRoleManager.java  |    5 +-
 .../iotdb/commons/auth/role/IRoleManager.java      |    6 +-
 .../iotdb/commons/auth/user/BasicUserManager.java  |    5 +-
 .../iotdb/commons/auth/user/IUserManager.java      |    6 +-
 .../iotdb/commons/concurrent/ThreadName.java       |    2 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |   10 +
 .../iotdb/commons/conf/CommonDescriptor.java       |    8 +
 .../org/apache/iotdb/commons/path/AlignedPath.java |    6 +
 .../iotdb/commons/sync/utils/SyncPathUtil.java     |    4 +
 .../BuiltinTimeSeriesGeneratingFunction.java       |   10 +
 .../commons/udf/service/UDFClassLoaderManager.java |    6 +-
 .../org/apache/iotdb/commons/utils/AuthUtils.java  |   96 +-
 .../org/apache/iotdb/commons/utils/IOUtils.java    |    8 +-
 .../apache/iotdb/commons/utils/JVMCommonUtils.java |   12 +-
 .../schemaregion/rocksdb/RSchemaRegion.java        |    5 +
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |    5 +
 .../resources/conf/iotdb-datanode.properties       |    2 +-
 .../org/apache/iotdb/db/audit/AuditLogger.java     |    8 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   10 +-
 .../apache/iotdb/db/auth/AuthorizerManager.java    |   15 +-
 .../iotdb/db/auth/ClusterAuthorityFetcher.java     |   38 +-
 .../apache/iotdb/db/auth/IAuthorityFetcher.java    |    3 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   33 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   12 +
 .../directories/strategy/SequenceStrategy.java     |    2 +-
 .../iotdb/db/engine/TsFileMetricManager.java       |   63 +-
 .../performer/impl/FastCompactionPerformer.java    |   30 +-
 .../impl/ReadChunkCompactionPerformer.java         |   11 +-
 .../impl/ReadPointCompactionPerformer.java         |   24 +-
 .../execute/task/AbstractCompactionTask.java       |   12 +-
 .../execute/task/CompactionTaskSummary.java        |   18 +
 .../execute/task/CrossSpaceCompactionTask.java     |   11 +-
 .../execute/task/InnerSpaceCompactionTask.java     |    7 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   |    4 +-
 .../fast/AlignedSeriesCompactionExecutor.java      |   19 +-
 .../readchunk/AlignedSeriesCompactionExecutor.java |    6 -
 .../utils/writer/AbstractCompactionWriter.java     |   10 +-
 .../estimator/AbstractCompactionEstimator.java     |    6 +
 .../ReadPointCrossCompactionEstimator.java         |  108 +-
 .../impl/RewriteCrossSpaceCompactionSelector.java  |   13 +-
 .../utils/CrossCompactionTaskResource.java         |   12 +
 .../db/metadata/cache/DataNodeDevicePathCache.java |   69 ++
 .../iotdb/db/metadata/mtree/IMTreeBelowSG.java     |    8 +
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |   25 +-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |   21 +-
 .../db/metadata/mtree/store/CachedMTreeStore.java  |    4 +-
 .../iotdb/db/metadata/mtree/store/IMTreeStore.java |    2 +-
 .../db/metadata/mtree/store/MemMTreeStore.java     |    4 +-
 .../db/metadata/schemaregion/ISchemaRegion.java    |   11 +
 .../schemaregion/SchemaRegionMemoryImpl.java       |   18 +-
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   17 +-
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  |   11 +-
 .../org/apache/iotdb/db/mpp/common/QueryId.java    |    2 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |   78 +-
 .../iotdb/db/mpp/execution/driver/Driver.java      |    2 +-
 .../execution/exchange/MPPDataExchangeManager.java |    8 +-
 .../execution/exchange/sink/LocalSinkChannel.java  |   19 +-
 .../execution/exchange/sink/ShuffleSinkHandle.java |    7 +-
 .../mpp/execution/exchange/sink/SinkChannel.java   |   44 +-
 .../execution/exchange/source/SourceHandle.java    |   31 +-
 .../execution/executor/RegionWriteExecutor.java    |   84 +-
 .../operator/process/TransformOperator.java        |   13 +-
 .../execution/operator/source/SeriesScanUtil.java  |   24 +-
 .../execution/schedule/AbstractDriverThread.java   |    9 +
 .../db/mpp/execution/schedule/DriverScheduler.java |  116 +-
 .../mpp/execution/schedule/DriverTaskThread.java   |    7 +-
 .../schedule/queue/IndexedBlockingQueue.java       |    1 +
 .../multilevelqueue/MultilevelPriorityQueue.java   |    4 +-
 .../iotdb/db/mpp/metric/QueryMetricsManager.java   |  140 ++-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |   82 +-
 .../db/mpp/plan/analyze/ExpressionAnalyzer.java    |   38 +-
 .../iotdb/db/mpp/plan/analyze/ExpressionUtils.java |    3 +-
 .../db/mpp/plan/analyze/cache/PartitionCache.java  |   10 +-
 .../db/mpp/plan/execution/QueryExecution.java      |    6 +-
 .../visitor/ColumnTransformerVisitor.java          |    2 +-
 .../visitor/IntermediateLayerVisitor.java          |    4 +-
 .../db/mpp/plan/parser/StatementGenerator.java     |   43 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |    8 +
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |   12 +-
 .../plan/planner/distribution/SourceRewriter.java  |   11 +
 .../plan/node/load/LoadSingleTsFileNode.java       |   59 +-
 .../planner/plan/node/load/LoadTsFileNode.java     |   16 +-
 .../write/InternalCreateMultiTimeSeriesNode.java   |    1 +
 .../source/AlignedSeriesAggregationScanNode.java   |   50 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |    4 +
 .../node/source/SeriesAggregationScanNode.java     |   50 +-
 .../node/source/SeriesAggregationSourceNode.java   |   49 +-
 .../planner/plan/node/source/SeriesScanNode.java   |    4 +
 .../planner/plan/parameter/SeriesScanOptions.java  |    5 +-
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |   20 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  201 +++-
 .../dag/builder/EvaluationDAGBuilder.java          |    4 +-
 .../dag/input/QueryDataSetInputLayer.java          |    5 +-
 .../intermediate/ConstantIntermediateLayer.java    |    3 +-
 .../dag/intermediate/IntermediateLayer.java        |    4 +-
 .../MultiInputColumnIntermediateLayer.java         |    2 +-
 ...InputColumnMultiReferenceIntermediateLayer.java |    2 +-
 ...nputColumnSingleReferenceIntermediateLayer.java |    2 +-
 .../db/mpp/transformation/dag/udf/UDTFContext.java |    2 +-
 .../mpp/transformation/dag/udf/UDTFExecutor.java   |    2 +-
 .../datastructure/SerializableList.java            |    6 +-
 .../row/ElasticSerializableRowRecordList.java      |    6 +-
 .../row/SerializableRowRecordList.java             |    2 +-
 .../tv/ElasticSerializableBinaryTVList.java        |    2 +-
 .../tv/ElasticSerializableTVList.java              |    8 +-
 .../datastructure/tv/SerializableTVList.java       |    2 +-
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  |    5 +-
 .../v1/handler/StatementConstructionHandler.java   |    5 +-
 .../v2/handler/StatementConstructionHandler.java   |    5 +-
 .../iotdb/db/query/context/QueryContext.java       |    2 +
 .../db/query/control/QueryResourceManager.java     |    7 +-
 .../query/control/clientsession/ClientSession.java |    3 +-
 .../iotdb/db/query/reader/chunk/MemPageReader.java |   12 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |   15 +-
 .../db/service/TemporaryQueryDataFileService.java  |   14 +-
 .../iotdb/db/service/metrics/FileMetrics.java      |   26 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |    2 +
 .../db/sync/sender/recovery/TsFilePipeLogger.java  |   54 +-
 .../iotdb/db/auth/AuthorizerManagerTest.java       |   26 +-
 .../auth/authorizer/LocalFileAuthorizerTest.java   |    9 +-
 .../iotdb/db/auth/entity/PathPrivilegeTest.java    |    8 +-
 .../org/apache/iotdb/db/auth/entity/RoleTest.java  |    8 +-
 .../org/apache/iotdb/db/auth/entity/UserTest.java  |    8 +-
 .../db/auth/role/LocalFileRoleAccessorTest.java    |    6 +-
 .../db/auth/role/LocalFileRoleManagerTest.java     |    8 +-
 .../db/auth/user/LocalFileUserAccessorTest.java    |    6 +-
 .../db/auth/user/LocalFileUserManagerTest.java     |    8 +-
 .../engine/compaction/AbstractCompactionTest.java  |    6 +
 .../compaction/CompactionTaskComparatorTest.java   |    8 +-
 .../compaction/FastAlignedCrossCompactionTest.java | 1098 ++++++++++++++----
 .../FastInnerCompactionPerformerTest.java          |   30 +-
 .../FastNonAlignedCrossCompactionTest.java         | 1114 +++++++++++++++----
 .../compaction/ReadChunkInnerCompactionTest.java   |  377 +++++++
 ...va => ReadPointAlignedCrossCompactionTest.java} | 1151 +++++++++++++++----
 ...=> ReadPointNonAlignedCrossCompactionTest.java} | 1167 ++++++++++++++++----
 .../CrossSpaceCompactionWithFastPerformerTest.java |    7 +-
 ...eCompactionWithFastPerformerValidationTest.java |  194 ++--
 ...actionWithReadPointPerformerValidationTest.java |  168 +--
 .../cross/RewriteCompactionFileSelectorTest.java   |   12 +
 .../utils/MultiTsFileDeviceIteratorTest.java       |   12 +-
 .../compaction/utils/TsFileGeneratorUtils.java     |  116 +-
 .../cache/DataNodeDevicePathCacheTest.java}        |   44 +-
 .../db/mpp/execution/QueryStateMachineTest.java    |    2 +
 .../mpp/plan/analyze/QueryTimePartitionTest.java   |   14 +-
 .../node/process/AggregationNodeSerdeTest.java     |    7 +-
 .../node/process/GroupByLevelNodeSerdeTest.java    |    8 +-
 .../plan/node/process/GroupByTagNodeSerdeTest.java |    5 +-
 .../source/SeriesAggregationScanNodeSerdeTest.java |    7 +-
 .../AlignedSeriesScanLimitOffsetPushDownTest.java  |   36 +-
 .../series/SeriesScanLimitOffsetPushDownTest.java  |   23 +-
 .../udf/datastructure/SerializableListTest.java    |    2 +-
 .../security/encrypt/MessageDigestEncryptTest.java |    6 +-
 .../src/main/thrift/confignode.thrift              |    4 +-
 thrift/src/main/thrift/client.thrift               |    3 +
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |    2 +-
 .../iotdb/tsfile/read/filter/GroupByFilter.java    |   13 +-
 .../read/filter/PredicateRemoveNotRewriter.java    |   50 +
 .../iotdb/tsfile/read/filter/TimeFilter.java       |  158 ++-
 .../iotdb/tsfile/read/filter/ValueFilter.java      |  179 +--
 .../tsfile/read/filter/basic/BinaryFilter.java     |    5 -
 .../iotdb/tsfile/read/filter/basic/Filter.java     |   11 +-
 .../tsfile/read/filter/factory/FilterFactory.java  |   10 +-
 .../tsfile/read/filter/operator/AndFilter.java     |   11 +
 .../iotdb/tsfile/read/filter/operator/Between.java |  127 ++-
 .../iotdb/tsfile/read/filter/operator/Eq.java      |   25 +-
 .../iotdb/tsfile/read/filter/operator/Gt.java      |   23 +-
 .../iotdb/tsfile/read/filter/operator/GtEq.java    |   23 +-
 .../iotdb/tsfile/read/filter/operator/In.java      |   34 +-
 .../iotdb/tsfile/read/filter/operator/Like.java    |   55 +-
 .../iotdb/tsfile/read/filter/operator/Lt.java      |   23 +-
 .../iotdb/tsfile/read/filter/operator/LtEq.java    |   23 +-
 .../iotdb/tsfile/read/filter/operator/NotEq.java   |   25 +-
 .../tsfile/read/filter/operator/NotFilter.java     |   26 +-
 .../tsfile/read/filter/operator/OrFilter.java      |   11 +
 .../iotdb/tsfile/read/filter/operator/Regexp.java  |   57 +-
 .../read/reader/chunk/AlignedChunkReader.java      |   40 +-
 .../tsfile/read/reader/chunk/ChunkReader.java      |    4 +
 .../iotdb/tsfile/read/reader/page/PageReader.java  |   12 +-
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |   42 +-
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java |    4 -
 .../tsfile/read/filter/FilterSerializeTest.java    |   21 +-
 .../read/filter/MinTimeMaxTimeFilterTest.java      |   42 +-
 .../iotdb/tsfile/read/filter/OperatorTest.java     |    4 +-
 .../filter/PredicateRemoveNotRewriterTest.java     |  121 ++
 .../tsfile/read/filter/StatisticsFilterTest.java   |  113 +-
 212 files changed, 8083 insertions(+), 2411 deletions(-)
 copy integration-test/src/test/java/org/apache/iotdb/db/it/{aligned/IoTDBLastQueryWithDeletion2IT.java => last/IoTDBLastQueryLastCache2IT.java} (74%)
 create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastQueryLastCacheIT.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/cache/DataNodeDevicePathCache.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadChunkInnerCompactionTest.java
 copy server/src/test/java/org/apache/iotdb/db/engine/compaction/{FastAlignedCrossCompactionTest.java => ReadPointAlignedCrossCompactionTest.java} (83%)
 copy server/src/test/java/org/apache/iotdb/db/engine/compaction/{FastNonAlignedCrossCompactionTest.java => ReadPointNonAlignedCrossCompactionTest.java} (83%)
 copy server/src/test/java/org/apache/iotdb/db/{wal/DisableWALTest.java => metadata/cache/DataNodeDevicePathCacheTest.java} (50%)
 create mode 100644 tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/PredicateRemoveNotRewriter.java
 create mode 100644 tsfile/src/test/java/org/apache/iotdb/tsfile/read/filter/PredicateRemoveNotRewriterTest.java


[iotdb] 01/01: optimize iotconsensus

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch 55_wal_accumulation
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 92940d62e5f4f3e6973f1accba42b61905311df8
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Fri May 5 11:03:12 2023 +0800

    optimize iotconsensus
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../iotdb/consensus/config/IoTConsensusConfig.java | 23 +++++++++++++++++-----
 .../consensus/iot/logdispatcher/LogDispatcher.java |  8 ++------
 2 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 9c676fdaa9c..2fe821a7cb0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -235,6 +235,8 @@ public class IoTConsensusConfig {
     private final int maxLogEntriesNumPerBatch;
     private final int maxSizePerBatch;
     private final int maxPendingBatchesNum;
+
+    private final int maxQueueLength;
     private final long maxWaitingTimeForWaitBatchInMs;
     private final int maxWaitingTimeForAccumulatingBatchInMs;
     private final long basicRetryWaitTimeMs;
@@ -249,6 +251,7 @@ public class IoTConsensusConfig {
         int maxLogEntriesNumPerBatch,
         int maxSizePerBatch,
         int maxPendingBatchesNum,
+        int maxQueueLength,
         long maxWaitingTimeForWaitBatchInMs,
         int maxWaitingTimeForAccumulatingBatchInMs,
         long basicRetryWaitTimeMs,
@@ -261,6 +264,7 @@ public class IoTConsensusConfig {
       this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
       this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatchesNum = maxPendingBatchesNum;
+      this.maxQueueLength = maxQueueLength;
       this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
       this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
       this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
@@ -284,6 +288,10 @@ public class IoTConsensusConfig {
       return maxPendingBatchesNum;
     }
 
+    public int getMaxQueueLength() {
+      return maxQueueLength;
+    }
+
     public long getMaxWaitingTimeForWaitBatchInMs() {
       return maxWaitingTimeForWaitBatchInMs;
     }
@@ -326,13 +334,12 @@ public class IoTConsensusConfig {
 
     public static class Builder {
 
-      private int maxLogEntriesNumPerBatch = 30;
+      private int maxLogEntriesNumPerBatch = 1024;
       private int maxSizePerBatch = 16 * 1024 * 1024;
-      // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
-      // in DataRegionStateMachine
-      private int maxPendingBatchesNum = 5;
+      private int maxPendingBatchesNum = 16;
+      private int maxQueueLength = 4096;
       private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L;
-      private int maxWaitingTimeForAccumulatingBatchInMs = 500;
+      private int maxWaitingTimeForAccumulatingBatchInMs = 100;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
       private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
@@ -356,6 +363,11 @@ public class IoTConsensusConfig {
         return this;
       }
 
+      public Builder setMaxQueueLength(int maxQueueLength) {
+        this.maxQueueLength = maxQueueLength;
+        return this;
+      }
+
       public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
           long maxWaitingTimeForWaitBatchInMs) {
         this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
@@ -408,6 +420,7 @@ public class IoTConsensusConfig {
             maxLogEntriesNumPerBatch,
             maxSizePerBatch,
             maxPendingBatchesNum,
+            maxQueueLength,
             maxWaitingTimeForWaitBatchInMs,
             maxWaitingTimeForAccumulatingBatchInMs,
             basicRetryWaitTimeMs,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index f6da0efd52a..cd70efe655b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,9 +46,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -214,7 +214,7 @@ public class LogDispatcher {
     public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
       this.peer = peer;
       this.config = config;
-      this.pendingEntries = new LinkedBlockingQueue<>();
+      this.pendingEntries = new ArrayBlockingQueue<>(config.getReplication().getMaxQueueLength());
       this.controller =
           new IndexController(
               impl.getStorageDir(),
@@ -314,10 +314,6 @@ public class LogDispatcher {
                 pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
             if (request != null) {
               bufferedEntries.add(request);
-              // If write pressure is low, we simply sleep a little to reduce the number of RPC
-              if (pendingEntries.size() <= config.getReplication().getMaxLogEntriesNumPerBatch()) {
-                Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
-              }
             }
           }
           MetricService.getInstance()