You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/09/11 03:12:37 UTC

[iotdb] branch load_v2 updated: abstract location selection and add two simple strategies

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/load_v2 by this push:
     new beaf63b98e4 abstract location selection and add two simple strategies
beaf63b98e4 is described below

commit beaf63b98e4cef73a0111f88984d29862ea0022f
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Sep 11 11:15:57 2023 +0800

    abstract location selection and add two simple strategies
---
 .../execution/load/TsFileSplitSender.java          |  88 +++++-------------
 .../load/locseq/FixedLocationSequencer.java        |  40 ++++++++
 .../execution/load/locseq/LocationSequencer.java   |  28 ++++++
 .../execution/load/locseq/LocationStatistics.java  |  79 ++++++++++++++++
 .../load/locseq/RandomLocationSequencer.java       |  41 ++++++++
 .../locseq/ThroughputBasedLocationSequencer.java   | 103 +++++++++++++++++++++
 .../execution/load/TsFileSplitSenderTest.java      |  16 ++++
 7 files changed, 328 insertions(+), 67 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index b198ace141b..b2bea04dd22 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +44,11 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.FixedLocationSequencer;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.LocationSequencer;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.LocationStatistics;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.RandomLocationSequencer;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.ThroughputBasedLocationSequencer;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
@@ -72,8 +76,7 @@ public class TsFileSplitSender {
   // All consensus groups accessed in Phase1 should be notified in Phase2
   private final Set<TRegionReplicaSet> allReplicaSets = new ConcurrentSkipListSet<>();
   private String uuid;
-  private Map<TDataNodeLocation, Double> dataNodeThroughputMap = new ConcurrentHashMap<>();
-  private Random random = new Random();
+  private LocationStatistics locationStatistics = new LocationStatistics();
   private boolean isGeneratedByPipe;
   private Map<Pair<LoadTsFilePieceNode, TRegionReplicaSet>, Exception> phaseOneFailures =
       new ConcurrentHashMap<>();
@@ -107,6 +110,7 @@ public class TsFileSplitSender {
     } else {
       logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
     }
+    locationStatistics.logLocationStatistics();
   }
 
   private boolean firstPhase(LoadTsFileNode node) throws IOException {
@@ -191,55 +195,6 @@ public class TsFileSplitSender {
     return phaseTwoFailures.isEmpty();
   }
 
-  /**
-   * The rank (probability of being chosen) is calculated as throughput / totalThroughput for those
-   * nodes that have not been used, their throughput is defined as Float.MAX_VALUE
-   *
-   * @param replicaSet replica set to be ranked
-   * @return the nodes and their ranks
-   */
-  private List<Pair<TDataNodeLocation, Double>> rankLocations(TRegionReplicaSet replicaSet) {
-    List<Pair<TDataNodeLocation, Double>> locations =
-        new ArrayList<>(replicaSet.dataNodeLocations.size());
-    // retrieve throughput of each node
-    double totalThroughput = 0.0;
-    for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
-      // use Float.MAX_VALUE so that they can be added together
-      double throughput =
-          dataNodeThroughputMap.computeIfAbsent(dataNodeLocation, l -> (double) Float.MAX_VALUE);
-      locations.add(new Pair<>(dataNodeLocation, throughput));
-      totalThroughput += throughput;
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("Location throughput: {}", dataNodeThroughputMap.entrySet().stream()
-          .map(e -> new Pair<>(e.getKey().getDataNodeId(), e.getValue()))
-          .collect(Collectors.toList()));
-    }
-    // calculate cumulative ranks
-    locations.get(0).right = locations.get(0).right / totalThroughput;
-    for (int i = 1; i < locations.size(); i++) {
-      Pair<TDataNodeLocation, Double> location = locations.get(i);
-      location.right = location.right / totalThroughput + locations.get(i - 1).right;
-    }
-    return locations;
-  }
-
-  private Pair<TDataNodeLocation, Double> chooseNextLocation(
-      List<Pair<TDataNodeLocation, Double>> locations) {
-    int chosen = 0;
-    double dice = random.nextDouble();
-    for (int i = 1; i < locations.size(); i++) {
-      if (locations.get(i - 1).right <= dice && dice < locations.get(i).right) {
-        chosen = i;
-      }
-    }
-    Pair<TDataNodeLocation, Double> chosenPair = locations.remove(chosen);
-    // update ranks
-    for (Pair<TDataNodeLocation, Double> location : locations) {
-      location.right = location.right / (1 - chosenPair.right);
-    }
-    return chosenPair;
-  }
 
   /**
    * Split a piece node by series.
@@ -267,7 +222,12 @@ public class TsFileSplitSender {
     return result;
   }
 
-  @SuppressWarnings("BusyWait")
+  public LocationSequencer createLocationSequencer(TRegionReplicaSet replicaSet) {
+    // Decides the order in which the replicas are tried. Alternatives kept for experiments:
+    //   new FixedLocationSequencer(replicaSet) or new RandomLocationSequencer(replicaSet).
+    return new ThroughputBasedLocationSequencer(replicaSet, locationStatistics);
+  }
+
   public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) {
     allReplicaSets.add(replicaSet);
 
@@ -278,21 +238,14 @@ public class TsFileSplitSender {
       TTsFilePieceReq loadTsFileReq =
           new TTsFilePieceReq(node.serializeToByteBuffer(), uuid, replicaSet.getRegionId());
       loadTsFileReq.isRelay = true;
-      List<Pair<TDataNodeLocation, Double>> locations = rankLocations(replicaSet);
-      if (logger.isDebugEnabled()) {
-        logger.debug("Location ranks: {}",
-            locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(), p.right)).collect(
-                Collectors.toList()));
-      }
+      LocationSequencer locationSequencer = createLocationSequencer(replicaSet);
 
       boolean loadSucceed = false;
       Exception lastConnectionError = null;
       TDataNodeLocation currLocation = null;
-      while (!locations.isEmpty()) {
-        // the chosen location is removed from the list
-        Pair<TDataNodeLocation, Double> locationRankPair = chooseNextLocation(locations);
-        logger.debug("Chose location {}", locationRankPair.left.getDataNodeId());
-        currLocation = locationRankPair.left;
+      for (TDataNodeLocation location : locationSequencer) {
+        logger.debug("Chose location {}", location.getDataNodeId());
+        currLocation = location;
         startTime = System.nanoTime();
         for (int i = 0; i < MAX_RETRY; i++) {
           try (SyncDataNodeInternalServiceClient client =
@@ -325,16 +278,17 @@ public class TsFileSplitSender {
 
       if (!loadSucceed) {
         String warning = NODE_CONNECTION_ERROR;
-        logger.warn(warning, locations, lastConnectionError);
+        logger.warn(warning, currLocation, lastConnectionError);
         TSStatus status = new TSStatus();
         status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-        status.setMessage(warning + locations);
+        status.setMessage(warning + currLocation);
         phaseOneFailures.put(
             new Pair<>(node, replicaSet), new FragmentInstanceDispatchException(status));
         return false;
       }
       long timeConsumption = System.nanoTime() - startTime;
-      dataNodeThroughputMap.put(currLocation, node.getDataSize() * 1.0 / timeConsumption);
+      locationStatistics.updateThroughput(currLocation, node.getDataSize() * 1.0 / timeConsumption);
+      locationStatistics.increaseHit(currLocation);
       return true;
     }).collect(Collectors.toList());
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
new file mode 100644
index 00000000000..6bd49396f0e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+/**
+ * A {@link LocationSequencer} that yields the locations of a replica set in exactly the order in
+ * which the replica set lists them.
+ *
+ * <p>The locations are copied at construction time, so later changes to the replica set do not
+ * affect the sequence.
+ */
+public class FixedLocationSequencer implements LocationSequencer {
+
+  /** Snapshot of the replica set's locations, in their original order. */
+  private final List<TDataNodeLocation> orderedLocations;
+
+  /**
+   * @param replicaSet the replica set whose locations should be iterated in their listed order
+   */
+  public FixedLocationSequencer(TRegionReplicaSet replicaSet) {
+    orderedLocations = new ArrayList<>(replicaSet.getDataNodeLocations());
+  }
+
+  /**
+   * @return an iterator over the snapshot taken in the constructor
+   */
+  @Override
+  public Iterator<TDataNodeLocation> iterator() {
+    return orderedLocations.iterator();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
new file mode 100644
index 00000000000..cb3d8723baf
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.Iterator;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+
+public interface LocationSequencer extends Iterable<TDataNodeLocation> {
+
+
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
new file mode 100644
index 00000000000..5db64de1e65
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps one {@link Statistic} per data node location: the most recently reported throughput and
+ * a counter of how many times {@link #increaseHit(TDataNodeLocation)} was called for it.
+ *
+ * <p>Entries are created lazily the first time a location is queried or updated. The map and the
+ * individual statistics may be read and updated from several threads.
+ */
+public class LocationStatistics {
+
+  private static final Logger logger = LoggerFactory.getLogger(LocationStatistics.class);
+  private final Map<TDataNodeLocation, Statistic> locationStatisticMap =
+      new ConcurrentHashMap<>();
+
+  /**
+   * @param location the location to look up
+   * @return the last throughput reported for the location, or {@code Float.MAX_VALUE} if none has
+   *     been reported yet
+   */
+  public double getThroughput(TDataNodeLocation location) {
+    return statisticOf(location).getThroughput();
+  }
+
+  /**
+   * Overwrites the throughput recorded for a location.
+   *
+   * @param location the location that was measured
+   * @param throughput the new throughput, in whatever unit the caller uses consistently
+   */
+  public void updateThroughput(TDataNodeLocation location, double throughput) {
+    statisticOf(location).setThroughput(throughput);
+  }
+
+  /**
+   * Adds one to the hit counter of a location.
+   *
+   * @param location the location that was used
+   */
+  public void increaseHit(TDataNodeLocation location) {
+    statisticOf(location).increaseHit();
+  }
+
+  /** Returns the statistic of a location, creating a fresh one if the location is unknown. */
+  private Statistic statisticOf(TDataNodeLocation location) {
+    return locationStatisticMap.computeIfAbsent(location, l -> new Statistic());
+  }
+
+  /** Mutable statistics of a single location; safe to update from several threads. */
+  public static class Statistic {
+
+    // volatile: written by dispatching threads and read by others when ranking locations
+    private volatile double throughput = Float.MAX_VALUE;
+    // atomic: a plain int++ would lose increments under concurrent updates
+    private final AtomicInteger hit = new AtomicInteger();
+
+    public double getThroughput() {
+      return throughput;
+    }
+
+    public void setThroughput(double throughput) {
+      this.throughput = throughput;
+    }
+
+    public int getHit() {
+      return hit.get();
+    }
+
+    public void increaseHit() {
+      hit.incrementAndGet();
+    }
+
+    @Override
+    public String toString() {
+      return "{" +
+          "throughput=" + throughput +
+          ", hit=" + hit.get() +
+          '}';
+    }
+  }
+
+  /** Logs, at INFO level, the data node id and the statistic of every known location. */
+  public void logLocationStatistics() {
+    if (logger.isInfoEnabled()) {
+      logger.info("Location throughput: {}", locationStatisticMap.entrySet().stream()
+          .map(e -> new Pair<>(e.getKey().getDataNodeId(), e.getValue()))
+          .collect(Collectors.toList()));
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
new file mode 100644
index 00000000000..8af95667865
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+/**
+ * A {@link LocationSequencer} that yields the locations of a replica set in a random order.
+ *
+ * <p>The locations are copied and shuffled once in the constructor; every iterator obtained from
+ * the same instance walks that same shuffled order.
+ */
+public class RandomLocationSequencer implements LocationSequencer {
+
+  /** Shuffled snapshot of the replica set's locations. */
+  private final List<TDataNodeLocation> orderedLocations;
+
+  /**
+   * @param replicaSet the replica set whose locations should be iterated in random order
+   */
+  public RandomLocationSequencer(TRegionReplicaSet replicaSet) {
+    orderedLocations = new ArrayList<>(replicaSet.getDataNodeLocations());
+    Collections.shuffle(orderedLocations);
+  }
+
+  /**
+   * @return an iterator over the order that was shuffled in the constructor
+   */
+  @Override
+  public Iterator<TDataNodeLocation> iterator() {
+    return orderedLocations.iterator();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
new file mode 100644
index 00000000000..dcd67f04dcf
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link LocationSequencer} that orders the locations of a replica set by weighted random
+ * sampling without replacement. The weight of a location is its throughput as reported by
+ * {@link LocationStatistics}, so locations with a higher throughput are more likely to come first.
+ *
+ * <p>The order is computed once in the constructor; every iterator obtained from the same instance
+ * walks that same order.
+ */
+public class ThroughputBasedLocationSequencer implements LocationSequencer {
+
+  private static final Logger logger = LoggerFactory.getLogger(
+      ThroughputBasedLocationSequencer.class);
+  private final Random random = new Random();
+  private final List<TDataNodeLocation> orderedLocations;
+
+  /**
+   * @param replicaSet the replica set whose locations should be ordered
+   * @param locationStatistics source of the throughput of each location
+   */
+  public ThroughputBasedLocationSequencer(
+      TRegionReplicaSet replicaSet, LocationStatistics locationStatistics) {
+    List<Pair<TDataNodeLocation, Double>> locationRanks =
+        rankLocations(replicaSet, locationStatistics);
+    orderedLocations = new ArrayList<>(locationRanks.size());
+    while (!locationRanks.isEmpty()) {
+      // the chosen location is removed from the list
+      orderedLocations.add(chooseNextLocation(locationRanks).left);
+    }
+  }
+
+  /**
+   * Computes the rank of each location, i.e. its share of the total throughput of the replica
+   * set (throughput / totalThroughput). The ranks are per-location shares, not cumulative sums,
+   * so they stay valid while locations are removed one by one.
+   *
+   * <p>If the total throughput is zero, every location gets the same rank. An empty or missing
+   * location list yields an empty result.
+   *
+   * @param replicaSet replica set to be ranked
+   * @param locationStatistics source of the throughput of each location
+   * @return the nodes and their ranks, in the order of the replica set
+   */
+  private List<Pair<TDataNodeLocation, Double>> rankLocations(
+      TRegionReplicaSet replicaSet, LocationStatistics locationStatistics) {
+    List<TDataNodeLocation> dataNodeLocations = replicaSet.getDataNodeLocations();
+    List<Pair<TDataNodeLocation, Double>> locations = new ArrayList<>();
+    if (dataNodeLocations == null || dataNodeLocations.isEmpty()) {
+      return locations;
+    }
+
+    // retrieve throughput of each node
+    double totalThroughput = 0.0;
+    for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
+      double throughput = sanitizeThroughput(locationStatistics.getThroughput(dataNodeLocation));
+      locations.add(new Pair<>(dataNodeLocation, throughput));
+      totalThroughput += throughput;
+    }
+
+    // normalize; fall back to a uniform distribution when nothing has a positive throughput
+    for (Pair<TDataNodeLocation, Double> location : locations) {
+      location.right =
+          totalThroughput > 0 ? location.right / totalThroughput : 1.0 / locations.size();
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Location ranks: {}",
+          locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(), p.right)).collect(
+              Collectors.toList()));
+    }
+    return locations;
+  }
+
+  /**
+   * Maps a reported throughput to a usable, finite, non-negative weight: NaN and negative values
+   * become 0, and everything else is capped at {@code Float.MAX_VALUE} so that several weights can
+   * still be added together without overflowing.
+   */
+  private static double sanitizeThroughput(double throughput) {
+    if (Double.isNaN(throughput) || throughput < 0) {
+      return 0.0;
+    }
+    return Math.min(throughput, Float.MAX_VALUE);
+  }
+
+  /**
+   * Removes and returns one location, chosen with a probability proportional to its rank among
+   * the locations that are still in the list.
+   *
+   * @param locations the remaining candidates with their ranks; must not be empty
+   * @return the chosen candidate, which is no longer contained in {@code locations}
+   */
+  private Pair<TDataNodeLocation, Double> chooseNextLocation(
+      List<Pair<TDataNodeLocation, Double>> locations) {
+    double remainingRank = 0.0;
+    for (Pair<TDataNodeLocation, Double> location : locations) {
+      remainingRank += location.right;
+    }
+
+    int chosen;
+    if (remainingRank > 0) {
+      // default to the last candidate in case rounding keeps the dice above every partial sum
+      chosen = locations.size() - 1;
+      double dice = random.nextDouble() * remainingRank;
+      double cumulativeRank = 0.0;
+      for (int i = 0; i < locations.size(); i++) {
+        cumulativeRank += locations.get(i).right;
+        if (dice < cumulativeRank) {
+          chosen = i;
+          break;
+        }
+      }
+    } else {
+      // all remaining candidates have rank 0, treat them as equally likely
+      chosen = random.nextInt(locations.size());
+    }
+    return locations.remove(chosen);
+  }
+
+  /**
+   * @return an iterator over the order that was sampled in the constructor
+   */
+  @Override
+  public Iterator<TDataNodeLocation> iterator() {
+    return orderedLocations.iterator();
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index 199c1bbd4f2..2e282435235 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -55,6 +55,7 @@ public class TsFileSplitSenderTest extends TestBase {
   // the third key is UUid, the value is command type
   protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> phaseTwoResults =
       new ConcurrentSkipListMap<>();
+  private long dummyDelayMS = 800;
 
 
   @Test
@@ -94,6 +95,21 @@ public class TsFileSplitSenderTest extends TestBase {
     splitIds.addAll(pieceNode.getAllTsFileData().stream().map(TsFileData::getSplitId).collect(
         Collectors.toList()));
 
+    if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
+      try {
+        Thread.sleep(dummyDelayMS);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.isRelay) {
+      try {
+        Thread.sleep(dummyDelayMS / 2);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     // forward to other replicas in the group
     if (req.isRelay) {
       req.isRelay = false;