You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/01/15 03:19:15 UTC

[incubator-doris] branch master updated: [Optimize]Take all scan nodes of one sql into consideration when select host for a tablet (#4984)

This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d692764  [Optimize]Take all scan nodes of one sql into consideration when select host for a tablet (#4984)
d692764 is described below

commit d692764934bf3a073112a9180236d72e94ce181a
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Fri Jan 15 11:18:57 2021 +0800

    [Optimize]Take all scan nodes of one sql into consideration when select host for a tablet (#4984)
    
    Currently, when a scan node scans many tablets, Doris balances the load across hosts when choosing which replica each scan task is executed on. However, it does not take the other scan nodes of the same SQL into consideration, so the load is not balanced globally. This patch balances the tablets of all scan nodes together.
    
    Co-authored-by: wangxixu <wa...@xiaomi.com>
---
 .../main/java/org/apache/doris/qe/Coordinator.java |   8 +-
 .../java/org/apache/doris/qe/CoordinatorTest.java  | 141 ++++++++++++++++++++-
 2 files changed, 143 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index f5dd310..558af53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1217,6 +1217,7 @@ public class Coordinator {
     // Populates scan_range_assignment_.
     // <fragment, <server, nodeId>>
     private void computeScanRangeAssignment() throws Exception {
+        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
         // set scan ranges/locations for scan nodes
         for (ScanNode scanNode : scanNodes) {
             // the parameters of getScanRangeLocations may ignore, It dosn't take effect
@@ -1239,7 +1240,7 @@ public class Coordinator {
             } else if (bucketShuffleJoinController.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())) {
                 bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, idToBackend, addressToBackendID);
             } else {
-                computeScanRangeAssignmentByScheduler(scanNode, locations, assignment);
+                computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost);
             }
         }
     }
@@ -1311,9 +1312,8 @@ public class Coordinator {
     private void computeScanRangeAssignmentByScheduler(
             final ScanNode scanNode,
             final List<TScanRangeLocations> locations,
-            FragmentScanRangeAssignment assignment) throws Exception {
-
-        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+            FragmentScanRangeAssignment assignment,
+            HashMap<TNetworkAddress, Long> assignedBytesPerHost) throws Exception {
         for (TScanRangeLocations scanRangeLocations : locations) {
             Reference<Long> backendIdRef = new Reference<Long>();
             TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations, assignedBytesPerHost, backendIdRef);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index b56fc72..40b7568 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -38,6 +38,8 @@ import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ScanNode;
+
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -460,10 +462,11 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
         FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment();
         List<TScanRangeLocations> locations = new ArrayList<>();
+        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
         locations.add(tScanRangeLocations);
         locations.add(tScanRangeLocations1);
         Deencapsulation.invoke(coordinator, "computeScanRangeAssignmentByScheduler",
-                olapScanNode, locations, assignment);
+                olapScanNode, locations, assignment, assignedBytesPerHost);
         for (Map.Entry entry:assignment.entrySet()) {
             Map<Integer, List<TScanRangeParams>> addr = (HashMap<Integer, List<TScanRangeParams>>) entry.getValue();
             for (Map.Entry item:addr.entrySet()) {
@@ -492,7 +495,6 @@ public class CoordinatorTest extends Coordinator {
         tScanRangeLocations.locations.add(tScanRangeLocation0);
         tScanRangeLocations.locations.add(tScanRangeLocation1);
         tScanRangeLocations.locations.add(tScanRangeLocation2);
-
         // init all backend
         Backend backend0 = new Backend(0, "0.0.0.0", 9060);
         backend0.setAlive(true);
@@ -595,5 +597,140 @@ public class CoordinatorTest extends Coordinator {
         Assert.assertTrue(backendIds.get(1) == 1);
         Assert.assertTrue(counts.get(1) == 1);
     }
+
+    /**
+     * Verifies that host selection is balanced across all scan nodes of one query: three scan
+     * nodes read the same tablet (replicas on backends 0, 1, 2) and must each get a different host.
+     */
+    @Test
+    public void testComputeScanRangeAssignment() {
+        Coordinator coordinator = new Coordinator(context, analyzer, planner);
+
+        // a single tablet with one replica on each of the three backends
+        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+        TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+        tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+        TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+        tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+        TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+        tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+        tScanRangeLocations.locations = new ArrayList<>();
+        tScanRangeLocations.locations.add(tScanRangeLocation0);
+        tScanRangeLocations.locations.add(tScanRangeLocation1);
+        tScanRangeLocations.locations.add(tScanRangeLocation2);
+
+        // scanNode1: the only scan node of fragment 1
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        TupleDescriptor tupleDescriptor = new TupleDescriptor(new TupleId(-1));
+        OlapTable olapTable = new OlapTable();
+        HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(66, new ArrayList<>());
+        Deencapsulation.setField(olapTable, "defaultDistributionInfo", hashDistributionInfo);
+        tupleDescriptor.setTable(olapTable);
+        OlapScanNode olapScanNode = new OlapScanNode(new PlanNodeId(1), tupleDescriptor, "test");
+        PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode,
+                new DataPartition(TPartitionType.UNPARTITIONED));
+        olapScanNode.setFragment(fragment);
+        List<TScanRangeLocations> locations = new ArrayList<>();
+        locations.add(tScanRangeLocations);
+        Deencapsulation.setField(olapScanNode, "result", locations);
+
+        // scanNode2: the only scan node of fragment 2
+        PlanFragmentId planFragmentId2 = new PlanFragmentId(2);
+        TupleDescriptor tupleDescriptor2 = new TupleDescriptor(new TupleId(-1));
+        OlapTable olapTable2 = new OlapTable();
+        HashDistributionInfo hashDistributionInfo2 = new HashDistributionInfo(66, new ArrayList<>());
+        Deencapsulation.setField(olapTable2, "defaultDistributionInfo", hashDistributionInfo2);
+        tupleDescriptor2.setTable(olapTable2);
+        OlapScanNode olapScanNode2 = new OlapScanNode(new PlanNodeId(2), tupleDescriptor2, "test2");
+        PlanFragment fragment2 = new PlanFragment(planFragmentId2, olapScanNode2,
+                new DataPartition(TPartitionType.UNPARTITIONED));
+        olapScanNode2.setFragment(fragment2);
+        List<TScanRangeLocations> locations2 = new ArrayList<>();
+        locations2.add(tScanRangeLocations);
+        Deencapsulation.setField(olapScanNode2, "result", locations2);
+
+        // scanNode3: the only scan node of fragment 3
+        PlanFragmentId planFragmentId3 = new PlanFragmentId(3);
+        TupleDescriptor tupleDescriptor3 = new TupleDescriptor(new TupleId(-1));
+        OlapTable olapTable3 = new OlapTable();
+        HashDistributionInfo hashDistributionInfo3 = new HashDistributionInfo(66, new ArrayList<>());
+        Deencapsulation.setField(olapTable3, "defaultDistributionInfo", hashDistributionInfo3);
+        tupleDescriptor3.setTable(olapTable3);
+        OlapScanNode olapScanNode3 = new OlapScanNode(new PlanNodeId(3), tupleDescriptor3, "test3");
+        PlanFragment fragment3 = new PlanFragment(planFragmentId3, olapScanNode3,
+                new DataPartition(TPartitionType.UNPARTITIONED));
+        olapScanNode3.setFragment(fragment3);
+        List<TScanRangeLocations> locations3 = new ArrayList<>();
+        locations3.add(tScanRangeLocations);
+        Deencapsulation.setField(olapScanNode3, "result", locations3);
+
+        // register all three scan nodes with the coordinator, in the order they will be processed
+        List<ScanNode> scanNodes = new ArrayList<>();
+        scanNodes.add(olapScanNode);
+        scanNodes.add(olapScanNode2);
+        scanNodes.add(olapScanNode3);
+        Deencapsulation.setField(coordinator, "scanNodes", scanNodes);
+
+        // fragmentIdToScanNodeIds: each fragment owns exactly its own scan node id
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = Maps.newHashMap();
+        Set<Integer> ids1 = new HashSet<>();
+        ids1.add(1);
+        fragmentIdToScanNodeIds.put(planFragmentId, ids1);
+        Set<Integer> ids2 = new HashSet<>();
+        ids2.add(2);
+        fragmentIdToScanNodeIds.put(planFragmentId2, ids2);
+        Set<Integer> ids3 = new HashSet<>();
+        ids3.add(3);
+        fragmentIdToScanNodeIds.put(planFragmentId3, ids3);
+        Deencapsulation.setField(coordinator, "fragmentIdToScanNodeIds", fragmentIdToScanNodeIds);
+
+        // fragmentExecParamsMap: the per-fragment assignments are read back from here below
+        Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = Maps.newHashMap();
+        fragmentExecParamsMap.put(planFragmentId, new FragmentExecParams(fragment));
+        fragmentExecParamsMap.put(planFragmentId2, new FragmentExecParams(fragment2));
+        fragmentExecParamsMap.put(planFragmentId3, new FragmentExecParams(fragment3));
+        Deencapsulation.setField(coordinator, "fragmentExecParamsMap", fragmentExecParamsMap);
+
+        // init all backends; the BE port matches the port used in the scan range locations above
+        Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+        backend0.setAlive(true);
+        backend0.setBePort(9050);
+        Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+        backend1.setAlive(true);
+        backend1.setBePort(9050);
+        Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+        backend2.setAlive(true);
+        backend2.setBePort(9050);
+
+        ImmutableMap<Long, Backend> idToBackend =
+                ImmutableMap.of(0L, backend0, 1L, backend1, 2L, backend2);
+        Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
+
+        // run the assignment for all three scan nodes in a single call
+        Deencapsulation.invoke(coordinator, "computeScanRangeAssignment");
+        FragmentScanRangeAssignment assignment = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment;
+        Assert.assertEquals(1, assignment.size());
+        for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : assignment.entrySet()) {
+            TNetworkAddress host = entry.getKey();
+            Assert.assertEquals("0.0.0.0", host.hostname);
+        }
+
+        FragmentScanRangeAssignment assignment2 = fragmentExecParamsMap.get(fragment2.getFragmentId()).scanRangeAssignment;
+        Assert.assertEquals(1, assignment2.size());
+        for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : assignment2.entrySet()) {
+            TNetworkAddress host = entry.getKey();
+            Assert.assertEquals("0.0.0.1", host.hostname);
+        }
+
+        FragmentScanRangeAssignment assignment3 = fragmentExecParamsMap.get(fragment3.getFragmentId()).scanRangeAssignment;
+        Assert.assertEquals(1, assignment3.size());
+        for (Map.Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : assignment3.entrySet()) {
+            TNetworkAddress host = entry.getKey();
+            Assert.assertEquals("0.0.0.2", host.hostname);
+        }
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org