You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ka...@apache.org on 2021/01/06 02:20:40 UTC

[incubator-doris] branch master updated: [Enhancement] Optimize the algorithm of selecting host for a bucket scan task when a backend not alive (#5133)

This is an automated email from the ASF dual-hosted git repository.

kangkaisen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1035e86  [Enhancement] Optimize the algorithm of selecting host for a bucket scan task when a backend not alive (#5133)
1035e86 is described below

commit 1035e86e0bc568b9c54e823919aa247baf40faae
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Wed Jan 6 10:20:16 2021 +0800

    [Enhancement] Optimize the algorithm of selecting host for a bucket scan task when a backend not alive (#5133)
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 14 ++++-
 .../java/org/apache/doris/qe/CoordinatorTest.java  | 65 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ff05413..f5dd310 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1560,14 +1560,22 @@ public class Coordinator {
                     break;
                 }
             }
-
-            buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
             Reference<Long> backendIdRef = new Reference<Long>();
             TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, seqLocation.locations, idToBackend, backendIdRef);
             if (execHostPort == null) {
                 throw new UserException("there is no scanNode Backend");
             }
-
+            //the backend with buckendId is not alive, chose another new backend
+            if (backendIdRef.getRef() != buckendId) {
+                //buckendIdToBucketCountMap does not contain the new backend, insert into it
+                if (!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) {
+                    buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1);
+                } else { //buckendIdToBucketCountMap contains the new backend, update it 
+                    buckendIdToBucketCountMap.put(backendIdRef.getRef(), buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1);
+                }
+            } else { //the backend with buckendId is alive, update buckendIdToBucketCountMap directly
+                buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
+            }
             addressToBackendID.put(execHostPort, backendIdRef.getRef());
             this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 35efae2..b56fc72 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -49,6 +49,7 @@ import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -530,5 +531,69 @@ public class CoordinatorTest extends Coordinator {
         }
         Assert.assertTrue(hosts.size() == 3);
     }
+
+    @Test
+    public void testBucketShuffleWithUnaliveBackend()  {
+        Coordinator coordinator = new Coordinator(context, analyzer, planner);
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        // each olaptable bucket have the same TScanRangeLocations, be id is {0, 1, 2}
+        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+        TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+        tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+        TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+        tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+        TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+        tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+        tScanRangeLocations.locations = new ArrayList<>();
+        tScanRangeLocations.locations.add(tScanRangeLocation0);
+        tScanRangeLocations.locations.add(tScanRangeLocation1);
+        tScanRangeLocations.locations.add(tScanRangeLocation2);
+
+        // init all backend
+        Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+        backend0.setAlive(false);
+        backend0.setBePort(9050);
+        Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+        backend1.setAlive(true);
+        backend1.setBePort(9050);
+        Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+        backend2.setAlive(true);
+        backend2.setBePort(9050);
+        Map<TNetworkAddress, Long> addressToBackendID = Maps.newHashMap();
+        addressToBackendID.put(tScanRangeLocation0.server, tScanRangeLocation0.backend_id);
+        addressToBackendID.put(tScanRangeLocation1.server, tScanRangeLocation1.backend_id);
+        addressToBackendID.put(tScanRangeLocation2.server, tScanRangeLocation2.backend_id);
+
+        ImmutableMap<Long, Backend> idToBackend =
+                new ImmutableMap.Builder<Long, Backend>().
+                        put(0l, backend0).
+                        put(1l, backend1).
+                        put(2l, backend2).build();
+        Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap();
+        Map<Long, Integer> backendIdBucketCountMap = new HashMap<Long, Integer>();
+        fragmentIdToBuckendIdBucketCountMap.put(planFragmentId, backendIdBucketCountMap);
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
+        BucketShuffleJoinController controller = new BucketShuffleJoinController(fragmentIdToScanNodeIds);
+        Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
+        fragmentIdToSeqToAddressMap.put(planFragmentId, new HashMap<Integer, TNetworkAddress>());
+        Deencapsulation.setField(controller,  "fragmentIdToBuckendIdBucketCountMap", fragmentIdToBuckendIdBucketCountMap);
+        Deencapsulation.setField(controller, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap);
+        Deencapsulation.invoke(controller, "getExecHostPortForFragmentIDAndBucketSeq",
+                tScanRangeLocations, planFragmentId, 1, idToBackend, addressToBackendID);
+        Assert.assertTrue(backendIdBucketCountMap.size() == 2);
+        List<Long> backendIds = new ArrayList<Long>();
+        List<Integer> counts = new ArrayList<Integer>();
+        for (Map.Entry<Long, Integer> item:backendIdBucketCountMap.entrySet()) {
+            backendIds.add(item.getKey());
+            counts.add(item.getValue());
+        }
+        Assert.assertTrue(backendIds.get(0) == 0);
+        Assert.assertTrue(counts.get(0) == 0);
+        Assert.assertTrue(backendIds.get(1) == 1);
+        Assert.assertTrue(counts.get(1) == 1);
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org