You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ka...@apache.org on 2021/01/06 02:20:40 UTC
[incubator-doris] branch master updated: [Enhancement] Optimize the
algorithm of selecting host for a bucket scan task when a backend not alive
(#5133)
This is an automated email from the ASF dual-hosted git repository.
kangkaisen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1035e86 [Enhancement] Optimize the algorithm of selecting host for a bucket scan task when a backend not alive (#5133)
1035e86 is described below
commit 1035e86e0bc568b9c54e823919aa247baf40faae
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Wed Jan 6 10:20:16 2021 +0800
[Enhancement] Optimize the algorithm of selecting host for a bucket scan task when a backend not alive (#5133)
---
.../main/java/org/apache/doris/qe/Coordinator.java | 14 ++++-
.../java/org/apache/doris/qe/CoordinatorTest.java | 65 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ff05413..f5dd310 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1560,14 +1560,22 @@ public class Coordinator {
break;
}
}
-
- buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
Reference<Long> backendIdRef = new Reference<Long>();
TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, seqLocation.locations, idToBackend, backendIdRef);
if (execHostPort == null) {
throw new UserException("there is no scanNode Backend");
}
-
+ // the backend with buckendId is not alive, so a different backend was chosen
+ if (backendIdRef.getRef() != buckendId) {
+ // buckendIdToBucketCountMap does not contain the new backend yet, so insert it with a count of 1
+ if (!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) {
+ buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1);
+ } else { //buckendIdToBucketCountMap contains the new backend, update it
+ buckendIdToBucketCountMap.put(backendIdRef.getRef(), buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1);
+ }
+ } else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly
+ buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
+ }
addressToBackendID.put(execHostPort, backendIdRef.getRef());
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 35efae2..b56fc72 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -49,6 +49,7 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
@@ -530,5 +531,69 @@ public class CoordinatorTest extends Coordinator {
}
Assert.assertTrue(hosts.size() == 3);
}
+
+ @Test
+ public void testBucketShuffleWithUnaliveBackend() { // checks the per-backend bucket counts when backend 0 is marked not alive
+ Coordinator coordinator = new Coordinator(context, analyzer, planner);
+ PlanFragmentId planFragmentId = new PlanFragmentId(1);
+ // every olap table bucket shares the same TScanRangeLocations; the backend ids are {0, 1, 2}
+ TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+ TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+ tScanRangeLocation0.backend_id = 0;
+ tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+ TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+ tScanRangeLocation1.backend_id = 1;
+ tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+ TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+ tScanRangeLocation2.backend_id = 2;
+ tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+ tScanRangeLocations.locations = new ArrayList<>();
+ tScanRangeLocations.locations.add(tScanRangeLocation0);
+ tScanRangeLocations.locations.add(tScanRangeLocation1);
+ tScanRangeLocations.locations.add(tScanRangeLocation2);
+
+ // init all backends; backend 0 is the only one marked not alive
+ Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+ backend0.setAlive(false);
+ backend0.setBePort(9050);
+ Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+ backend1.setAlive(true);
+ backend1.setBePort(9050);
+ Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+ backend2.setAlive(true);
+ backend2.setBePort(9050);
+ Map<TNetworkAddress, Long> addressToBackendID = Maps.newHashMap();
+ addressToBackendID.put(tScanRangeLocation0.server, tScanRangeLocation0.backend_id);
+ addressToBackendID.put(tScanRangeLocation1.server, tScanRangeLocation1.backend_id);
+ addressToBackendID.put(tScanRangeLocation2.server, tScanRangeLocation2.backend_id);
+
+ ImmutableMap<Long, Backend> idToBackend =
+ new ImmutableMap.Builder<Long, Backend>().
+ put(0l, backend0).
+ put(1l, backend1).
+ put(2l, backend2).build();
+ Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap();
+ Map<Long, Integer> backendIdBucketCountMap = new HashMap<Long, Integer>();
+ fragmentIdToBuckendIdBucketCountMap.put(planFragmentId, backendIdBucketCountMap); // this same map instance is asserted on below
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
+ BucketShuffleJoinController controller = new BucketShuffleJoinController(fragmentIdToScanNodeIds);
+ Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
+ fragmentIdToSeqToAddressMap.put(planFragmentId, new HashMap<Integer, TNetworkAddress>());
+ Deencapsulation.setField(controller, "fragmentIdToBuckendIdBucketCountMap", fragmentIdToBuckendIdBucketCountMap); // set the controller field by name
+ Deencapsulation.setField(controller, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap); // set the controller field by name
+ Deencapsulation.invoke(controller, "getExecHostPortForFragmentIDAndBucketSeq", // method under test, invoked by name
+ tScanRangeLocations, planFragmentId, 1, idToBackend, addressToBackendID); // the literal 1 is presumably the bucket sequence - confirm against the method signature
+ Assert.assertTrue(backendIdBucketCountMap.size() == 2);
+ List<Long> backendIds = new ArrayList<Long>();
+ List<Integer> counts = new ArrayList<Integer>();
+ for (Map.Entry<Long, Integer> item:backendIdBucketCountMap.entrySet()) { // NOTE(review): the index-based asserts below depend on HashMap iteration order
+ backendIds.add(item.getKey());
+ counts.add(item.getValue());
+ }
+ Assert.assertTrue(backendIds.get(0) == 0); // first entry: backend 0 (marked not alive above)...
+ Assert.assertTrue(counts.get(0) == 0); // ...is expected to have a bucket count of 0
+ Assert.assertTrue(backendIds.get(1) == 1); // second entry: backend 1...
+ Assert.assertTrue(counts.get(1) == 1); // ...is expected to have a bucket count of 1
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org