You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/09/11 03:12:37 UTC
[iotdb] branch load_v2 updated: abstract location selection and add two simple strategies
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/load_v2 by this push:
new beaf63b98e4 abstract location selection and add two simple strategies
beaf63b98e4 is described below
commit beaf63b98e4cef73a0111f88984d29862ea0022f
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Sep 11 11:15:57 2023 +0800
abstract location selection and add two simple strategies
---
.../execution/load/TsFileSplitSender.java | 88 +++++-------------
.../load/locseq/FixedLocationSequencer.java | 40 ++++++++
.../execution/load/locseq/LocationSequencer.java | 28 ++++++
.../execution/load/locseq/LocationStatistics.java | 79 ++++++++++++++++
.../load/locseq/RandomLocationSequencer.java | 41 ++++++++
.../locseq/ThroughputBasedLocationSequencer.java | 103 +++++++++++++++++++++
.../execution/load/TsFileSplitSenderTest.java | 16 ++++
7 files changed, 328 insertions(+), 67 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index b198ace141b..b2bea04dd22 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -45,6 +44,11 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.FixedLocationSequencer;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.LocationSequencer;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.LocationStatistics;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.RandomLocationSequencer;
+import org.apache.iotdb.db.queryengine.execution.load.locseq.ThroughputBasedLocationSequencer;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
@@ -72,8 +76,7 @@ public class TsFileSplitSender {
// All consensus groups accessed in Phase1 should be notified in Phase2
private final Set<TRegionReplicaSet> allReplicaSets = new ConcurrentSkipListSet<>();
private String uuid;
- private Map<TDataNodeLocation, Double> dataNodeThroughputMap = new ConcurrentHashMap<>();
- private Random random = new Random();
+ private LocationStatistics locationStatistics = new LocationStatistics();
private boolean isGeneratedByPipe;
private Map<Pair<LoadTsFilePieceNode, TRegionReplicaSet>, Exception> phaseOneFailures =
new ConcurrentHashMap<>();
@@ -107,6 +110,7 @@ public class TsFileSplitSender {
} else {
logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
}
+ locationStatistics.logLocationStatistics();
}
private boolean firstPhase(LoadTsFileNode node) throws IOException {
@@ -191,55 +195,6 @@ public class TsFileSplitSender {
return phaseTwoFailures.isEmpty();
}
- /**
- * The rank (probability of being chosen) is calculated as throughput / totalThroughput for those
- * nodes that have not been used, their throughput is defined as Float.MAX_VALUE
- *
- * @param replicaSet replica set to be ranked
- * @return the nodes and their ranks
- */
- private List<Pair<TDataNodeLocation, Double>> rankLocations(TRegionReplicaSet replicaSet) {
- List<Pair<TDataNodeLocation, Double>> locations =
- new ArrayList<>(replicaSet.dataNodeLocations.size());
- // retrieve throughput of each node
- double totalThroughput = 0.0;
- for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
- // use Float.MAX_VALUE so that they can be added together
- double throughput =
- dataNodeThroughputMap.computeIfAbsent(dataNodeLocation, l -> (double) Float.MAX_VALUE);
- locations.add(new Pair<>(dataNodeLocation, throughput));
- totalThroughput += throughput;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Location throughput: {}", dataNodeThroughputMap.entrySet().stream()
- .map(e -> new Pair<>(e.getKey().getDataNodeId(), e.getValue()))
- .collect(Collectors.toList()));
- }
- // calculate cumulative ranks
- locations.get(0).right = locations.get(0).right / totalThroughput;
- for (int i = 1; i < locations.size(); i++) {
- Pair<TDataNodeLocation, Double> location = locations.get(i);
- location.right = location.right / totalThroughput + locations.get(i - 1).right;
- }
- return locations;
- }
-
- private Pair<TDataNodeLocation, Double> chooseNextLocation(
- List<Pair<TDataNodeLocation, Double>> locations) {
- int chosen = 0;
- double dice = random.nextDouble();
- for (int i = 1; i < locations.size(); i++) {
- if (locations.get(i - 1).right <= dice && dice < locations.get(i).right) {
- chosen = i;
- }
- }
- Pair<TDataNodeLocation, Double> chosenPair = locations.remove(chosen);
- // update ranks
- for (Pair<TDataNodeLocation, Double> location : locations) {
- location.right = location.right / (1 - chosenPair.right);
- }
- return chosenPair;
- }
/**
* Split a piece node by series.
@@ -267,7 +222,12 @@ public class TsFileSplitSender {
return result;
}
- @SuppressWarnings("BusyWait")
+ public LocationSequencer createLocationSequencer(TRegionReplicaSet replicaSet) {
+// return new FixedLocationSequencer(replicaSet);
+// return new RandomLocationSequencer(replicaSet);
+ return new ThroughputBasedLocationSequencer(replicaSet, locationStatistics);
+ }
+
public boolean dispatchOnePieceNode(LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) {
allReplicaSets.add(replicaSet);
@@ -278,21 +238,14 @@ public class TsFileSplitSender {
TTsFilePieceReq loadTsFileReq =
new TTsFilePieceReq(node.serializeToByteBuffer(), uuid, replicaSet.getRegionId());
loadTsFileReq.isRelay = true;
- List<Pair<TDataNodeLocation, Double>> locations = rankLocations(replicaSet);
- if (logger.isDebugEnabled()) {
- logger.debug("Location ranks: {}",
- locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(), p.right)).collect(
- Collectors.toList()));
- }
+ LocationSequencer locationSequencer = createLocationSequencer(replicaSet);
boolean loadSucceed = false;
Exception lastConnectionError = null;
TDataNodeLocation currLocation = null;
- while (!locations.isEmpty()) {
- // the chosen location is removed from the list
- Pair<TDataNodeLocation, Double> locationRankPair = chooseNextLocation(locations);
- logger.debug("Chose location {}", locationRankPair.left.getDataNodeId());
- currLocation = locationRankPair.left;
+ for (TDataNodeLocation location : locationSequencer) {
+ logger.debug("Chose location {}", location.getDataNodeId());
+ currLocation = location;
startTime = System.nanoTime();
for (int i = 0; i < MAX_RETRY; i++) {
try (SyncDataNodeInternalServiceClient client =
@@ -325,16 +278,17 @@ public class TsFileSplitSender {
if (!loadSucceed) {
String warning = NODE_CONNECTION_ERROR;
- logger.warn(warning, locations, lastConnectionError);
+ logger.warn(warning, currLocation, lastConnectionError);
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
- status.setMessage(warning + locations);
+ status.setMessage(warning + currLocation);
phaseOneFailures.put(
new Pair<>(node, replicaSet), new FragmentInstanceDispatchException(status));
return false;
}
long timeConsumption = System.nanoTime() - startTime;
- dataNodeThroughputMap.put(currLocation, node.getDataSize() * 1.0 / timeConsumption);
+ locationStatistics.updateThroughput(currLocation, node.getDataSize() * 1.0 / timeConsumption);
+ locationStatistics.increaseHit(currLocation);
return true;
}).collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
new file mode 100644
index 00000000000..6bd49396f0e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/FixedLocationSequencer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+public class FixedLocationSequencer implements LocationSequencer{
+
+ private List<TDataNodeLocation> orderedLocations;
+
+ public FixedLocationSequencer(TRegionReplicaSet replicaSet) {
+ orderedLocations = new ArrayList<>(replicaSet.dataNodeLocations);
+ }
+
+ @Override
+ public Iterator<TDataNodeLocation> iterator() {
+ return orderedLocations.iterator();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
new file mode 100644
index 00000000000..cb3d8723baf
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationSequencer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.Iterator;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+
+public interface LocationSequencer extends Iterable<TDataNodeLocation> {
+
+
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
new file mode 100644
index 00000000000..5db64de1e65
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/LocationStatistics.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocationStatistics {
+
+ private static final Logger logger = LoggerFactory.getLogger(LocationStatistics.class);
+ private Map<TDataNodeLocation, Statistic> locationStatisticMap = new ConcurrentHashMap<>();
+
+ public double getThroughput(TDataNodeLocation location) {
+ return locationStatisticMap.computeIfAbsent(location,
+ l -> new Statistic()).throughput;
+ }
+
+ public void updateThroughput(TDataNodeLocation location, double throughput) {
+ locationStatisticMap.computeIfAbsent(location,
+ l -> new Statistic()).setThroughput(throughput);
+ }
+
+ public void increaseHit(TDataNodeLocation location) {
+ locationStatisticMap.computeIfAbsent(location,
+ l -> new Statistic()).increaseHit();
+ }
+
+ public static class Statistic {
+
+ private double throughput = Float.MAX_VALUE;
+ private int hit;
+
+ public void setThroughput(double throughput) {
+ this.throughput = throughput;
+ }
+
+ public void increaseHit() {
+ hit++;
+ }
+
+ @Override
+ public String toString() {
+ return "{" +
+ "throughput=" + throughput +
+ ", hit=" + hit +
+ '}';
+ }
+ }
+
+ public void logLocationStatistics() {
+ if (logger.isInfoEnabled()) {
+ logger.info("Location throughput: {}", locationStatisticMap.entrySet().stream()
+ .map(e -> new Pair<>(e.getKey().getDataNodeId(), e.getValue()))
+ .collect(Collectors.toList()));
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
new file mode 100644
index 00000000000..8af95667865
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/RandomLocationSequencer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+public class RandomLocationSequencer implements LocationSequencer {
+
+ private List<TDataNodeLocation> orderedLocations;
+ public RandomLocationSequencer(TRegionReplicaSet replicaSet) {
+ orderedLocations = new ArrayList<>(replicaSet.dataNodeLocations);
+ Collections.shuffle(orderedLocations);
+ }
+
+ @Override
+ public Iterator<TDataNodeLocation> iterator() {
+ return orderedLocations.iterator();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
new file mode 100644
index 00000000000..dcd67f04dcf
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load.locseq;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThroughputBasedLocationSequencer implements LocationSequencer {
+
+ private static final Logger logger = LoggerFactory.getLogger(
+ ThroughputBasedLocationSequencer.class);
+ private Random random = new Random();
+ private List<TDataNodeLocation> orderedLocations;
+
+ public ThroughputBasedLocationSequencer(TRegionReplicaSet replicaSet, LocationStatistics locationStatistics) {
+ List<Pair<TDataNodeLocation, Double>> locationRanks = rankLocations(replicaSet, locationStatistics);
+ orderedLocations = new ArrayList<>(locationRanks.size());
+ while (!locationRanks.isEmpty()) {
+ // the chosen location is removed from the list
+ orderedLocations.add(chooseNextLocation(locationRanks).left);
+ }
+ }
+
+ /**
+ * The rank (probability of being chosen) is calculated as throughput / totalThroughput for those
+ * nodes that have not been used, their throughput is defined as Float.MAX_VALUE
+ *
+ * @param replicaSet replica set to be ranked
+ * @return the nodes and their ranks
+ */
+ private List<Pair<TDataNodeLocation, Double>> rankLocations(TRegionReplicaSet replicaSet, LocationStatistics locationStatistics) {
+ List<Pair<TDataNodeLocation, Double>> locations =
+ new ArrayList<>(replicaSet.dataNodeLocations.size());
+ // retrieve throughput of each node
+ double totalThroughput = 0.0;
+ for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
+ // use Float.MAX_VALUE so that they can be added together
+ double throughput = locationStatistics.getThroughput(dataNodeLocation);
+ locations.add(new Pair<>(dataNodeLocation, throughput));
+ totalThroughput += throughput;
+ }
+
+ // calculate cumulative ranks
+ locations.get(0).right = locations.get(0).right / totalThroughput;
+ for (int i = 1; i < locations.size(); i++) {
+ Pair<TDataNodeLocation, Double> location = locations.get(i);
+ location.right = location.right / totalThroughput + locations.get(i - 1).right;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Location ranks: {}",
+ locations.stream().map(p -> new Pair<>(p.left.getDataNodeId(), p.right)).collect(
+ Collectors.toList()));
+ }
+ return locations;
+ }
+
+ private Pair<TDataNodeLocation, Double> chooseNextLocation(
+ List<Pair<TDataNodeLocation, Double>> locations) {
+ int chosen = 0;
+ double dice = random.nextDouble();
+ for (int i = 1; i < locations.size(); i++) {
+ if (locations.get(i - 1).right <= dice && dice < locations.get(i).right) {
+ chosen = i;
+ }
+ }
+ Pair<TDataNodeLocation, Double> chosenPair = locations.remove(chosen);
+ // update ranks
+ for (Pair<TDataNodeLocation, Double> location : locations) {
+ location.right = location.right / (1 - chosenPair.right);
+ }
+ return chosenPair;
+ }
+
+ @Override
+ public Iterator<TDataNodeLocation> iterator() {
+ return orderedLocations.iterator();
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index 199c1bbd4f2..2e282435235 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -55,6 +55,7 @@ public class TsFileSplitSenderTest extends TestBase {
// the third key is UUid, the value is command type
protected Map<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> phaseTwoResults =
new ConcurrentSkipListMap<>();
+ private long dummyDelayMS = 800;
@Test
@@ -94,6 +95,21 @@ public class TsFileSplitSenderTest extends TestBase {
splitIds.addAll(pieceNode.getAllTsFileData().stream().map(TsFileData::getSplitId).collect(
Collectors.toList()));
+ if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.isRelay) {
+ try {
+ Thread.sleep(dummyDelayMS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.isRelay) {
+ try {
+ Thread.sleep(dummyDelayMS / 2);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
// forward to other replicas in the group
if (req.isRelay) {
req.isRelay = false;