You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/25 09:18:03 UTC

[iotdb] branch multiLeader_out_of_order created (now 5691d77c1f)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch multiLeader_out_of_order
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 5691d77c1f add log when update regionRouteMap.

This branch includes the following new commits:

     new c33c824476 fix array list to linked list.
     new 9c8f90677b Merge branch 'remote-master' into fix/multiLeader_out_of_order
     new 2fc15d31e5 add some log.
     new 1765b818f2 Merge branch 'remote-master' into fix/multiLeader_out_of_order
     new 92d77eda22 downgrade log.
     new 268395de21 add log.
     new 5691d77c1f add log when update regionRouteMap.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/07: Merge branch 'remote-master' into fix/multiLeader_out_of_order

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch multiLeader_out_of_order
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9c8f90677b3f5be5c4924b938d8096778ecf7579
Merge: c33c824476 842105470d
Author: stormbroken <18...@smail.nju.edu.cn>
AuthorDate: Fri Jul 22 15:23:12 2022 +0800

    Merge branch 'remote-master' into fix/multiLeader_out_of_order

 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  24 +-
 .../confignode/client/DataNodeRequestType.java     |   3 +-
 .../async/datanode/AsyncDataNodeClientPool.java    | 221 +++---
 .../async/handlers/AbstractRetryHandler.java       |  33 +-
 .../client/async/handlers/CreateRegionHandler.java |  14 +-
 .../client/async/handlers/FlushHandler.java        |  16 +-
 .../async/handlers/FunctionManagementHandler.java  |  11 +-
 .../client/async/handlers/SetTTLHandler.java       |   7 +-
 .../handlers/UpdateRegionRouteMapHandler.java      |   9 +-
 .../request/write/RemoveDataNodePlan.java          | 124 +---
 .../statemachine/PartitionRegionStateMachine.java  |   2 -
 .../confignode/manager/ClusterSchemaManager.java   |  30 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  10 +-
 .../confignode/manager/DataNodeRemoveManager.java  | 806 ---------------------
 .../apache/iotdb/confignode/manager/IManager.java  |   6 +-
 .../iotdb/confignode/manager/NodeManager.java      |  71 +-
 .../iotdb/confignode/manager/ProcedureManager.java |  34 +
 .../iotdb/confignode/manager/UDFManager.java       |  63 +-
 .../iotdb/confignode/manager/load/LoadManager.java |  31 +-
 .../iotdb/confignode/persistence/NodeInfo.java     | 191 +----
 .../procedure/env/ConfigNodeProcedureEnv.java      |  26 +-
 .../procedure/env/DataNodeRemoveHandler.java       | 381 ++++++++++
 .../procedure/impl/AbstractNodeProcedure.java      |  64 ++
 .../procedure/impl/AddConfigNodeProcedure.java     |  33 +-
 .../procedure/impl/RegionMigrateProcedure.java     | 230 ++++++
 .../procedure/impl/RemoveConfigNodeProcedure.java  |  33 +-
 .../procedure/impl/RemoveDataNodeProcedure.java    | 184 +++++
 .../confignode/procedure/scheduler/LockQueue.java  |  63 ++
 .../state/RegionTransitionState.java}              |  21 +-
 .../state/RemoveDataNodeState.java}                |  22 +-
 .../procedure/store/ProcedureFactory.java          |  14 +
 .../iotdb/confignode/service/ConfigNode.java       |  27 -
 .../confignode/service/ConfigNodeCommandLine.java  |  31 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   2 +-
 .../request/write/RemoveDataNodePlanTest.java      |   1 -
 .../iotdb/db/it/IoTDBInsertWithoutTimeIT.java      | 122 ++++
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |  12 +-
 .../db/integration/IoTDBInsertWithoutTimeIT.java   | 131 ----
 .../commons/partition/DataPartitionQueryParam.java |  14 -
 pom.xml                                            |   2 +-
 .../operator/process/TimeJoinOperator.java         |   7 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  47 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  11 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  11 +-
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  |  63 +-
 .../db/mpp/plan/analyze/cache/PartitionCache.java  | 166 +++--
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  64 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  39 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  54 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  45 +-
 .../plan/planner/distribution/SourceRewriter.java  |  32 +-
 .../planner/plan/node/process/AggregationNode.java |  12 +-
 .../planner/plan/node/process/DeviceMergeNode.java |  44 +-
 .../planner/plan/node/process/DeviceViewNode.java  |  42 +-
 .../plan/planner/plan/node/process/FillNode.java   |  13 +-
 .../plan/planner/plan/node/process/FilterNode.java |   8 +-
 .../plan/node/process/GroupByLevelNode.java        |  12 +-
 .../plan/node/process/LastQueryMergeNode.java      |  48 +-
 .../node/process/SlidingWindowAggregationNode.java |  12 +-
 .../plan/planner/plan/node/process/SortNode.java   |  12 +-
 .../planner/plan/node/process/TimeJoinNode.java    |  12 +-
 .../planner/plan/node/process/TransformNode.java   |  12 +-
 .../source/AlignedSeriesAggregationScanNode.java   |  10 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |  12 +-
 .../node/source/SeriesAggregationScanNode.java     |  10 +-
 .../node/source/SeriesAggregationSourceNode.java   |   6 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  14 +-
 .../planner/plan/parameter/OrderByParameter.java   |  89 +++
 .../plan/statement/component/OrderByComponent.java |  92 +++
 .../component/{OrderBy.java => Ordering.java}      |  22 +-
 .../db/mpp/plan/statement/component/SortItem.java  |  83 +++
 .../db/mpp/plan/statement/component/SortKey.java   |  21 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |  62 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  53 +-
 .../java/org/apache/iotdb/db/service/DataNode.java | 105 ---
 .../db/service/DataNodeServerCommandLine.java      | 120 ++-
 .../iotdb/db/service/RegionMigrateService.java     |  15 +-
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |   6 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |   4 +-
 .../operator/AlignedSeriesScanOperatorTest.java    |   6 +-
 .../mpp/execution/operator/LimitOperatorTest.java  |   4 +-
 .../mpp/execution/operator/OffsetOperatorTest.java |   8 +-
 .../operator/RawDataAggregationOperatorTest.java   |   4 +-
 .../execution/operator/TimeJoinOperatorTest.java   |   8 +-
 .../mpp/plan/analyze/cache/PartitionCacheTest.java | 432 +++++++++++
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   |  11 +-
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     | 149 ++--
 .../distribution/AggregationDistributionTest.java  |  34 +-
 .../distribution/DistributionPlannerBasicTest.java |  42 +-
 .../mpp/plan/plan/distribution/LastQueryTest.java  |   3 +-
 .../distribution/NoDataRegionPlanningTest.java     |  20 +-
 .../node/process/AggregationNodeSerdeTest.java     |   6 +-
 .../plan/node/process/DeviceViewNodeSerdeTest.java |  16 +-
 .../plan/node/process/ExchangeNodeSerdeTest.java   |   5 +-
 .../plan/plan/node/process/FillNodeSerdeTest.java  |   7 +-
 .../plan/node/process/FilterNodeSerdeTest.java     |   7 +-
 .../node/process/GroupByLevelNodeSerdeTest.java    |   8 +-
 .../plan/plan/node/process/LimitNodeSerdeTest.java |   4 +-
 .../plan/node/process/OffsetNodeSerdeTest.java     |   4 +-
 .../plan/plan/node/process/SortNodeSerdeTest.java  |   7 +-
 .../plan/node/process/TimeJoinNodeSerdeTest.java   |   9 +-
 .../source/SeriesAggregationScanNodeSerdeTest.java |   4 +-
 .../plan/node/source/SeriesScanNodeSerdeTest.java  |   4 +-
 103 files changed, 3041 insertions(+), 2370 deletions(-)


[iotdb] 04/07: Merge branch 'remote-master' into fix/multiLeader_out_of_order

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch multiLeader_out_of_order
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1765b818f2d9f396f89b5f381d8814dbdd5aff67
Merge: 2fc15d31e5 bd1894fb72
Author: stormbroken <18...@smail.nju.edu.cn>
AuthorDate: Mon Jul 25 10:56:50 2022 +0800

    Merge branch 'remote-master' into fix/multiLeader_out_of_order

 .../confignode/client/DataNodeRequestType.java     |  3 +-
 .../async/datanode/AsyncDataNodeClientPool.java    | 34 +++++++++++
 .../handlers/UpdateConfigNodeGroupHandler.java     | 66 ++++++++++++++++++++++
 .../iotdb/confignode/persistence/NodeInfo.java     | 13 ++++-
 .../persistence/partition/PartitionInfo.java       |  2 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  9 +++
 .../procedure/impl/AddConfigNodeProcedure.java     |  1 +
 .../procedure/impl/RemoveConfigNodeProcedure.java  |  1 +
 .../scheduler/FixedRateFragInsStateTracker.java    | 12 +++-
 .../impl/DataNodeInternalRPCServiceImpl.java       | 17 ++++++
 thrift/src/main/thrift/datanode.thrift             | 10 ++++
 11 files changed, 163 insertions(+), 5 deletions(-)


[iotdb] 07/07: add log when update regionRouteMap.

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch multiLeader_out_of_order
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5691d77c1f4d7ae0c7df772b4e249e26eb8ad9f0
Author: stormbroken <18...@smail.nju.edu.cn>
AuthorDate: Mon Jul 25 17:15:49 2022 +0800

    add log when update regionRouteMap.
---
 .../iotdb/db/mpp/plan/analyze/cache/PartitionCache.java     | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
index 86fbe66bad..c77252ab07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.plan.analyze.cache;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -68,6 +69,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 public class PartitionCache {
   private static final Logger logger = LoggerFactory.getLogger(PartitionCache.class);
@@ -447,6 +449,17 @@ public class PartitionCache {
       long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> map) {
     try {
       regionReplicaSetLock.writeLock().lock();
+      logger.info("[updateGroupIdToReplicaSetMap] timestamp:{}", timestamp);
+      logger.info("[updateGroupIdToReplicaSetMap] map:");
+      for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> entry : map.entrySet()) {
+        logger.info(
+            "[updateGroupIdToReplicaSetMap]\t {}={}",
+            entry.getKey(),
+            entry.getValue().getDataNodeLocations().stream()
+                .map(TDataNodeLocation::getDataNodeId)
+                .collect(Collectors.toList()));
+      }
+
       boolean result = (timestamp == latestUpdateTime.accumulateAndGet(timestamp, Math::max));
       // if timestamp is greater than latestUpdateTime, then update
       if (result) {


[iotdb] 06/07: add log.

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch multiLeader_out_of_order
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 268395de2188c4c6a064cb10a7e680112ea9a58d
Author: stormbroken <18...@smail.nju.edu.cn>
AuthorDate: Mon Jul 25 16:28:22 2022 +0800

    add log.
---
 .../org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index d74d1613a7..5573f6ba9a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -174,6 +174,7 @@ public class MultiLeaderServerImpl {
       for (int i = 0; i < size; i++) {
         configuration.add(Peer.deserialize(buffer));
       }
+      logger.info("Recover multiLeader, configuration: {}", configuration);
     } catch (IOException e) {
       logger.error("Unexpected error occurs when recovering configuration", e);
     }


[iotdb] 05/07: downgrade log.

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch multiLeader_out_of_order
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 92d77eda227e242b3de59fd1e8f4f835a7aef942
Author: stormbroken <18...@smail.nju.edu.cn>
AuthorDate: Mon Jul 25 15:51:24 2022 +0800

    downgrade log.
---
 .../apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index a27357d304..71ce7caa45 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -302,7 +302,7 @@ public class LogDispatcher {
         AsyncMultiLeaderServiceClient client = clientManager.borrowClient(peer.getEndpoint());
         TSyncLogReq req =
             new TSyncLogReq(peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
-        logger.info(
+        logger.debug(
             "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
             batch.getStartIndex(),
             batch.getEndIndex(),


[iotdb] 01/07: fix array list to linked list.

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch multiLeader_out_of_order
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c33c82447689de43be797f66a0cabee6021e94ee
Author: stormbroken <18...@smail.nju.edu.cn>
AuthorDate: Thu Jul 21 09:55:21 2022 +0800

    fix array list to linked list.
---
 .../iotdb/consensus/multileader/logdispatcher/LogDispatcher.java      | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 8ab54482f1..a9329d7d4d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -34,13 +34,11 @@ import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.consensus.ratis.Utils;
-
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -220,7 +218,7 @@ public class LogDispatcher {
 
     public PendingBatch getBatch() {
       PendingBatch batch;
-      List<TLogBatch> logBatches = new ArrayList<>();
+      List<TLogBatch> logBatches = new LinkedList<>();
       long startIndex = syncStatus.getNextSendingIndex();
       logger.debug("get batch. startIndex: {}", startIndex);
       long endIndex;


[iotdb] 03/07: add some log.

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch multiLeader_out_of_order
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2fc15d31e5c324d8e46e69824d26a4dc1e783849
Author: stormbroken <18...@smail.nju.edu.cn>
AuthorDate: Mon Jul 25 09:39:23 2022 +0800

    add some log.
---
 .../common/request/IndexedConsensusRequest.java      | 20 ++------------------
 .../consensus/multileader/MultiLeaderServerImpl.java |  4 ++--
 .../multileader/logdispatcher/LogDispatcher.java     | 11 +++++++++--
 3 files changed, 13 insertions(+), 22 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index de7ac07250..de3aca433b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.consensus.common.request;
 
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
-
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
@@ -31,18 +29,10 @@ public class IndexedConsensusRequest implements IConsensusRequest {
   /** we do not need to serialize these two fields as they are useless in other nodes. */
   private final long searchIndex;
 
-  private final long safelyDeletedSearchIndex;
-
   private final List<IConsensusRequest> requests;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
-    this(searchIndex, ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX, requests);
-  }
-
-  public IndexedConsensusRequest(
-      long searchIndex, long safelyDeletedSearchIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
-    this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
     this.requests = requests;
   }
 
@@ -59,10 +49,6 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     return searchIndex;
   }
 
-  public long getSafelyDeletedSearchIndex() {
-    return safelyDeletedSearchIndex;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -72,13 +58,11 @@ public class IndexedConsensusRequest implements IConsensusRequest {
       return false;
     }
     IndexedConsensusRequest that = (IndexedConsensusRequest) o;
-    return searchIndex == that.searchIndex
-        && safelyDeletedSearchIndex == that.safelyDeletedSearchIndex
-        && requests.equals(that.requests);
+    return searchIndex == that.searchIndex && requests.equals(that.requests);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(searchIndex, safelyDeletedSearchIndex, requests);
+    return Objects.hash(searchIndex, requests);
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index a0439bf8c9..d74d1613a7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -114,9 +114,9 @@ public class MultiLeaderServerImpl {
           buildIndexedConsensusRequestForLocalRequest(request);
       if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
         logger.info(
-            "DataRegion[{}]: index after build: safeIndex: {}, searchIndex: {}",
+            "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
             thisNode.getGroupId(),
-            indexedConsensusRequest.getSafelyDeletedSearchIndex(),
+            getCurrentSafelyDeletedSearchIndex(),
             indexedConsensusRequest.getSearchIndex());
       }
       // TODO wal and memtable
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index a9329d7d4d..a27357d304 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -34,11 +34,13 @@ import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.consensus.ratis.Utils;
+
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -218,9 +220,9 @@ public class LogDispatcher {
 
     public PendingBatch getBatch() {
       PendingBatch batch;
-      List<TLogBatch> logBatches = new LinkedList<>();
+      List<TLogBatch> logBatches = new ArrayList<>();
       long startIndex = syncStatus.getNextSendingIndex();
-      logger.debug("get batch. startIndex: {}", startIndex);
+      logger.debug("[GetBatch] startIndex: {}", startIndex);
       long endIndex;
       if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
         // Use drainTo instead of poll to reduce lock overhead
@@ -300,6 +302,11 @@ public class LogDispatcher {
         AsyncMultiLeaderServiceClient client = clientManager.borrowClient(peer.getEndpoint());
         TSyncLogReq req =
             new TSyncLogReq(peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
+        logger.info(
+            "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
+            batch.getStartIndex(),
+            batch.getEndIndex(),
+            peer.getGroupId().convertToTConsensusGroupId());
         client.syncLog(req, handler);
       } catch (IOException | TException e) {
         logger.error("Can not sync logs to peer {} because", peer, e);