Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/22 07:53:36 UTC

[incubator-doris] branch dev-1.0.1 updated (1c5f7dade1 -> 19c9f70de2)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


    from 1c5f7dade1 [fix] fix bug when partition_id exceeds integer range in spark load (#9073)
     new 992e824a09 [Fix bug] fix non-equal out join is not supported (#8857)
     new 6160303272 [fix](partition) Fix wrong partition distribution key info for random hash olap table (#9104)
     new c9875cb3f1 [fix](dynamic_partition) fix dynamic partition scheduler not work for olap table with random hash info (#9108)
     new 5d5928e474 [Bug][Storage-vectorized] fix code dump on outer join with not nullable column (#9112)
     new 4e7aca29bd [fix](ut)(vectorized) fix a potential stack overflow bug and some unit test (#9140)
     new 19c9f70de2 [revert] "[Fix bug] fix non-equal out join is not supported (#8857)" (#9150)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  60 ++------
 be/src/olap/rowset/segment_v2/segment_iterator.h   |  11 +-
 be/src/vec/core/block.cpp                          |   9 ++
 be/src/vec/core/block.h                            |  33 ++--
 be/src/vec/exec/volap_scan_node.cpp                |  51 +++----
 be/src/vec/exec/volap_scan_node.h                  |   3 +
 be/src/vec/io/var_int.h                            |   2 +-
 be/test/runtime/CMakeLists.txt                     |   1 -
 be/test/runtime/free_list_test.cpp                 | 166 ---------------------
 .../doris/clone/DynamicPartitionScheduler.java     |  20 ++-
 .../doris/common/proc/PartitionsProcDir.java       |   2 +-
 .../apache/doris/common/util/VectorizedUtil.java   |   7 +-
 .../java/org/apache/doris/utframe/DorisAssert.java |   2 +
 13 files changed, 99 insertions(+), 268 deletions(-)
 delete mode 100644 be/test/runtime/free_list_test.cpp


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 05/06: [fix](ut)(vectorized) fix a potential stack overflow bug and some unit test (#9140)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 4e7aca29bdd7eb4488366104bc43749edf6e519f
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Apr 21 12:17:03 2022 +0800

    [fix](ut)(vectorized) fix a potential stack overflow bug and some unit test (#9140)
---
 be/src/vec/io/var_int.h                            |   2 +-
 be/test/runtime/CMakeLists.txt                     |   1 -
 be/test/runtime/free_list_test.cpp                 | 166 ---------------------
 .../apache/doris/common/util/VectorizedUtil.java   |   7 +-
 .../java/org/apache/doris/utframe/DorisAssert.java |   2 +
 5 files changed, 9 insertions(+), 169 deletions(-)

diff --git a/be/src/vec/io/var_int.h b/be/src/vec/io/var_int.h
index 45577a3bf3..432ae65402 100644
--- a/be/src/vec/io/var_int.h
+++ b/be/src/vec/io/var_int.h
@@ -125,7 +125,7 @@ inline void read_var_uint(UInt64& x, BufferReadable& buf) {
 }
 
 inline void write_var_uint(UInt64 x, BufferWritable& ostr) {
-    char bytes[8];
+    char bytes[9];
     uint8_t i = 0;
     while (i < 9) {
         uint8_t byte = x & 0x7F;
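
For context on the stack overflow: the write loop above emits a UInt64 in 7-bit groups and is bounded at 9 iterations, so the write index can reach 8. The old declaration char bytes[8] could therefore be written one byte past its end for values that need all 9 groups; char bytes[9] is the minimum safe size. A minimal standalone sketch of the same loop shape (illustrative code, not the Doris implementation):

    #include <cstdint>
    #include <cstdio>

    // Encode x as up to 9 groups of 7 bits, mirroring the loop shape in var_int.h.
    // out must hold at least 9 bytes; returns the number of bytes written.
    static int write_var_uint_sketch(uint64_t x, char* out) {
        int i = 0;
        while (i < 9) {
            uint8_t byte = x & 0x7F;
            if (x > 0x7F) {
                byte |= 0x80;                    // continuation bit: more groups follow
            }
            out[i++] = static_cast<char>(byte);
            x >>= 7;
            if (x == 0) {
                break;
            }
        }
        return i;
    }

    int main() {
        char bytes[9];                           // an 8-byte buffer would overflow here
        int n = write_var_uint_sketch(UINT64_MAX, bytes);
        printf("bytes written: %d\n", n);        // 9 for UINT64_MAX
        return 0;
    }
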
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index 28eff33cac..6cced0d943 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -24,7 +24,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test/runtime")
 #ADD_BE_TEST(result_buffer_mgr_test)
 #ADD_BE_TEST(result_sink_test)
 ADD_BE_TEST(mem_pool_test)
-ADD_BE_TEST(free_list_test)
 ADD_BE_TEST(string_buffer_test)
 #ADD_BE_TEST(data_stream_test)
 #ADD_BE_TEST(parallel_executor_test)
diff --git a/be/test/runtime/free_list_test.cpp b/be/test/runtime/free_list_test.cpp
deleted file mode 100644
index e67aa533e6..0000000000
--- a/be/test/runtime/free_list_test.cpp
+++ /dev/null
@@ -1,166 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/free_list.hpp"
-
-#include <gtest/gtest.h>
-
-#include <string>
-
-#include "runtime/mem_pool.h"
-#include "runtime/mem_tracker.h"
-
-namespace doris {
-
-TEST(FreeListTest, Basic) {
-    MemTracker tracker;
-    MemPool pool(&tracker);
-    FreeList list;
-
-    int allocated_size;
-    uint8_t* free_list_mem = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_EQ(nullptr, free_list_mem);
-    EXPECT_EQ(allocated_size, 0);
-
-    uint8_t* mem = pool.allocate(FreeList::min_size());
-    EXPECT_TRUE(mem != nullptr);
-
-    list.add(mem, FreeList::min_size());
-    free_list_mem = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_EQ(mem, free_list_mem);
-    EXPECT_EQ(allocated_size, FreeList::min_size());
-
-    free_list_mem = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_EQ(nullptr, free_list_mem);
-    EXPECT_EQ(allocated_size, 0);
-
-    // Make 3 allocations and add them to the free list.
-    // Get them all back from the free list, scribbling to the
-    // returned memory in between.
-    // Attempt a 4th allocation from the free list and make sure
-    // we get nullptr.
-    // Repeat with the same memory blocks.
-    uint8_t* free_list_mem1 = nullptr;
-    uint8_t* free_list_mem2 = nullptr;
-    uint8_t* free_list_mem3 = nullptr;
-
-    mem = pool.allocate(FreeList::min_size());
-    list.add(mem, FreeList::min_size());
-    mem = pool.allocate(FreeList::min_size());
-    list.add(mem, FreeList::min_size());
-    mem = pool.allocate(FreeList::min_size());
-    list.add(mem, FreeList::min_size());
-
-    free_list_mem1 = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_TRUE(free_list_mem1 != nullptr);
-    EXPECT_EQ(allocated_size, FreeList::min_size());
-    bzero(free_list_mem1, FreeList::min_size());
-
-    free_list_mem2 = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_TRUE(free_list_mem2 != nullptr);
-    EXPECT_EQ(allocated_size, FreeList::min_size());
-    bzero(free_list_mem2, FreeList::min_size());
-
-    free_list_mem3 = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_TRUE(free_list_mem3 != nullptr);
-    EXPECT_EQ(allocated_size, FreeList::min_size());
-    bzero(free_list_mem3, FreeList::min_size());
-
-    free_list_mem = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_EQ(nullptr, free_list_mem);
-    EXPECT_EQ(allocated_size, 0);
-
-    list.add(free_list_mem1, FreeList::min_size());
-    list.add(free_list_mem2, FreeList::min_size());
-    list.add(free_list_mem3, FreeList::min_size());
-
-    free_list_mem1 = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_TRUE(free_list_mem1 != nullptr);
-    EXPECT_EQ(allocated_size, FreeList::min_size());
-    bzero(free_list_mem1, FreeList::min_size());
-
-    free_list_mem2 = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_TRUE(free_list_mem2 != nullptr);
-    EXPECT_EQ(allocated_size, FreeList::min_size());
-    bzero(free_list_mem2, FreeList::min_size());
-
-    free_list_mem3 = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_TRUE(free_list_mem3 != nullptr);
-    EXPECT_EQ(allocated_size, FreeList::min_size());
-    bzero(free_list_mem3, FreeList::min_size());
-
-    free_list_mem = list.allocate(FreeList::min_size(), &allocated_size);
-    EXPECT_EQ(nullptr, free_list_mem);
-    EXPECT_EQ(allocated_size, 0);
-
-    // Try some allocations with different sizes
-    int size1 = FreeList::min_size();
-    int size2 = FreeList::min_size() * 2;
-    int size4 = FreeList::min_size() * 4;
-
-    uint8_t* mem1 = pool.allocate(size1);
-    uint8_t* mem2 = pool.allocate(size2);
-    uint8_t* mem4 = pool.allocate(size4);
-
-    list.add(mem2, size2);
-    free_list_mem = list.allocate(size4, &allocated_size);
-    EXPECT_EQ(nullptr, free_list_mem);
-    EXPECT_EQ(allocated_size, 0);
-
-    free_list_mem = list.allocate(size1, &allocated_size);
-    EXPECT_TRUE(free_list_mem != nullptr);
-    EXPECT_EQ(allocated_size, size2);
-    bzero(free_list_mem, size1);
-
-    free_list_mem = list.allocate(size1, &allocated_size);
-    EXPECT_EQ(nullptr, free_list_mem);
-    EXPECT_EQ(allocated_size, 0);
-
-    list.add(mem2, size2);
-    list.add(mem4, size4);
-    list.add(mem1, size1);
-
-    free_list_mem = list.allocate(size4, &allocated_size);
-    EXPECT_EQ(mem4, free_list_mem);
-    EXPECT_EQ(allocated_size, size4);
-    bzero(free_list_mem, size4);
-
-    free_list_mem = list.allocate(size2, &allocated_size);
-    EXPECT_EQ(mem2, free_list_mem);
-    EXPECT_EQ(allocated_size, size2);
-    bzero(free_list_mem, size2);
-
-    free_list_mem = list.allocate(size1, &allocated_size);
-    EXPECT_EQ(mem1, free_list_mem);
-    EXPECT_EQ(allocated_size, size1);
-    bzero(free_list_mem, size1);
-}
-
-} // namespace doris
-
-int main(int argc, char** argv) {
-#if 0
-    std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
-    if (!doris::config::init(conffile.c_str(), false)) {
-        fprintf(stderr, "error read config file. \n");
-        return -1;
-    }
-    init_glog("be-test");
-#endif
-    ::testing::InitGoogleTest(&argc, argv);
-    return RUN_ALL_TESTS();
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
index b094389db9..4b793a0e9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
@@ -19,6 +19,7 @@ package org.apache.doris.common.util;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
 
 public class VectorizedUtil {
     /**
@@ -32,7 +33,11 @@ public class VectorizedUtil {
         if (connectContext == null) {
             return false;
         }
-        Analyzer analyzer = connectContext.getExecutor().getAnalyzer();
+        StmtExecutor stmtExecutor = connectContext.getExecutor();
+        if (stmtExecutor == null) {
+            return connectContext.getSessionVariable().enableVectorizedEngine();
+        }
+        Analyzer analyzer = stmtExecutor.getAnalyzer();
         if (analyzer == null) {
             return connectContext.getSessionVariable().enableVectorizedEngine();
         }
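
The guard added above matters because ConnectContext.getExecutor() can legitimately return null, as in the unit-test path patched in DorisAssert below where no executor had been set on the context, and the old code dereferenced it unconditionally. The shape of the fix is a null-safe walk of the ConnectContext -> StmtExecutor -> Analyzer chain that falls back to the session variable at the first missing link. A standalone C++ sketch of that pattern with stand-in types (not the Doris FE classes; the tail of the real method is not shown in the hunk):

    #include <cstdio>

    struct Analyzer { bool vectorized; };          // stand-ins, not the Doris FE classes
    struct StmtExecutor { Analyzer* analyzer; };
    struct Context { bool session_default; StmtExecutor* executor; };

    // Fall back to the session default at the first null link in the chain.
    bool is_vectorized(const Context* ctx) {
        if (ctx == nullptr) {
            return false;
        }
        StmtExecutor* executor = ctx->executor;
        if (executor == nullptr) {                 // the case this commit adds
            return ctx->session_default;
        }
        Analyzer* analyzer = executor->analyzer;
        if (analyzer == nullptr) {
            return ctx->session_default;
        }
        return analyzer->vectorized;               // placeholder for the unshown tail of the method
    }

    int main() {
        Context ctx{true, nullptr};                // no executor attached yet
        printf("%d\n", is_vectorized(&ctx));       // 1: falls back to the session flag
        return 0;
    }
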
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java
index f6d763433e..eabaab95c8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DorisAssert.java
@@ -191,6 +191,8 @@ public class DorisAssert {
 
         private String internalExecute(String sql) throws Exception {
             StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
+            connectContext.setExecutor(stmtExecutor);
+            ConnectContext.get().setExecutor(stmtExecutor);
             stmtExecutor.execute();
             QueryState queryState = connectContext.getState();
             if (queryState.getStateType() == QueryState.MysqlStateType.ERR) {




[incubator-doris] 01/06: [Fix bug] fix non-equal out join is not supported (#8857)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 992e824a098babca8d838a6ad858b552491152ad
Author: shee <13...@users.noreply.github.com>
AuthorDate: Wed Apr 20 21:44:20 2022 -0700

    [Fix bug] fix non-equal out join is not supported (#8857)
---
 .../java/org/apache/doris/analysis/TableRef.java   | 15 ++++++++-
 .../org/apache/doris/planner/QueryPlanTest.java    | 37 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 7e9a509002..4aa0a1dbc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -215,6 +215,14 @@ public class TableRef implements ParseNode, Writable {
         return null;
     }
 
+    public boolean isOuterJoin() {
+        if (joinOp != null) {
+            return joinOp == JoinOperator.LEFT_OUTER_JOIN || joinOp == JoinOperator.RIGHT_OUTER_JOIN
+                    || joinOp == JoinOperator.FULL_OUTER_JOIN;
+        }
+        return false;
+    }
+
     public JoinOperator getJoinOp() {
         // if it's not explicitly set, we're doing an inner join
         return (joinOp == null ? JoinOperator.INNER_JOIN : joinOp);
@@ -572,7 +580,12 @@ public class TableRef implements ParseNode, Writable {
     public void rewriteExprs(ExprRewriter rewriter, Analyzer analyzer)
             throws AnalysisException {
         Preconditions.checkState(isAnalyzed);
-        if (onClause != null) onClause = rewriter.rewrite(onClause, analyzer, ExprRewriter.ClauseType.ON_CLAUSE);
+        if (onClause != null) {
+            if (isOuterJoin()) {
+                return;
+            }
+            onClause = rewriter.rewrite(onClause, analyzer, ExprRewriter.ClauseType.ON_CLAUSE);
+        }
     }
 
     private String joinOpToSql() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 20a5c7c704..e535e6c0ec 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -2119,4 +2119,41 @@ public class QueryPlanTest {
         String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql);
         Assert.assertTrue(explainString.contains("1 | 10 | 1 | 1 | 1"));
     }
+
+    /**
+     * for issue #8856
+    */
+    @Test
+    public void testOutJoinWithOnFalse() throws Exception {
+        connectContext.setDatabase("default_cluster:test");
+        createTable("create table out_join_1\n" +
+                "(\n" +
+                "    k1 int,\n" +
+                "    v int\n" +
+                ")\n" +
+                "DISTRIBUTED BY HASH(k1) BUCKETS 10\n" +
+                "PROPERTIES(\"replication_num\" = \"1\");");
+
+        createTable("create table out_join_2\n" +
+                "(\n" +
+                "    k1 int,\n" +
+                "    v int\n" +
+                ")\n" +
+                "DISTRIBUTED BY HASH(k1) BUCKETS 10\n" +
+                "PROPERTIES(\"replication_num\" = \"1\");");
+
+        String sql = "explain select * from out_join_1 left join out_join_2 on out_join_1.k1 = out_join_2.k1 and 1=2;";
+        String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql);
+        Assert.assertFalse(explainString.contains("non-equal LEFT OUTER JOIN is not supported"));
+
+        sql = "explain select * from out_join_1 right join out_join_2 on out_join_1.k1 = out_join_2.k1 and 1=2;";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql);
+        Assert.assertFalse(explainString.contains("non-equal RIGHT OUTER JOIN is not supported"));
+
+        sql = "explain select * from out_join_1 full join out_join_2 on out_join_1.k1 = out_join_2.k1 and 1=2;";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql);
+        Assert.assertFalse(explainString.contains("non-equal FULL OUTER JOIN is not supported"));
+
+    }
+
 }




[incubator-doris] 02/06: [fix](partition) Fix wrong partition distribution key info for random hash olap table (#9104)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 616030327296bf889263e05e3ada5f9bc74d32cf
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Wed Apr 20 17:08:42 2022 +0800

    [fix](partition) Fix wrong partition distribution key info for random hash olap table (#9104)
---
 .../src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 661eaaddc7..f30432756e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -259,7 +259,7 @@ public class PartitionsProcDir implements ProcDirInterface {
                     }
                     partitionInfo.add(sb.toString());
                 } else {
-                    partitionInfo.add("ALL KEY");
+                    partitionInfo.add("RANDOM");
                 }
 
                 partitionInfo.add(distributionInfo.getBucketNum());




[incubator-doris] 03/06: [fix](dynamic_partition) fix dynamic partition scheduler not work for olap table with random hash info (#9108)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit c9875cb3f1aea3957fa201d0d72efe0ce8ed4eaf
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Thu Apr 21 12:16:27 2022 +0800

    [fix](dynamic_partition) fix dynamic partition scheduler not work for olap table with random hash info (#9108)
---
 .../doris/clone/DynamicPartitionScheduler.java       | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index e85b620d01..319e4dcb88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -23,10 +23,12 @@ import org.apache.doris.analysis.DropPartitionClause;
 import org.apache.doris.analysis.HashDistributionDesc;
 import org.apache.doris.analysis.PartitionKeyDesc;
 import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.analysis.RandomDistributionDesc;
 import org.apache.doris.analysis.SinglePartitionDesc;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.DynamicPartitionProperty;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.OlapTable;
@@ -220,14 +222,18 @@ public class DynamicPartitionScheduler extends MasterDaemon {
             SinglePartitionDesc rangePartitionDesc = new SinglePartitionDesc(true, partitionName,
                     partitionKeyDesc, partitionProperties);
 
-            // construct distribution desc
-            HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) olapTable.getDefaultDistributionInfo();
-            List<String> distColumnNames = new ArrayList<>();
-            for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
-                distColumnNames.add(distributionColumn.getName());
+            DistributionDesc distributionDesc = null;
+            DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
+            if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
+                HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
+                List<String> distColumnNames = new ArrayList<>();
+                for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
+                    distColumnNames.add(distributionColumn.getName());
+                }
+                distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames);
+            } else {
+                distributionDesc = new RandomDistributionDesc(dynamicPartitionProperty.getBuckets());
             }
-            DistributionDesc distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames);
-
             // add partition according to partition desc and distribution desc
             addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null, false));
         }




[incubator-doris] 06/06: [revert] "[Fix bug] fix non-equal out join is not supported (#8857)" (#9150)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 19c9f70de2f0442febd5da989f9260ca5c01e0f8
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Apr 21 18:20:19 2022 +0800

    [revert] "[Fix bug] fix non-equal out join is not supported (#8857)" (#9150)
    
    This PR caused the following FE unit tests to fail:
    
    InferFiltersRuleTest
    testOn3Tables1stInner2ndRightJoinEqLiteralAt2nd
    testOn3Tables1stInner2ndRightJoinEqLiteralAt3rd
---
 .../java/org/apache/doris/analysis/TableRef.java   | 15 +--------
 .../org/apache/doris/planner/QueryPlanTest.java    | 37 ----------------------
 2 files changed, 1 insertion(+), 51 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 4aa0a1dbc0..7e9a509002 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -215,14 +215,6 @@ public class TableRef implements ParseNode, Writable {
         return null;
     }
 
-    public boolean isOuterJoin() {
-        if (joinOp != null) {
-            return joinOp == JoinOperator.LEFT_OUTER_JOIN || joinOp == JoinOperator.RIGHT_OUTER_JOIN
-                    || joinOp == JoinOperator.FULL_OUTER_JOIN;
-        }
-        return false;
-    }
-
     public JoinOperator getJoinOp() {
         // if it's not explicitly set, we're doing an inner join
         return (joinOp == null ? JoinOperator.INNER_JOIN : joinOp);
@@ -580,12 +572,7 @@ public class TableRef implements ParseNode, Writable {
     public void rewriteExprs(ExprRewriter rewriter, Analyzer analyzer)
             throws AnalysisException {
         Preconditions.checkState(isAnalyzed);
-        if (onClause != null) {
-            if (isOuterJoin()) {
-                return;
-            }
-            onClause = rewriter.rewrite(onClause, analyzer, ExprRewriter.ClauseType.ON_CLAUSE);
-        }
+        if (onClause != null) onClause = rewriter.rewrite(onClause, analyzer, ExprRewriter.ClauseType.ON_CLAUSE);
     }
 
     private String joinOpToSql() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index e535e6c0ec..20a5c7c704 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -2119,41 +2119,4 @@ public class QueryPlanTest {
         String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql);
         Assert.assertTrue(explainString.contains("1 | 10 | 1 | 1 | 1"));
     }
-
-    /**
-     * for issue #8856
-    */
-    @Test
-    public void testOutJoinWithOnFalse() throws Exception {
-        connectContext.setDatabase("default_cluster:test");
-        createTable("create table out_join_1\n" +
-                "(\n" +
-                "    k1 int,\n" +
-                "    v int\n" +
-                ")\n" +
-                "DISTRIBUTED BY HASH(k1) BUCKETS 10\n" +
-                "PROPERTIES(\"replication_num\" = \"1\");");
-
-        createTable("create table out_join_2\n" +
-                "(\n" +
-                "    k1 int,\n" +
-                "    v int\n" +
-                ")\n" +
-                "DISTRIBUTED BY HASH(k1) BUCKETS 10\n" +
-                "PROPERTIES(\"replication_num\" = \"1\");");
-
-        String sql = "explain select * from out_join_1 left join out_join_2 on out_join_1.k1 = out_join_2.k1 and 1=2;";
-        String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql);
-        Assert.assertFalse(explainString.contains("non-equal LEFT OUTER JOIN is not supported"));
-
-        sql = "explain select * from out_join_1 right join out_join_2 on out_join_1.k1 = out_join_2.k1 and 1=2;";
-        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql);
-        Assert.assertFalse(explainString.contains("non-equal RIGHT OUTER JOIN is not supported"));
-
-        sql = "explain select * from out_join_1 full join out_join_2 on out_join_1.k1 = out_join_2.k1 and 1=2;";
-        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql);
-        Assert.assertFalse(explainString.contains("non-equal FULL OUTER JOIN is not supported"));
-
-    }
-
 }




[incubator-doris] 04/06: [Bug][Storage-vectorized] fix code dump on outer join with not nullable column (#9112)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 5d5928e4744cd65bb95dd4fd44a9f8e74953f43e
Author: Pxl <95...@qq.com>
AuthorDate: Thu Apr 21 11:02:04 2022 +0800

    [Bug][Storage-vectorized] fix code dump on outer join with not nullable column (#9112)
---
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 60 ++++++----------------
 be/src/olap/rowset/segment_v2/segment_iterator.h   | 11 ++--
 be/src/vec/core/block.cpp                          |  9 ++++
 be/src/vec/core/block.h                            | 33 ++++++------
 be/src/vec/exec/volap_scan_node.cpp                | 51 +++++++++---------
 be/src/vec/exec/volap_scan_node.h                  |  3 ++
 6 files changed, 76 insertions(+), 91 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 58774e2352..52cc584fdb 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -668,8 +668,10 @@ void SegmentIterator::_vec_init_lazy_materialization() {
                     // todo(wb) make a cost-based lazy-materialization framework
                     // check non-pred column type to decide whether using lazy-materialization
                     FieldType type = _schema.column(cid)->type();
-                    if (_is_all_column_basic_type && (type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT 
-                            || type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_STRING)) {
+                    if (_is_all_column_basic_type &&
+                        (type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
+                         type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
+                         type == OLAP_FIELD_TYPE_STRING)) {
                         _is_all_column_basic_type = false;
                     }
                 }
@@ -753,23 +755,7 @@ Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
 
 void SegmentIterator::_init_current_block(
         vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>& current_columns) {
-    bool is_block_mem_reuse = block->mem_reuse();
-    if (is_block_mem_reuse) {
-        block->clear_column_data(_schema.num_column_ids());
-    } else { // pre fill output block here
-        for (size_t i = 0; i < _schema.num_column_ids(); i++) {
-            auto cid = _schema.column_id(i);
-            auto column_desc = _schema.column(cid);
-            auto data_type = Schema::get_data_type_ptr(column_desc->type());
-            if (column_desc->is_nullable()) {
-                block->insert({nullptr,
-                               std::make_shared<vectorized::DataTypeNullable>(std::move(data_type)),
-                               column_desc->name()});
-            } else {
-                block->insert({nullptr, std::move(data_type), column_desc->name()});
-            }
-        }
-    }
+    block->clear_column_data(_schema.num_column_ids());
 
     for (size_t i = 0; i < _schema.num_column_ids(); i++) {
         auto cid = _schema.column_id(i);
@@ -778,17 +764,8 @@ void SegmentIterator::_init_current_block(
         if (_is_pred_column[cid]) { //todo(wb) maybe we can release it after output block
             current_columns[cid]->clear();
         } else { // non-predicate column
-            if (is_block_mem_reuse) {
-                current_columns[cid] = std::move(*block->get_by_position(i).column).mutate();
-            } else {
-                auto data_type = Schema::get_data_type_ptr(column_desc->type());
-                if (column_desc->is_nullable()) {
-                    current_columns[cid] = doris::vectorized::ColumnNullable::create(
-                            data_type->create_column(), doris::vectorized::ColumnUInt8::create());
-                } else {
-                    current_columns[cid] = data_type->create_column();
-                }
-            }
+            current_columns[cid] = std::move(*block->get_by_position(i).column).mutate();
+
             if (column_desc->type() == OLAP_FIELD_TYPE_DATE) {
                 current_columns[cid]->set_date_type();
             } else if (column_desc->type() == OLAP_FIELD_TYPE_DATETIME) {
@@ -799,7 +776,7 @@ void SegmentIterator::_init_current_block(
     }
 }
 
-void SegmentIterator::_output_non_pred_columns(vectorized::Block* block, bool is_block_mem_reuse) {
+void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
     for (auto cid : _non_predicate_columns) {
         block->replace_by_position(_schema_block_id_map[cid],
                                    std::move(_current_return_columns[cid]));
@@ -919,6 +896,8 @@ void SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column
 
 Status SegmentIterator::next_batch(vectorized::Block* block) {
     bool is_mem_reuse = block->mem_reuse();
+    DCHECK(is_mem_reuse);
+
     SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
     if (UNLIKELY(!_inited)) {
         RETURN_IF_ERROR(_init(true));
@@ -953,24 +932,15 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
             // todo(wb) abstract make column where
             if (!_is_pred_column[cid]) { // non-predicate
                 block->replace_by_position(i, std::move(_current_return_columns[cid]));
-            } else { // predicate
-                if (!is_mem_reuse) {
-                    auto column_desc = _schema.column(cid);
-                    auto data_type = Schema::get_data_type_ptr(column_desc->type());
-                    block->replace_by_position(i, data_type->create_column());
-                }
             }
         }
-        // not sure whether block is clear before enter segmentIter, so clear it here.
-        if (is_mem_reuse) {
-            block->clear_column_data();
-        }
+        block->clear_column_data();
         return Status::EndOfFile("no more data in segment");
     }
 
     // when no predicate(include delete condition) is provided, output column directly
     if (_vec_pred_column_ids.empty() && _short_cir_pred_column_ids.empty()) {
-        _output_non_pred_columns(block, is_mem_reuse);
+        _output_non_pred_columns(block);
     } else { // need predicate evaluation
         uint16_t selected_size = nrows_read;
         uint16_t sel_rowid_idx[selected_size];
@@ -982,7 +952,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
         // So output block directly after vectorization evaluation
         if (_is_all_column_basic_type) {
             RETURN_IF_ERROR(_output_column_by_sel_idx(block, _first_read_column_ids, sel_rowid_idx,
-                                                      selected_size, is_mem_reuse));
+                                                      selected_size));
         } else {
             // step 2: evaluate short ciruit predicate
             // todo(wb) research whether need to read short predicate after vectorization evaluation
@@ -998,7 +968,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
 
             // step4: output columns
             // 4.1 output non-predicate column
-            _output_non_pred_columns(block, is_mem_reuse);
+            _output_non_pred_columns(block);
 
             // 4.2 get union of short_cir_pred and vec_pred
             std::set<ColumnId> pred_column_ids;
@@ -1008,7 +978,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
 
             // 4.3 output short circuit and predicate column
             RETURN_IF_ERROR(_output_column_by_sel_idx(block, pred_column_ids, sel_rowid_idx,
-                                                      selected_size, is_mem_reuse));
+                                                      selected_size));
         }
     }
 
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 7f2d11e0b6..fb461fe54d 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -102,20 +102,19 @@ private:
                              std::vector<vectorized::MutableColumnPtr>& non_pred_vector);
     void _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t& selected_size);
     void _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t* selected_size);
-    void _output_non_pred_columns(vectorized::Block* block, bool is_block_mem_reuse);
+    void _output_non_pred_columns(vectorized::Block* block);
     void _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
                                  std::vector<rowid_t>& rowid_vector, uint16_t* sel_rowid_idx,
                                  size_t select_size, vectorized::MutableColumns* mutable_columns);
 
     template <class Container>
     Status _output_column_by_sel_idx(vectorized::Block* block, const Container& column_ids,
-                                     uint16_t* sel_rowid_idx, uint16_t select_size,
-                                     bool is_block_mem_reuse) {
+                                     uint16_t* sel_rowid_idx, uint16_t select_size) {
         for (auto cid : column_ids) {
             int block_cid = _schema_block_id_map[cid];
-            RETURN_IF_ERROR(block->copy_column_data_to_block(
-                    is_block_mem_reuse, _current_return_columns[cid].get(), sel_rowid_idx,
-                    select_size, block_cid, _opts.block_row_max));
+            RETURN_IF_ERROR(block->copy_column_data_to_block(_current_return_columns[cid].get(),
+                                                             sel_rowid_idx, select_size, block_cid,
+                                                             _opts.block_row_max));
         }
         return Status::OK();
     }
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 33c73205a5..0d275f6604 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -137,6 +137,15 @@ Block::Block(const ColumnsWithTypeAndName& data_) : data {data_} {
     initialize_index_by_name();
 }
 
+Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) {
+    for (const auto slot_desc : slots) {
+        auto column_ptr = slot_desc->get_empty_mutable_column();
+        column_ptr->reserve(block_size);
+        insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(),
+                                     slot_desc->col_name()));
+    }
+}
+
 Block::Block(const PBlock& pblock) {
     const char* buf = nullptr;
     std::string compression_scratch;
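
The Block(slots, block_size) constructor added above factors out the per-slot column setup that transfer_thread and _alloc_block in volap_scan_node.cpp used to open-code; both call sites appear later in this email. A minimal standalone sketch of the idea with stand-in types (not the real SlotDescriptor/Block API):

    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <vector>

    struct Slot {                                  // stand-in for SlotDescriptor
        std::string name;
    };

    struct Column {                                // stand-in for a typed mutable column
        std::string name;
        std::vector<int64_t> data;
    };

    struct Block {
        std::vector<Column> columns;
        // One pre-reserved column per slot, mirroring column_ptr->reserve(block_size).
        Block(const std::vector<Slot>& slots, size_t block_size) {
            columns.reserve(slots.size());
            for (const auto& slot : slots) {
                Column col{slot.name, {}};
                col.data.reserve(block_size);
                columns.push_back(std::move(col));
            }
        }
    };

    int main() {
        std::vector<Slot> slots = {{"k1"}, {"v"}};
        Block block(slots, 4096);                  // every pre-allocated free block is built this way
        return block.columns.size() == 2 ? 0 : 1;
    }
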
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 6ef105cf3b..5b543dd8ff 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -28,6 +28,8 @@
 #include <parallel_hashmap/phmap.h>
 
 #include "gen_cpp/data.pb.h"
+#include "runtime/descriptors.h"
+#include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/block_info.h"
 #include "vec/core/column_with_type_and_name.h"
@@ -67,6 +69,7 @@ public:
     Block(std::initializer_list<ColumnWithTypeAndName> il);
     Block(const ColumnsWithTypeAndName& data_);
     Block(const PBlock& pblock);
+    Block(const std::vector<SlotDescriptor*>& slots, size_t block_size);
 
     /// insert the column at the specified position
     void insert(size_t position, const ColumnWithTypeAndName& elem);
@@ -97,8 +100,7 @@ public:
     ColumnWithTypeAndName& get_by_position(size_t position) { return data[position]; }
     const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; }
 
-    Status copy_column_data_to_block(bool is_block_mem_reuse,
-                                     doris::vectorized::IColumn* input_col_ptr,
+    Status copy_column_data_to_block(doris::vectorized::IColumn* input_col_ptr,
                                      uint16_t* sel_rowid_idx, uint16_t select_size, int block_cid,
                                      size_t batch_size) {
         // Only the additional deleted filter condition need to materialize column be at the end of the block
@@ -108,21 +110,22 @@ public:
         //      `select b from table;`
         // a column only effective in segment iterator, the block from query engine only contain the b column.
         // so the `block_cid >= data.size()` is true
-        if (block_cid >= data.size())
+        if (block_cid >= data.size()) {
             return Status::OK();
+        }
 
-        if (is_block_mem_reuse) {
-            auto* raw_res_ptr = this->get_by_position(block_cid).column.get();
-            const_cast<doris::vectorized::IColumn*>(raw_res_ptr)->reserve(batch_size);
-            return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, const_cast<doris::vectorized::IColumn*>(raw_res_ptr));
-        } else {
-            MutableColumnPtr res_col_ptr = data[block_cid].type->create_column();
-            res_col_ptr->reserve(batch_size);
-            auto* raw_res_ptr = res_col_ptr.get();
-            RETURN_IF_ERROR(input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, const_cast<doris::vectorized::IColumn*>(raw_res_ptr)));
-            this->replace_by_position(block_cid, std::move(res_col_ptr));
-            return Status::OK();
+        MutableColumnPtr raw_res_ptr = this->get_by_position(block_cid).column->assume_mutable();
+        raw_res_ptr->reserve(batch_size);
+
+        // adapt for outer join change column to nullable
+        if (raw_res_ptr->is_nullable()) {
+            auto col_ptr_nullable =
+                    reinterpret_cast<vectorized::ColumnNullable*>(raw_res_ptr.get());
+            col_ptr_nullable->get_null_map_column().insert_many_defaults(select_size);
+            raw_res_ptr = col_ptr_nullable->get_nested_column_ptr();
         }
+
+        return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, raw_res_ptr);
     }
 
     void replace_by_position(size_t position, ColumnPtr&& res) {
@@ -335,7 +338,7 @@ public:
     size_t rows() const;
     size_t columns() const { return _columns.size(); }
 
-    bool empty() { return rows() == 0; }
+    bool empty() const { return rows() == 0; }
 
     MutableColumns& mutable_columns() { return _columns; }
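
The nullable branch added to copy_column_data_to_block above is the heart of the crash fix: an outer join can promote a destination column to nullable while the data read from the segment stays non-nullable, so the selected rows are filtered into the nested column and the null map is padded with not-null entries (insert_many_defaults). A standalone sketch of that pattern with plain vectors standing in for ColumnNullable (illustrative, not the Doris column API):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // Minimal stand-in for a nullable column: payload values plus a parallel null map.
    struct NullableColumn {
        std::vector<int32_t> nested;
        std::vector<uint8_t> null_map;             // 0 = not null, 1 = null
    };

    // Copy the selected rows of a non-nullable source into a nullable destination.
    void copy_selected(const std::vector<int32_t>& src, const std::vector<uint16_t>& sel,
                       NullableColumn& dst) {
        // Rows read from storage are never null here, so the null map grows with zeros
        // (the analogue of insert_many_defaults) while values go into the nested data.
        dst.null_map.resize(dst.null_map.size() + sel.size(), 0);
        for (uint16_t row : sel) {
            dst.nested.push_back(src[row]);
        }
    }

    int main() {
        std::vector<int32_t> src = {10, 20, 30, 40};
        std::vector<uint16_t> selected = {0, 2, 3};
        NullableColumn dst;
        copy_selected(src, selected, dst);
        printf("%zu values, %zu null flags\n", dst.nested.size(), dst.null_map.size());
        return 0;
    }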
 
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 59387fbf10..f6101692ce 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -65,22 +65,16 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
     auto doris_scanner_row_num =
             _limit == -1 ? config::doris_scanner_row_num
                          : std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit);
-    auto block_size = _limit == -1 ? state->batch_size()
-                                   : std::min(static_cast<int64_t>(state->batch_size()), _limit);
-    auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) / block_size;
+    _block_size = _limit == -1 ? state->batch_size()
+                               : std::min(static_cast<int64_t>(state->batch_size()), _limit);
+    auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) / _block_size;
     auto pre_block_count =
             std::min(_volap_scanners.size(),
                      static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) *
             block_per_scanner;
 
     for (int i = 0; i < pre_block_count; ++i) {
-        auto block = new Block;
-        for (const auto slot_desc : _tuple_desc->slots()) {
-            auto column_ptr = slot_desc->get_empty_mutable_column();
-            column_ptr->reserve(block_size);
-            block->insert(ColumnWithTypeAndName(
-                    std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
-        }
+        auto block = new Block(_tuple_desc->slots(), _block_size);
         _free_blocks.emplace_back(block);
         _buffered_bytes += block->allocated_bytes();
     }
@@ -152,7 +146,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
     Status status = Status::OK();
     bool eos = false;
     RuntimeState* state = scanner->runtime_state();
-    DCHECK(NULL != state);
+    DCHECK(nullptr != state);
     if (!scanner->is_open()) {
         status = scanner->open();
         if (!status.ok()) {
@@ -203,8 +197,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     bool get_free_block = true;
 
-    while (!eos && raw_rows_read < raw_rows_threshold &&
-           raw_bytes_read < raw_bytes_threshold && get_free_block) {
+    while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold &&
+           get_free_block) {
         if (UNLIKELY(_transfer_done)) {
             eos = true;
             status = Status::Cancelled("Cancelled");
@@ -230,7 +224,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
             std::lock_guard<std::mutex> l(_free_blocks_lock);
             _free_blocks.emplace_back(block);
         } else {
-            if (!blocks.empty() && blocks.back()->rows() + block->rows() <= _runtime_state->batch_size()) {
+            if (!blocks.empty() &&
+                blocks.back()->rows() + block->rows() <= _runtime_state->batch_size()) {
                 MutableBlock(blocks.back()).merge(*block);
                 block->clear_column_data();
                 std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -408,7 +403,9 @@ Status VOlapScanNode::close(RuntimeState* state) {
     _scan_block_added_cv.notify_all();
 
     // join transfer thread
-    if (_transfer_thread) _transfer_thread->join();
+    if (_transfer_thread) {
+        _transfer_thread->join();
+    }
 
     // clear some block in queue
     // TODO: The presence of transfer_thread here may cause Block's memory alloc and be released not in a thread,
@@ -475,7 +472,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     }
 
     // wait for block from queue
-    Block* materialized_block = NULL;
+    Block* materialized_block = nullptr;
     {
         std::unique_lock<std::mutex> l(_blocks_lock);
         SCOPED_TIMER(_olap_wait_batch_queue_timer);
@@ -490,14 +487,14 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
 
         if (!_materialized_blocks.empty()) {
             materialized_block = _materialized_blocks.back();
-            DCHECK(materialized_block != NULL);
+            DCHECK(materialized_block != nullptr);
             _materialized_blocks.pop_back();
             _materialized_row_batches_bytes -= materialized_block->allocated_bytes();
         }
     }
 
     // return block
-    if (NULL != materialized_block) {
+    if (nullptr != materialized_block) {
         // notify scanner
         _block_consumed_cv.notify_one();
         // get scanner's block memory
@@ -533,8 +530,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     return _status;
 }
 
-// TODO: we should register the mem cost of new Block in
-// alloc block
 Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
     {
         std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -544,15 +539,19 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
             return block;
         }
     }
+
     get_free_block = false;
-    return new Block();
+
+    auto block = new Block(_tuple_desc->slots(), _block_size);
+    _buffered_bytes += block->allocated_bytes();
+    return block;
 }
 
 int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) {
     std::list<VOlapScanner*> olap_scanners;
     int assigned_thread_num = _running_thread;
     size_t max_thread = std::min(_volap_scanners.size(),
-                     static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
+                                 static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
     // copy to local
     {
         // How many thread can apply to this query
@@ -563,7 +562,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
                 thread_slot_num = _free_blocks.size() / block_per_scanner;
                 thread_slot_num += (_free_blocks.size() % block_per_scanner != 0);
                 thread_slot_num = std::min(thread_slot_num, max_thread - assigned_thread_num);
-                if (thread_slot_num <= 0) thread_slot_num = 1;
+                if (thread_slot_num <= 0) {
+                    thread_slot_num = 1;
+                }
             } else {
                 std::lock_guard<std::mutex> l(_scan_blocks_lock);
                 if (_scan_blocks.empty()) {
@@ -583,9 +584,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
                 auto scanner = _volap_scanners.front();
                 _volap_scanners.pop_front();
 
-                if (scanner->need_to_close())
+                if (scanner->need_to_close()) {
                     scanner->close(state);
-                else {
+                } else {
                     olap_scanners.push_back(scanner);
                     _running_thread++;
                     assigned_thread_num++;
diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h
index 921399ee6b..831b0963b6 100644
--- a/be/src/vec/exec/volap_scan_node.h
+++ b/be/src/vec/exec/volap_scan_node.h
@@ -39,6 +39,7 @@ public:
     }
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
     Status close(RuntimeState* state) override;
+
 private:
     void transfer_thread(RuntimeState* state);
     void scanner_thread(VOlapScanner* scanner);
@@ -64,6 +65,8 @@ private:
     std::mutex _volap_scanners_lock;
 
     int _max_materialized_blocks;
+
+    size_t _block_size = 0;
 };
 } // namespace vectorized
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org