You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/11 13:44:56 UTC

[incubator-doris] branch master updated: [fix](planner) produce wrong result when use bucket shuffle join with colocate left table (#10045)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f575e3e7c [fix](planner) produce wrong result when use bucket shuffle join with colocate left table (#10045)
3f575e3e7c is described below

commit 3f575e3e7c9e4d9565240a410890d552e567d870
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Sat Jun 11 21:44:47 2022 +0800

    [fix](planner) produce wrong result when use bucket shuffle join with colocate left table (#10045)
    
    When planning a bucket shuffle join, we need to know the left table's bucket number.
    Currently, we use the tablet number directly, based on the assumption that the left table has only one partition.
    But when the left table is a colocated table, it could have more than one partition.
    In this case, some data in the right table will be dropped incorrectly, producing a wrong result for the query.
    
    To reproduce the issue, follow the regression test in this PR.
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 16 ++++-
 .../test_bucket_join_with_colocate_table.out       |  7 ++
 .../test_bucket_join_with_colocate_table.groovy    | 80 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8f95c04890..555b68aefa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1850,9 +1850,19 @@ public class Coordinator {
         private void computeScanRangeAssignmentByBucket(
                 final OlapScanNode scanNode, ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress, Long> addressToBackendID) throws Exception {
             if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
-                // The bucket shuffle join only hit when the partition is one. so the totalTabletsNum is all tablet of
-                // one hit partition. can be the right bucket num in bucket shuffle join
-                fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), (int) scanNode.getTotalTabletsNum());
+                // In bucket shuffle join, there are two situations:
+                // 1. Only one partition: in this case, we use scanNode.getTotalTabletsNum() to get the right bucket
+                //    num, because when dynamic partition is turned on for the table, the bucket number in the
+                //    default distribution info is not correct.
+                // 2. Table is colocated: in this case, the table could have more than one partition, but all
+                //    partitions must have the same bucket number, so using the default bucket num is ok.
+                int bucketNum = 0;
+                if (scanNode.getOlapTable().isColocateTable()) {
+                    bucketNum = scanNode.getOlapTable().getDefaultDistributionInfo().getBucketNum();
+                } else {
+                    bucketNum = (int) (scanNode.getTotalTabletsNum());
+                }
+                fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), bucketNum);
                 fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
                 fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
                 fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>());
diff --git a/regression-test/data/correctness/test_bucket_join_with_colocate_table.out b/regression-test/data/correctness/test_bucket_join_with_colocate_table.out
new file mode 100644
index 0000000000..0cf6530614
--- /dev/null
+++ b/regression-test/data/correctness/test_bucket_join_with_colocate_table.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+2	2	2	2	2
+1	1	1	1	1
+3	3	3	3	3
+\N	\N	\N	4	4
+
diff --git a/regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy b/regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy
new file mode 100644
index 0000000000..4c8f0d0f18
--- /dev/null
+++ b/regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+ // or more contributor license agreements.  See the NOTICE file
+ // distributed with this work for additional information
+ // regarding copyright ownership.  The ASF licenses this file
+ // to you under the Apache License, Version 2.0 (the
+ // "License"); you may not use this file except in compliance
+ // with the License.  You may obtain a copy of the License at
+ //
+ //   http://www.apache.org/licenses/LICENSE-2.0
+ //
+ // Unless required by applicable law or agreed to in writing,
+ // software distributed under the License is distributed on an
+ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ // KIND, either express or implied.  See the License for the
+ // specific language governing permissions and limitations
+ // under the License.
+
+ suite("test_bucket_join_with_colocate_table") {
+     def colocateTableName = "colocate_table"
+     def rightTable = "right_table"
+
+
+     sql """ DROP TABLE IF EXISTS ${colocateTableName} """
+     sql """ DROP TABLE IF EXISTS ${rightTable} """
+     sql """
+         CREATE TABLE `${colocateTableName}` (
+           `c1` int(11) NULL COMMENT "",
+           `c2` int(11) NULL COMMENT "",
+           `c3` int(11) NULL COMMENT ""
+         ) ENGINE=OLAP
+         DUPLICATE KEY(`c1`, `c2`, `c3`)
+         COMMENT "OLAP"
+         PARTITION BY RANGE(`c2`)
+         (PARTITION p1 VALUES [("-2147483648"), ("2")),
+         PARTITION p2 VALUES [("2"), (MAXVALUE)))
+         DISTRIBUTED BY HASH(`c1`) BUCKETS 8
+         PROPERTIES (
+           "replication_allocation" = "tag.location.default: 1",
+           "colocate_with" = "group1",
+           "in_memory" = "false",
+           "storage_format" = "V2"
+         )
+     """
+     sql """
+         CREATE TABLE `${rightTable}` (
+           `k1` int(11) NOT NULL COMMENT "",
+           `v1` int(11) NOT NULL COMMENT ""
+         ) ENGINE=OLAP
+         DUPLICATE KEY(`k1`, `v1`)
+         COMMENT "OLAP"
+         DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+         PROPERTIES (
+           "replication_allocation" = "tag.location.default: 1",
+           "in_memory" = "false",
+           "storage_format" = "V2"
+         )
+     """
+
+     sql """ INSERT INTO ${colocateTableName} VALUES
+         (0, 0, 0),
+         (1, 1, 1),
+         (2, 2, 2),
+         (3, 3, 3)
+         ;
+     """
+
+     sql """ INSERT INTO ${rightTable} VALUES
+         (1, 1),
+         (2, 2),
+         (3, 3),
+         (4, 4)
+         ;
+     """
+
+     // test_vectorized
+     sql """ set enable_vectorized_engine = true; """
+
+     qt_select """  select * from ${colocateTableName} right outer join ${rightTable} on ${colocateTableName}.c1 = ${rightTable}.k1; """
+ }
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org