Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/27 15:08:43 UTC

[incubator-doris] branch dev-1.0.1 updated (d752946faa -> d2a8a399be)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


    from d752946faa [hotfix] fix load with close wait failed error
     new d0c4b5c503 [fix](resource-tag) Consider resource tags when assigning tasks for broker & routine load (#9492)
     new 0dce0ff70c [bugfix]teach BufferedBlockMgr2 track memory right (#9722)
     new 37410280c8 [fix](function) If function adds type inference (#9728)
     new e34be8663d [bugfix] fix memleak in olapscannode(#9736)
     new 5bfc8e9cb1 [feature] add zstd compression codec (#9747)
     new 1cc0693a00 [security] update canal version to fix fastjson security issue (#9763)
     new 93f0d50bc6 [config] Add backend_rpc_timeout_second in FE config (#9779)
     new d2a8a399be [fix] Fix bug of bloom filter hash value calculation error (#9802)

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/es_http_scan_node.cpp                  |   2 +-
 be/src/exec/olap_scan_node.cpp                     |   2 +-
 be/src/exec/olap_scanner.h                         |   2 +-
 be/src/exec/scan_node.cpp                          |   5 +-
 be/src/exec/scan_node.h                            |   1 +
 be/src/olap/bloom_filter_predicate.h               |   8 +-
 be/src/olap/schema.cpp                             |   2 +-
 be/src/runtime/buffered_block_mgr2.cc              |  11 +-
 be/src/util/block_compression.cpp                  | 149 +++++++++++++++++++++
 be/src/vec/columns/column_dictionary.h             |  24 +++-
 be/src/vec/exec/join/vhash_join_node.cpp           |  14 +-
 be/src/vec/exec/volap_scanner.cpp                  |   8 ++
 be/src/vec/exec/volap_scanner.h                    |   2 +
 be/src/vec/exprs/vexpr.h                           |   5 +-
 be/src/vec/exprs/vexpr_context.h                   |   4 +
 be/src/vec/utils/util.hpp                          |  12 +-
 be/test/util/block_compression_test.cpp            |   2 +
 docs/en/administrator-guide/config/fe_config.md    |  10 ++
 docs/en/administrator-guide/multi-tenant.md        |  12 ++
 docs/zh-CN/administrator-guide/config/fe_config.md |  10 ++
 fe/fe-core/pom.xml                                 |   4 +-
 .../apache/doris/analysis/FunctionCallExpr.java    |   8 +-
 .../java/org/apache/doris/common/ClientPool.java   |   5 +-
 .../main/java/org/apache/doris/common/Config.java  |   3 +
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   2 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |   8 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  15 ++-
 .../doris/load/routineload/RoutineLoadManager.java |  73 +++++++++-
 .../load/routineload/RoutineLoadTaskScheduler.java |   3 +-
 .../mysql/privilege/UserResourceProperty.java      |  31 -----
 .../org/apache/doris/planner/BrokerScanNode.java   |  40 ++++--
 .../load/routineload/KafkaRoutineLoadJobTest.java  |  19 ++-
 .../doris/load/routineload/RoutineLoadJobTest.java |   9 +-
 .../load/routineload/RoutineLoadManagerTest.java   |  26 ++--
 .../load/routineload/RoutineLoadSchedulerTest.java |   7 +-
 .../transaction/GlobalTransactionMgrTest.java      |  10 +-
 .../data/correctness/test_select_constant.out      |   3 +
 37 files changed, 436 insertions(+), 115 deletions(-)
 delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 05/08: [feature] add zstd compression codec (#9747)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 5bfc8e9cb1b4cb98168cd979f07d9c4e1650e5eb
Author: Kang <kx...@gmail.com>
AuthorDate: Fri May 27 21:56:18 2022 +0800

    [feature] add zstd compression codec (#9747)
    
    ZSTD compression is fast and has a high compression ratio. It can be used to achieve a higher
    compression ratio than the default Lz4f codec when storing cost-sensitive data such as logs.
    
    Compared to the Lz4f codec, in the comparison test below the zstd codec reduces compressed size by
    about 35%, is about 30% faster on the first read (without OS page cache), and about 40% slower on
    the second read (with OS page cache).
    
    test data: 25GB text log, 110 million rows
    test table: test_table(ts varchar(30), log string)
    test SQL: set enable_vectorized_engine=1; select sum(length(log)) from test_table
    be.conf: disable_storage_page_cache = true
    This config disables the Doris page cache so that data is not fully cached in memory and the test
    measures real decompression speed.
    test result:
    
    master branch with lz4f codec result:
    - compressed size 4.3G
    - SQL first exec time(read data from disk + decompress + little computation) : 18.3s
    - SQL second exec time(read data from OS pagecache + decompress + little computation) : 2.4s
    
    this branch with zstd codec (hardcode enable it) result:
    - compressed size: 2.8G
    - SQL first exec time: 12.8s
    - SQL second exec time: 3.4s
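
A minimal usage sketch of the new codec, exercised through the BlockCompressionCodec interface that the
diff below extends (get_block_compression_codec, max_compressed_len, compress). The Slice-from-std::string
construction, the explicit init() call and the RETURN_IF_ERROR macro are assumptions about surrounding
Doris BE code that is not shown in this hunk:

    #include "util/block_compression.h"

    // Sketch: compress `raw` with the ZSTD codec and shrink `out` to the written size.
    Status compress_with_zstd(const Slice& raw, std::string* out) {
        std::unique_ptr<BlockCompressionCodec> codec;
        RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZSTD, codec));
        RETURN_IF_ERROR(codec->init());                     // possibly redundant if the factory already inits
        out->resize(codec->max_compressed_len(raw.size));   // worst-case output size
        Slice compressed(*out);
        RETURN_IF_ERROR(codec->compress(raw, &compressed)); // compress() sets compressed.size to the real size
        out->resize(compressed.size);
        return Status::OK();
    }

Decompression would go through the matching decompress(input, output) method shown in the same class.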
---
 be/src/util/block_compression.cpp       | 149 ++++++++++++++++++++++++++++++++
 be/test/util/block_compression_test.cpp |   2 +
 2 files changed, 151 insertions(+)

diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index a1ee74f047..51f485f348 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -22,6 +22,8 @@
 #include <snappy/snappy-sinksource.h>
 #include <snappy/snappy.h>
 #include <zlib.h>
+#include <zstd.h>
+#include <zstd_errors.h>
 
 #include <limits>
 
@@ -375,6 +377,150 @@ public:
     }
 };
 
+// for ZSTD compression and decompression, with BOTH fast and high compression ratio
+class ZstdBlockCompression : public BlockCompressionCodec {
+public:
+    // reenterable initialization for compress/decompress context
+    inline Status init() override {
+        if (!ctx_c) {
+            ctx_c = ZSTD_createCCtx();
+            if (!ctx_c) {
+                return Status::InvalidArgument("Fail to ZSTD_createCCtx");
+            }
+        }
+
+        if (!ctx_d) {
+            ctx_d = ZSTD_createDCtx();
+            if (!ctx_d) {
+                return Status::InvalidArgument("Fail to ZSTD_createDCtx");
+            }
+        }
+
+        return Status::OK();
+    }
+
+    ~ZstdBlockCompression() override {
+        if (ctx_c) ZSTD_freeCCtx(ctx_c);
+        if (ctx_d) ZSTD_freeDCtx(ctx_d);
+    }
+
+    size_t max_compressed_len(size_t len) const override {
+        if (len > std::numeric_limits<int32_t>::max()) {
+            return 0;
+        }
+        return ZSTD_compressBound(len);
+    }
+
+    Status compress(const Slice& input, Slice* output) const override {
+        std::vector<Slice> inputs {input};
+        return compress(inputs, output);
+    }
+
+    // follow ZSTD official example
+    //  https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
+    Status compress(const std::vector<Slice>& inputs, Slice* output) const {
+        if (!ctx_c) return Status::InvalidArgument("compression context NOT initialized");
+
+        // reset ctx to start new compress session
+        auto ret = ZSTD_CCtx_reset(ctx_c, ZSTD_reset_session_only);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "ZSTD_CCtx_reset error: $0", ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+        // set compression level to default 3
+        ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(
+                    strings::Substitute("ZSTD_CCtx_setParameter compression level error: $0",
+                                        ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+        // set checksum flag to 1
+        ret = ZSTD_CCtx_setParameter(ctx_c, ZSTD_c_checksumFlag, 1);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(
+                    strings::Substitute("ZSTD_CCtx_setParameter checksumFlag error: $0",
+                                        ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+
+        ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+        for (size_t i = 0; i < inputs.size(); i++) {
+            ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
+
+            bool last_input = (i == inputs.size() - 1);
+            auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
+
+            bool finished = false;
+            do {
+                // do compress
+                auto ret = ZSTD_compressStream2(ctx_c, &out_buf, &in_buf, mode);
+
+                if (ZSTD_isError(ret)) {
+                    return Status::InvalidArgument(
+                            strings::Substitute("ZSTD_compressStream2 error: $0",
+                                                ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+                }
+
+                // ret is ZSTD hint for needed output buffer size
+                if (ret > 0 && out_buf.pos == out_buf.size) {
+                    return Status::InvalidArgument(
+                            strings::Substitute("ZSTD_compressStream2 output buffer full"));
+                }
+
+                finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
+            } while (!finished);
+        }
+
+        // set compressed size for caller
+        output->size = out_buf.pos;
+
+        return Status::OK();
+    }
+
+    // follow ZSTD official example
+    //  https://github.com/facebook/zstd/blob/dev/examples/streaming_decompression.c
+    Status decompress(const Slice& input, Slice* output) const {
+        if (!ctx_d) return Status::InvalidArgument("decompression context NOT initialized");
+
+        // reset ctx to start a new decompress session
+        auto ret = ZSTD_DCtx_reset(ctx_d, ZSTD_reset_session_only);
+        if (ZSTD_isError(ret)) {
+            return Status::InvalidArgument(strings::Substitute(
+                    "ZSTD_DCtx_reset error: $0", ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+        }
+
+        ZSTD_inBuffer in_buf = {input.data, input.size, 0};
+        ZSTD_outBuffer out_buf = {output->data, output->size, 0};
+
+        while (in_buf.pos < in_buf.size) {
+            // do decompress
+            auto ret = ZSTD_decompressStream(ctx_d, &out_buf, &in_buf);
+
+            if (ZSTD_isError(ret)) {
+                return Status::InvalidArgument(
+                        strings::Substitute("ZSTD_decompressStream error: $0",
+                                            ZSTD_getErrorString(ZSTD_getErrorCode(ret))));
+            }
+
+            // ret is ZSTD hint for needed output buffer size
+            if (ret > 0 && out_buf.pos == out_buf.size) {
+                return Status::InvalidArgument(
+                        strings::Substitute("ZSTD_decompressStream output buffer full"));
+            }
+        }
+
+        // set decompressed size for caller
+        output->size = out_buf.pos;
+
+        return Status::OK();
+    }
+
+private:
+    // will be reused by compress/decompress
+    ZSTD_CCtx* ctx_c = nullptr;
+    ZSTD_DCtx* ctx_d = nullptr;
+};
+
 Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                    std::unique_ptr<BlockCompressionCodec>& codec) {
     BlockCompressionCodec* ptr = nullptr;
@@ -394,6 +540,9 @@ Status get_block_compression_codec(segment_v2::CompressionTypePB type,
     case segment_v2::CompressionTypePB::ZLIB:
         ptr = new ZlibBlockCompression();
         break;
+    case segment_v2::CompressionTypePB::ZSTD:
+        ptr = new ZstdBlockCompression();
+        break;
     default:
         return Status::NotFound(strings::Substitute("unknown compression type($0)", type));
     }
diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp
index a339d54409..7b2da38703 100644
--- a/be/test/util/block_compression_test.cpp
+++ b/be/test/util/block_compression_test.cpp
@@ -101,6 +101,7 @@ TEST_F(BlockCompressionTest, single) {
     test_single_slice(segment_v2::CompressionTypePB::ZLIB);
     test_single_slice(segment_v2::CompressionTypePB::LZ4);
     test_single_slice(segment_v2::CompressionTypePB::LZ4F);
+    test_single_slice(segment_v2::CompressionTypePB::ZSTD);
 }
 
 void test_multi_slices(segment_v2::CompressionTypePB type) {
@@ -156,6 +157,7 @@ TEST_F(BlockCompressionTest, multi) {
     test_multi_slices(segment_v2::CompressionTypePB::ZLIB);
     test_multi_slices(segment_v2::CompressionTypePB::LZ4);
     test_multi_slices(segment_v2::CompressionTypePB::LZ4F);
+    test_multi_slices(segment_v2::CompressionTypePB::ZSTD);
 }
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 02/08: [bugfix]teach BufferedBlockMgr2 track memory right (#9722)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 0dce0ff70c3f4350a0419ac1c609e4a9db0e3242
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Tue May 24 10:18:51 2022 +0800

    [bugfix]teach BufferedBlockMgr2 track memory right (#9722)
    
    The problem was introduced by e2d3d0134eee5d50b6619fd9194a2e5f9cb557dc.
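
Reading the diff below, the removed pattern called _mem_tracker->TryConsume(size) before deciding which
path to take, and then again on the slow path, so a single request could be counted twice (or a
reservation leaked on an early return). A self-contained toy illustration of that over-counting, using a
stand-in tracker rather than the real MemTracker:

    #include <cstdint>
    #include <iostream>

    // Toy tracker: TryConsume() records the bytes immediately on success, which is
    // exactly the property that made the removed double call over-count.
    struct ToyTracker {
        int64_t consumed = 0;
        int64_t limit = 1 << 20;
        bool TryConsume(int64_t bytes) {
            if (consumed + bytes > limit) return false;
            consumed += bytes;
            return true;
        }
    };

    int main() {
        const int64_t size = 4096;
        const bool is_small = false;  // pretend size >= max_block_size()

        // Old ordering (simplified): reserve first, branch later, reserve again.
        ToyTracker old_style;
        bool st = old_style.TryConsume(size);   // reservation #1
        if (!(is_small && st)) {
            old_style.TryConsume(size);         // reservation #2; #1 is never released
        }
        std::cout << "old ordering tracked " << old_style.consumed << " bytes\n";   // 8192

        // Fixed ordering (simplified): consume only on the path that uses the memory.
        ToyTracker fixed;
        if (is_small && fixed.TryConsume(size)) {
            // small-allocation fast path
        } else if (fixed.TryConsume(size)) {
            // buffer-allocation path
        }
        std::cout << "fixed ordering tracked " << fixed.consumed << " bytes\n";     // 4096
        return 0;
    }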
---
 be/src/runtime/buffered_block_mgr2.cc | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc
index 92edcdcabe..64db5222e1 100644
--- a/be/src/runtime/buffered_block_mgr2.cc
+++ b/be/src/runtime/buffered_block_mgr2.cc
@@ -324,9 +324,7 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {
     }
     int buffers_needed = BitUtil::ceil(size, max_block_size());
     unique_lock<mutex> lock(_lock);
-    Status st = _mem_tracker->TryConsume(size);
-    WARN_IF_ERROR(st, "consume failed");
-    if (size < max_block_size() && st) {
+    if (size < max_block_size() && _mem_tracker->TryConsume(size)) {
         // For small allocations (less than a block size), just let the allocation through.
         client->_tracker->ConsumeLocal(size, client->_query_tracker.get());
         // client->_tracker->Consume(size);
@@ -336,7 +334,7 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {
     if (available_buffers(client) + client->_num_tmp_reserved_buffers < buffers_needed) {
         return false;
     }
-    st = _mem_tracker->TryConsume(size);
+    Status st = _mem_tracker->TryConsume(size);
     WARN_IF_ERROR(st, "consume failed");
     if (st) {
         // There was still unallocated memory, don't need to recycle allocated blocks.
@@ -1094,10 +1092,9 @@ Status BufferedBlockMgr2::find_buffer_for_block(Block* block, bool* in_mem) {
 Status BufferedBlockMgr2::find_buffer(unique_lock<mutex>& lock, BufferDescriptor** buffer_desc) {
     *buffer_desc = nullptr;
 
-    Status st = _mem_tracker->TryConsume(_max_block_size);
-    WARN_IF_ERROR(st, "try to allocate a new buffer failed");
     // First, try to allocate a new buffer.
-    if (_free_io_buffers.size() < _block_write_threshold && st) {
+    if (_free_io_buffers.size() < _block_write_threshold &&
+        _mem_tracker->TryConsume(_max_block_size)) {
         uint8_t* new_buffer = new uint8_t[_max_block_size];
         *buffer_desc = _obj_pool.add(new BufferDescriptor(new_buffer, _max_block_size));
         (*buffer_desc)->all_buffers_it =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 07/08: [config] Add backend_rpc_timeout_second in FE config (#9779)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 93f0d50bc66c0a960a04b0de0ec75f6c50c17f6f
Author: pengxiangyu <di...@163.com>
AuthorDate: Fri May 27 21:58:09 2022 +0800

    [config] Add backend_rpc_timeout_second in FE config (#9779)
---
 docs/en/administrator-guide/config/fe_config.md                | 10 ++++++++++
 docs/zh-CN/administrator-guide/config/fe_config.md             | 10 ++++++++++
 .../src/main/java/org/apache/doris/common/ClientPool.java      |  5 ++---
 fe/fe-core/src/main/java/org/apache/doris/common/Config.java   |  3 +++
 4 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md
index bfef450d17..6df6807f89 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -2181,3 +2181,13 @@ Default: 10
 Is it possible to dynamically configure: false
 
 Is it a configuration item unique to the Master FE node: false
+
+### backend_rpc_timeout_ms
+
+ Timeout millisecond for Fe sending rpc request to BE
+
+Default: 60000
+
+Is it possible to dynamically configure: false
+
+Is it a configuration item unique to the Master FE node: true
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md
index 0528430a60..0f1f6436d6 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -2204,3 +2204,13 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
 是否可以动态配置:false
 
 是否为 Master FE 节点独有的配置项:false
+
+### backend_rpc_timeout_ms
+
+ FE向BE的BackendService发送rpc请求时的超时时间,单位:毫秒。
+
+默认值:60000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:true
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 4098e06073..1b77d3ed43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -30,7 +30,6 @@ public class ClientPool {
     static int heartbeatTimeoutMs = FeConstants.heartbeat_interval_second * 1000;
 
     static GenericKeyedObjectPoolConfig backendConfig = new GenericKeyedObjectPoolConfig();
-    static int backendTimeoutMs = 60000; // 1min
 
     static {
         heartbeatConfig.setLifo(true);            // set Last In First Out strategy
@@ -68,10 +67,10 @@ public class ClientPool {
             new GenericPool<>("FrontendService", heartbeatConfig, heartbeatTimeoutMs,
                     Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
     public static GenericPool<FrontendService.Client> frontendPool =
-            new GenericPool("FrontendService", backendConfig, backendTimeoutMs,
+            new GenericPool("FrontendService", backendConfig, Config.backend_rpc_timeout_ms,
                     Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
     public static GenericPool<BackendService.Client> backendPool =
-            new GenericPool("BackendService", backendConfig, backendTimeoutMs);
+            new GenericPool("BackendService", backendConfig, Config.backend_rpc_timeout_ms);
     public static GenericPool<TPaloBrokerService.Client> brokerPool =
             new GenericPool("TPaloBrokerService", brokerPoolConfig, brokerTimeoutMs);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 6660d5e63c..45cfac5e2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1623,4 +1623,7 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true)
     public static boolean skip_compaction_slower_replica = true;
+
+    @ConfField(mutable = false, masterOnly = true)
+    public static int backend_rpc_timeout_ms = 60000; // 1 min
 }
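
A hedged usage note: since the option is declared with mutable = false (and the docs above list it as not
dynamically configurable), it would be set in fe.conf and picked up when the FE starts, for example
(assuming the usual key = value fe.conf syntax):

    # raise the FE-to-BE RPC timeout to 2 minutes; the value is in milliseconds
    backend_rpc_timeout_ms = 120000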


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 04/08: [bugfix] fix memleak in olapscannode(#9736)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit e34be8663dcd13f188d92a795955f70a63174d56
Author: jacktengg <18...@users.noreply.github.com>
AuthorDate: Thu May 26 15:06:54 2022 +0800

    [bugfix] fix memleak in olapscannode(#9736)
---
 be/src/exec/es_http_scan_node.cpp        |  2 +-
 be/src/exec/olap_scan_node.cpp           |  2 +-
 be/src/exec/olap_scanner.h               |  2 +-
 be/src/exec/scan_node.cpp                |  5 +++--
 be/src/exec/scan_node.h                  |  1 +
 be/src/vec/exec/join/vhash_join_node.cpp | 14 +++++++-------
 be/src/vec/exec/volap_scanner.cpp        |  8 ++++++++
 be/src/vec/exec/volap_scanner.h          |  2 ++
 be/src/vec/exprs/vexpr.h                 |  5 ++++-
 be/src/vec/exprs/vexpr_context.h         |  4 ++++
 be/src/vec/utils/util.hpp                | 12 +++++++-----
 11 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index 64b9792668..7e88dba043 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -161,7 +161,7 @@ Status EsHttpScanNode::open(RuntimeState* state) {
     auto checker = [&](int index) {
         return _conjunct_to_predicate[index] != -1 && list[_conjunct_to_predicate[index]];
     };
-    std::string vconjunct_information = _peel_pushed_vconjunct(checker);
+    std::string vconjunct_information = _peel_pushed_vconjunct(state, checker);
     _scanner_profile->add_info_string("VconjunctExprTree", vconjunct_information);
 
     RETURN_IF_ERROR(start_scanners());
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index d021cc3930..0d40d40b3a 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -542,7 +542,7 @@ void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
 
     // filter idle conjunct in vexpr_contexts
     auto checker = [&](int index) { return _pushed_conjuncts_index.count(index); };
-    std::string vconjunct_information = _peel_pushed_vconjunct(checker);
+    std::string vconjunct_information = _peel_pushed_vconjunct(state, checker);
     _scanner_profile->add_info_string("VconjunctExprTree", vconjunct_information);
 }
 
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 8c173c20a6..4a6ecbf06f 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -60,7 +60,7 @@ public:
 
     virtual Status get_batch(RuntimeState* state, RowBatch* batch, bool* eof);
 
-    Status close(RuntimeState* state);
+    virtual Status close(RuntimeState* state);
 
     RuntimeState* runtime_state() { return _runtime_state; }
 
diff --git a/be/src/exec/scan_node.cpp b/be/src/exec/scan_node.cpp
index 9f3e9cadb7..8fb838f0e3 100644
--- a/be/src/exec/scan_node.cpp
+++ b/be/src/exec/scan_node.cpp
@@ -46,7 +46,8 @@ Status ScanNode::prepare(RuntimeState* state) {
 // It relies on the logic of function convertConjunctsToAndCompoundPredicate() of FE splicing expr.
 // It requires FE to satisfy each splicing with 'and' expr, and spliced from left to right, in order.
 // Expr tree specific forms do not require requirements.
-std::string ScanNode::_peel_pushed_vconjunct(const std::function<bool(int)>& checker) {
+std::string ScanNode::_peel_pushed_vconjunct(RuntimeState* state,
+                                             const std::function<bool(int)>& checker) {
     if (_vconjunct_ctx_ptr.get() == nullptr) {
         return "null";
     }
@@ -56,7 +57,7 @@ std::string ScanNode::_peel_pushed_vconjunct(const std::function<bool(int)>& che
 
     if (conjunct_expr_root != nullptr) {
         vectorized::VExpr* new_conjunct_expr_root = vectorized::VectorizedUtils::dfs_peel_conjunct(
-                conjunct_expr_root, leaf_index, checker);
+                state, *_vconjunct_ctx_ptr.get(), conjunct_expr_root, leaf_index, checker);
         if (new_conjunct_expr_root == nullptr) {
             _vconjunct_ctx_ptr = nullptr;
         } else {
diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h
index 9808b874e6..fbe127c129 100644
--- a/be/src/exec/scan_node.h
+++ b/be/src/exec/scan_node.h
@@ -91,6 +91,7 @@ public:
 
 protected:
     std::string _peel_pushed_vconjunct(
+            RuntimeState* state,
             const std::function<bool(int)>& checker); // remove pushed expr from conjunct tree
 
     RuntimeProfile::Counter* _bytes_read_counter; // # bytes read from the scanner
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 8a8691a51d..a350f99ef4 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -133,21 +133,21 @@ struct ProcessRuntimeFilterBuild {
         if (_join_node->_runtime_filter_descs.empty()) {
             return Status::OK();
         }
-        VRuntimeFilterSlots* runtime_filter_slots =
-                new VRuntimeFilterSlots(_join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs,
-                                        _join_node->_runtime_filter_descs);
+        VRuntimeFilterSlots runtime_filter_slots(_join_node->_probe_expr_ctxs,
+                                                 _join_node->_build_expr_ctxs,
+                                                 _join_node->_runtime_filter_descs);
 
-        RETURN_IF_ERROR(runtime_filter_slots->init(state, hash_table_ctx.hash_table.get_size()));
+        RETURN_IF_ERROR(runtime_filter_slots.init(state, hash_table_ctx.hash_table.get_size()));
 
-        if (!runtime_filter_slots->empty() && !_join_node->_inserted_rows.empty()) {
+        if (!runtime_filter_slots.empty() && !_join_node->_inserted_rows.empty()) {
             {
                 SCOPED_TIMER(_join_node->_push_compute_timer);
-                runtime_filter_slots->insert(_join_node->_inserted_rows);
+                runtime_filter_slots.insert(_join_node->_inserted_rows);
             }
         }
         {
             SCOPED_TIMER(_join_node->_push_down_timer);
-            runtime_filter_slots->publish();
+            runtime_filter_slots.publish();
         }
 
         return Status::OK();
diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp
index e857975e5f..4c86830b30 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -78,4 +78,12 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo
 void VOlapScanner::set_tablet_reader() {
     _tablet_reader = std::make_unique<BlockReader>();
 }
+
+Status VOlapScanner::close(RuntimeState* state) {
+    if (_is_closed) {
+        return Status::OK();
+    }
+    if (_vconjunct_ctx) _vconjunct_ctx->close(state);
+    return OlapScanner::close(state);
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index 54171104d7..7a9cd1f14b 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -37,6 +37,8 @@ public:
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eof);
 
+    Status close(RuntimeState* state) override;
+
     Status get_batch(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
         return Status::NotSupported("Not Implemented VOlapScanNode Node::get_next scalar");
     }
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 1a99aee464..0bbce41af8 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -106,7 +106,10 @@ public:
                                           VExpr* parent, int* node_idx, VExpr** root_expr,
                                           VExprContext** ctx);
     const std::vector<VExpr*>& children() const { return _children; }
-    void set_children(std::vector<VExpr*> children) { _children = children; }
+    void set_children(RuntimeState* state, VExprContext* ctx, std::vector<VExpr*> children) {
+        close(state, ctx, ctx->get_function_state_scope());
+        _children = children;
+    }
     virtual std::string debug_string() const;
     static std::string debug_string(const std::vector<VExpr*>& exprs);
     static std::string debug_string(const std::vector<VExprContext*>& ctxs);
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 0021779b35..9d2d467dd4 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -65,6 +65,10 @@ public:
         return _last_result_column_id;
     }
 
+    FunctionContext::FunctionStateScope get_function_state_scope() const {
+        return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL;
+    }
+
 private:
     friend class VExpr;
 
diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp
index 6cf9a14471..8e50d12b5d 100644
--- a/be/src/vec/utils/util.hpp
+++ b/be/src/vec/utils/util.hpp
@@ -64,18 +64,20 @@ public:
         return data_types;
     }
 
-    static VExpr* dfs_peel_conjunct(VExpr* expr, int& leaf_index,
-                                    std::function<bool(int)> checker) {
+    static VExpr* dfs_peel_conjunct(RuntimeState* state, VExprContext* context, VExpr* expr,
+                                    int& leaf_index, std::function<bool(int)> checker) {
         static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); };
 
         if (is_leaf(expr)) {
             return checker(leaf_index++) ? nullptr : expr;
         } else {
-            VExpr* left_child = dfs_peel_conjunct(expr->children()[0], leaf_index, checker);
-            VExpr* right_child = dfs_peel_conjunct(expr->children()[1], leaf_index, checker);
+            VExpr* left_child =
+                    dfs_peel_conjunct(state, context, expr->children()[0], leaf_index, checker);
+            VExpr* right_child =
+                    dfs_peel_conjunct(state, context, expr->children()[1], leaf_index, checker);
 
             if (left_child != nullptr && right_child != nullptr) {
-                expr->set_children({left_child, right_child});
+                expr->set_children(state, context, {left_child, right_child});
                 return expr;
             }
             // here do not close Expr* now


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 08/08: [fix] Fix bug of bloom filter hash value calculation error (#9802)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit d2a8a399be2024e0429fff677f38a11198f215e3
Author: Luwei <81...@qq.com>
AuthorDate: Fri May 27 20:44:26 2022 +0800

    [fix] Fix bug of bloom filter hash value calculation error (#9802)
    
    * Fix bug of bloom filter hash value calculation error
    
    * fix code style
---
 be/src/olap/bloom_filter_predicate.h   |  8 +++++---
 be/src/olap/schema.cpp                 |  2 +-
 be/src/vec/columns/column_dictionary.h | 24 ++++++++++++++++++++----
 3 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h
index 01851cfd96..c8fbdab94c 100644
--- a/be/src/olap/bloom_filter_predicate.h
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -119,8 +119,10 @@ void BloomFilterColumnPredicate<T>::evaluate(vectorized::IColumn& column, uint16
         auto& null_map_data = nullable_col->get_null_map_column().get_data();
         // deal ColumnDict
         if (nullable_col->get_nested_column().is_column_dictionary()) {
-            auto* dict_col = vectorized::check_and_get_column<vectorized::ColumnDictI32>(nullable_col->get_nested_column());
-            const_cast<vectorized::ColumnDictI32*>(dict_col)->generate_hash_values();
+            auto* dict_col = vectorized::check_and_get_column<vectorized::ColumnDictI32>(
+                    nullable_col->get_nested_column());
+            const_cast<vectorized::ColumnDictI32*>(dict_col)
+                    ->generate_hash_values_for_runtime_filter();
             for (uint16_t i = 0; i < *size; i++) {
                 uint16_t idx = sel[i];
                 sel[new_size] = idx;
@@ -139,7 +141,7 @@ void BloomFilterColumnPredicate<T>::evaluate(vectorized::IColumn& column, uint16
         }
     } else if (column.is_column_dictionary()) {
         auto* dict_col = vectorized::check_and_get_column<vectorized::ColumnDictI32>(column);
-        const_cast<vectorized::ColumnDictI32*>(dict_col)->generate_hash_values();
+        const_cast<vectorized::ColumnDictI32*>(dict_col)->generate_hash_values_for_runtime_filter();
         for (uint16_t i = 0; i < *size; i++) {
             uint16_t idx = sel[i];
             sel[new_size] = idx;
diff --git a/be/src/olap/schema.cpp b/be/src/olap/schema.cpp
index 6b8354f26a..e30389b7d3 100644
--- a/be/src/olap/schema.cpp
+++ b/be/src/olap/schema.cpp
@@ -205,7 +205,7 @@ vectorized::IColumn::MutablePtr Schema::get_predicate_column_ptr(FieldType type)
     case OLAP_FIELD_TYPE_VARCHAR:
     case OLAP_FIELD_TYPE_STRING:
         if (config::enable_low_cardinality_optimize) {
-            return doris::vectorized::ColumnDictionary<doris::vectorized::Int32>::create();
+            return doris::vectorized::ColumnDictionary<doris::vectorized::Int32>::create(type);
         }
         return doris::vectorized::PredicateColumnType<StringValue>::create();
 
diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h
index cc27ca1cdb..97052cfb7c 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -61,6 +61,7 @@ private:
     ColumnDictionary() {}
     ColumnDictionary(const size_t n) : _codes(n) {}
     ColumnDictionary(const ColumnDictionary& src) : _codes(src._codes.begin(), src._codes.end()) {}
+    ColumnDictionary(FieldType type) : _type(type) {}
 
 public:
     using Self = ColumnDictionary;
@@ -250,8 +251,8 @@ public:
         return _dict.find_code_by_bound(value, greater, eq);
     }
 
-    void generate_hash_values() {
-        _dict.generate_hash_values();
+    void generate_hash_values_for_runtime_filter() {
+        _dict.generate_hash_values_for_runtime_filter(_type);
     }
 
     uint32_t get_hash_value(uint32_t idx) const {
@@ -308,12 +309,26 @@ public:
             return code >= _dict_data.size() ? _null_value : _dict_data[code];
         }
 
-        inline void generate_hash_values() {
+        // The function is only used in the runtime filter feature
+        inline void generate_hash_values_for_runtime_filter(FieldType type) {
             if (_hash_values.empty()) {
                 _hash_values.resize(_dict_data.size());
                 for (size_t i = 0; i < _dict_data.size(); i++) {
                     auto& sv = _dict_data[i];
-                    uint32_t hash_val = HashUtil::murmur_hash3_32(sv.ptr, sv.len, 0);
+                    // The char data is stored in the disk with the schema length,
+                    // and zeros are filled if the length is insufficient
+
+                    // When reading data, use shrink_char_type_column_suffix_zero(_char_type_idx)
+                    // Remove the suffix 0
+                    // When writing data, use the CharField::consume function to fill in the trailing 0.
+
+                    // For dictionary data of char type, sv.len is the schema length,
+                    // so use strnlen to remove the 0 at the end to get the actual length.
+                    int32_t len = sv.len;
+                    if (type == OLAP_FIELD_TYPE_CHAR) {
+                        len = strnlen(sv.ptr, sv.len);
+                    }
+                    uint32_t hash_val = HashUtil::murmur_hash3_32(sv.ptr, len, 0);
                     _hash_values[i] = hash_val;
                 }
             }
@@ -411,6 +426,7 @@ private:
     bool _dict_code_converted = false;
     Dictionary _dict;
     Container _codes;
+    FieldType _type;
 };
 
 template class ColumnDictionary<int32_t>;
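
The new comment in column_dictionary.h above explains that CHAR values are stored zero-padded to the
schema length, so the hash must be computed over the strnlen-trimmed bytes. A self-contained toy
illustration of why the padding changes the hash (std::hash stands in for HashUtil::murmur_hash3_32; this
is not Doris code):

    #include <cstdio>
    #include <cstring>
    #include <functional>
    #include <string>

    int main() {
        // A CHAR(8) value "ab" as stored on disk: padded with trailing zeros.
        char padded[8] = {'a', 'b', 0, 0, 0, 0, 0, 0};
        size_t schema_len = sizeof(padded);
        size_t logical_len = strnlen(padded, schema_len);  // 2 -- what the fix hashes

        std::hash<std::string> h;  // stand-in for HashUtil::murmur_hash3_32
        printf("hash over schema length : %zu\n", h(std::string(padded, schema_len)));
        printf("hash over logical length: %zu\n", h(std::string(padded, logical_len)));
        // The two values differ, so hashes built from padded dictionary entries would
        // never match hashes of the unpadded values used by the runtime filter.
        return 0;
    }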


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 06/08: [security] update canal version to fix fastjson security issue (#9763)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 1cc0693a00e0f843ac799f2b8a8df829d369f6f6
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed May 25 18:22:37 2022 +0800

    [security] update canal version to fix fastjson security issue (#9763)
---
 fe/fe-core/pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 2f271b6ad9..dd553ca754 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -416,7 +416,7 @@ under the License.
         <dependency>
             <groupId>com.alibaba.otter</groupId>
             <artifactId>canal.client</artifactId>
-            <version>1.1.4</version>
+            <version>1.1.6</version>
             <exclusions>
                 <exclusion>
                     <groupId>ch.qos.logback</groupId>
@@ -432,7 +432,7 @@ under the License.
         <dependency>
             <groupId>com.alibaba.otter</groupId>
             <artifactId>canal.protocol</artifactId>
-            <version>1.1.4</version>
+            <version>1.1.6</version>
             <exclusions>
                 <exclusion>
                     <groupId>ch.qos.logback</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 01/08: [fix](resource-tag) Consider resource tags when assigning tasks for broker & routine load (#9492)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit d0c4b5c503ffea1b41566e01126ad6522ab11e98
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu May 26 08:42:09 2022 +0800

    [fix](resource-tag) Consider resource tags when assigning tasks for broker & routine load (#9492)
    
    This CL mainly changes:
    1. Broker Load
        When assigning backends, use the user-level resource tags to find available backends.
        If no user-level resource tag is set, a broker load task can be assigned to any BE node;
        otherwise, the task can only be assigned to BE nodes which match the user-level tags.
    
    2. Routine Load
        The current routine load job does not carry user info, so it cannot get the user-level tags when assigning tasks.
        So it is handled in 2 ways:
        1. For old routine load jobs, use the tags of the replica allocation info to select BE nodes.
        2. For new routine load jobs, the user info will be added and persisted in the routine load job.
---
 docs/en/administrator-guide/multi-tenant.md        | 12 ++++
 .../doris/load/loadv2/LoadingTaskPlanner.java      |  2 +-
 .../load/routineload/KafkaRoutineLoadJob.java      |  8 ++-
 .../doris/load/routineload/RoutineLoadJob.java     | 15 ++++-
 .../doris/load/routineload/RoutineLoadManager.java | 73 +++++++++++++++++++---
 .../load/routineload/RoutineLoadTaskScheduler.java |  3 +-
 .../mysql/privilege/UserResourceProperty.java      | 31 ---------
 .../org/apache/doris/planner/BrokerScanNode.java   | 40 ++++++++----
 .../load/routineload/KafkaRoutineLoadJobTest.java  | 19 ++++--
 .../doris/load/routineload/RoutineLoadJobTest.java |  9 ++-
 .../load/routineload/RoutineLoadManagerTest.java   | 26 +++++---
 .../load/routineload/RoutineLoadSchedulerTest.java |  7 ++-
 .../transaction/GlobalTransactionMgrTest.java      | 10 ++-
 13 files changed, 179 insertions(+), 76 deletions(-)

diff --git a/docs/en/administrator-guide/multi-tenant.md b/docs/en/administrator-guide/multi-tenant.md
index 8c37afad23..dd587feedd 100644
--- a/docs/en/administrator-guide/multi-tenant.md
+++ b/docs/en/administrator-guide/multi-tenant.md
@@ -135,6 +135,18 @@ Node resource division refers to setting tags for BE nodes in a Doris cluster, a
     In this way, we have achieved physical resource isolation for different user queries by dividing nodes and restricting user resource usage. Furthermore, we can create different users for different business departments and restrict each user from using different resource groups. In order to avoid the use of resource interference between different business parts. For example, there is a business table in the cluster that needs to be shared by all 9 business departments, but it is hoped [...]
     
     On the other hand, for the isolation of online and offline tasks. We can use resource groups to achieve this. For example, we can divide nodes into two resource groups, Online and Offline. The table data is still stored in 3 copies, of which 2 copies are stored in the Online resource group, and 1 copy is stored in the Offline resource group. The Online resource group is mainly used for online data services with high concurrency and low latency. Some large queries or offline ETL opera [...]
+
+4. Resource group assignments for load job
+
+    The resource usage of load jobs (including insert, broker load, routine load, stream load, etc.) can be divided into two parts:
+    1. Computing resources: responsible for reading data sources, data transformation and distribution.
+    2. Write resource: responsible for data encoding, compression and writing to disk.
+
+    The write resource must be the node where the replica is located, and the computing resource can theoretically select any node to complete. Therefore, the allocation of resource groups for load jobs is divided into two steps:
+    1. Use user-level resource tags to limit the resource groups that computing resources can use.
+    2. Use the resource tag of the replica to limit the resource group that the write resource can use.
+
+    So if you want all the resources used by the load operation to be limited to the resource group where the data is located, you only need to set the resource tag of the user level to the same as the resource tag of the replica.
     
 ## Single query resource limit
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 903e65ff6d..bf33ab2f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -123,7 +123,7 @@ public class LoadingTaskPlanner {
         // 1. Broker scan node
         BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), destTupleDesc, "BrokerScanNode",
                 fileStatusesList, filesAdded);
-        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism);
+        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism, userInfo);
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
         scanNodes.add(scanNode);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index f85898f10b..e740d92965 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -20,6 +20,7 @@ package org.apache.doris.load.routineload;
 import org.apache.doris.analysis.AlterRoutineLoadStmt;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -103,8 +104,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     }
 
     public KafkaRoutineLoadJob(Long id, String name, String clusterName,
-                               long dbId, long tableId, String brokerList, String topic) {
-        super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA);
+                               long dbId, long tableId, String brokerList, String topic,
+                               UserIdentity userIdentity) {
+        super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA, userIdentity);
         this.brokerList = brokerList;
         this.topic = topic;
         this.progress = new KafkaProgress();
@@ -394,7 +396,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         long id = Catalog.getCurrentCatalog().getNextId();
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
                 db.getClusterName(), db.getId(), tableId,
-                stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
+                stmt.getKafkaBrokerList(), stmt.getKafkaTopic(), stmt.getUserInfo());
         kafkaRoutineLoadJob.setOptional(stmt);
         kafkaRoutineLoadJob.checkCustomProperties();
         kafkaRoutineLoadJob.checkCustomPartition();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index e308371459..7c0b24d69c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -27,6 +27,7 @@ import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -229,6 +230,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     // this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
     // because we can not serialize the Expressions contained in job.
     protected OriginStatement origStmt;
+    // User who submit this job. Maybe null for the old version job(before v1.1)
+    protected UserIdentity userIdentity;
 
     protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // default is all data is load no delete
@@ -250,13 +253,15 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
     }
 
     public RoutineLoadJob(Long id, String name, String clusterName,
-            long dbId, long tableId, LoadDataSourceType dataSourceType) {
+                          long dbId, long tableId, LoadDataSourceType dataSourceType,
+                          UserIdentity userIdentity) {
         this(id, dataSourceType);
         this.name = name;
         this.clusterName = clusterName;
         this.dbId = dbId;
         this.tableId = tableId;
         this.authCode = 0;
+        this.userIdentity = userIdentity;
 
         if (ConnectContext.get() != null) {
             SessionVariable var = ConnectContext.get().getSessionVariable();
@@ -436,6 +441,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         return partitions;
     }
 
+    public UserIdentity getUserIdentity() {
+        return userIdentity;
+    }
+
     @Override
     public LoadTask.MergeType getMergeType() {
         return mergeType;
@@ -1611,6 +1620,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         } catch (Exception e) {
             throw new IOException("error happens when parsing create routine load stmt: " + origStmt.originStmt, e);
         }
+
+        // ATTN(cmy): this is a temp code, not like code in trunk.
+        // we don't persist userIdentity because we can not increase fe meta version for dev-1.0.1
+        userIdentity = null;
     }
 
     abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 2d6c13adab..ed220f88b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -24,6 +24,11 @@ import org.apache.doris.analysis.ResumeRoutineLoadStmt;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -38,10 +43,13 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -411,11 +419,8 @@ public class RoutineLoadManager implements Writable {
     // check if the specified BE is available for running task
     // return true if it is available. return false if otherwise.
     // throw exception if unrecoverable errors happen.
-    public long getAvailableBeForTask(long previousBeId, String clusterName) throws LoadException {
-        List<Long> beIdsInCluster = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
-        if (beIdsInCluster == null) {
-            throw new LoadException("The " + clusterName + " has been deleted");
-        }
+    public long getAvailableBeForTask(long jobId, long previousBeId, String clusterName) throws LoadException {
+        List<Long> availableBeIds = getAvailableBackendIds(jobId, clusterName);
 
         // check if be has idle slot
         readLock();
@@ -423,7 +428,7 @@ public class RoutineLoadManager implements Writable {
             Map<Long, Integer> beIdToConcurrentTasks = getBeCurrentTasksNumMap();
 
             // 1. Find if the given BE id has available slots
-            if (previousBeId != -1L && beIdsInCluster.contains(previousBeId)) {
+            if (previousBeId != -1L && availableBeIds.contains(previousBeId)) {
                 // get the previousBackend info
                 Backend previousBackend = Catalog.getCurrentSystemInfo().getBackend(previousBeId);
                 // check previousBackend is not null && load available
@@ -447,7 +452,7 @@ public class RoutineLoadManager implements Writable {
             int idleTaskNum = 0;
             long resultBeId = -1L;
             int maxIdleSlotNum = 0;
-            for (Long beId : beIdsInCluster) {
+            for (Long beId : availableBeIds) {
                 if (beIdToMaxConcurrentTasks.containsKey(beId)) {
                     if (beIdToConcurrentTasks.containsKey(beId)) {
                         idleTaskNum = beIdToMaxConcurrentTasks.get(beId) - beIdToConcurrentTasks.get(beId);
@@ -468,6 +473,60 @@ public class RoutineLoadManager implements Writable {
         }
     }
 
+    /**
+     * The routine load task can only be scheduled on backends which has proper resource tags.
+     * The tags should be got from user property.
+     * But in the old version, the routine load job does not have user info, so for compatibility,
+     * if there is no user info, we will get tags from replica allocation of the first partition of the table.
+     *
+     * @param jobId
+     * @param cluster
+     * @return
+     * @throws LoadException
+     */
+    private List<Long> getAvailableBackendIds(long jobId, String cluster) throws LoadException {
+        RoutineLoadJob job = getJob(jobId);
+        if (job == null) {
+            throw new LoadException("job " + jobId + " does not exist");
+        }
+        Set<Tag> tags;
+        if (job.getUserIdentity() == null) {
+            // For old job, there may be no user info. So we have to use tags from replica allocation
+            tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
+        } else {
+            tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
+            if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+                // user may be dropped. Here we fall back to use replica tag
+                tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
+            }
+        }
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().setCluster(cluster)
+                .addTags(tags).build();
+        return Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 20000);
+    }
+
+    private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
+        try {
+            Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbId);
+            OlapTable tbl = db.getTableOrMetaException(tblId, Table.TableType.OLAP);
+            tbl.readLock();
+            try {
+                PartitionInfo partitionInfo = tbl.getPartitionInfo();
+                for (Partition partition : tbl.getPartitions()) {
+                    ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId());
+                    // just use the first one
+                    return replicaAlloc.getAllocMap().keySet();
+                }
+                // Should not run into here. Just make compiler happy.
+                return Sets.newHashSet();
+            } finally {
+                tbl.readUnlock();
+            }
+        } catch (MetaNotFoundException e) {
+            throw new LoadException(e.getMessage());
+        }
+    }
+
     public RoutineLoadJob getJob(long jobId) {
         return idToRoutineLoadJob.get(jobId);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index f2f03a666a..fd07c2617e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -287,7 +287,8 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
     // return true if allocate successfully. return false if failed.
     // throw exception if unrecoverable errors happen.
     private boolean allocateTaskToBe(RoutineLoadTaskInfo routineLoadTaskInfo) throws LoadException {
-        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
+        long beId = routineLoadManager.getAvailableBeForTask(routineLoadTaskInfo.getJobId(),
+                routineLoadTaskInfo.getPreviousBeId(), routineLoadTaskInfo.getClusterName());
         if (beId == -1L) {
             return false;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java
deleted file mode 100644
index e4b609de95..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserResourceProperty.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.mysql.privilege;
-
-import org.apache.doris.common.io.Writable;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class UserResourceProperty implements Writable {
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index a6f26c4920..3a99bc0203 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
@@ -40,7 +41,11 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.mysql.privilege.UserProperty;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TBrokerRangeDesc;
@@ -61,6 +66,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -69,6 +75,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -114,6 +121,7 @@ public class BrokerScanNode extends LoadScanNode {
     protected List<BrokerFileGroup> fileGroups;
     private boolean strictMode = false;
     private int loadParallelism = 1;
+    private UserIdentity userIdentity;
 
     protected List<List<TBrokerFileStatus>> fileStatusesList;
     // file num
@@ -136,11 +144,15 @@ public class BrokerScanNode extends LoadScanNode {
 
     private List<ParamCreateContext> paramCreateContexts;
 
+    // For broker load and external broker table
     public BrokerScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
                           List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
         super(id, destTupleDesc, planNodeName);
         this.fileStatusesList = fileStatusesList;
         this.filesAdded = filesAdded;
+        if (ConnectContext.get() != null) {
+            this.userIdentity = ConnectContext.get().getUserIdentity();
+        }
     }
 
     @Override
@@ -181,22 +193,14 @@ public class BrokerScanNode extends LoadScanNode {
         return desc.getTable() == null;
     }
 
-    @Deprecated
-    public void setLoadInfo(Table targetTable,
-                            BrokerDesc brokerDesc,
-                            List<BrokerFileGroup> fileGroups) {
-        this.targetTable = targetTable;
-        this.brokerDesc = brokerDesc;
-        this.fileGroups = fileGroups;
-    }
-
     public void setLoadInfo(long loadJobId,
                             long txnId,
                             Table targetTable,
                             BrokerDesc brokerDesc,
                             List<BrokerFileGroup> fileGroups,
                             boolean strictMode,
-                            int loadParallelism) {
+                            int loadParallelism,
+                            UserIdentity userIdentity) {
         this.loadJobId = loadJobId;
         this.txnId = txnId;
         this.targetTable = targetTable;
@@ -204,6 +208,7 @@ public class BrokerScanNode extends LoadScanNode {
         this.fileGroups = fileGroups;
         this.strictMode = strictMode;
         this.loadParallelism = loadParallelism;
+        this.userIdentity = userIdentity;
     }
 
     // Called from init, construct source tuple information
@@ -389,10 +394,21 @@ public class BrokerScanNode extends LoadScanNode {
     }
 
     private void assignBackends() throws UserException {
+        Set<Tag> tags = Sets.newHashSet();
+        if (userIdentity != null) {
+            tags = Catalog.getCurrentCatalog().getAuth().getResourceTags(userIdentity.getQualifiedUser());
+            if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+                throw new UserException("No valid resource tag for user: " + userIdentity.getQualifiedUser());
+            }
+        } else {
+            LOG.debug("user info in BrokerScanNode should not be null, add log to observer");
+        }
         backends = Lists.newArrayList();
+        // broker scan node is used for query or load
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needQueryAvailable().needLoadAvailable()
+                .addTags(tags).build();
         for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) {
-            // broker scan node is used for query or load
-            if (be.isQueryAvailable() && be.isLoadAvailable()) {
+            if (policy.isMatch(be)) {
                 backends.add(be);
             }
         }
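
The assignBackends() change above swaps the old ad-hoc be.isQueryAvailable() && be.isLoadAvailable() check for a BeSelectionPolicy built with the user's resource tags. A rough, hypothetical sketch of that matching logic (the real class uses a separate Builder and has more options; the Backend fields here are invented for the example):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class SelectionPolicySketch {

        static class Backend {
            final boolean queryAvailable;
            final boolean loadAvailable;
            final Set<String> tags;
            Backend(boolean queryAvailable, boolean loadAvailable, Set<String> tags) {
                this.queryAvailable = queryAvailable;
                this.loadAvailable = loadAvailable;
                this.tags = tags;
            }
        }

        static class BeSelectionPolicy {
            private boolean needQuery;
            private boolean needLoad;
            private final Set<String> tags = new HashSet<>();

            BeSelectionPolicy needQueryAvailable() { needQuery = true; return this; }
            BeSelectionPolicy needLoadAvailable() { needLoad = true; return this; }
            BeSelectionPolicy addTags(Set<String> t) { tags.addAll(t); return this; }

            boolean isMatch(Backend be) {
                if (needQuery && !be.queryAvailable) return false;
                if (needLoad && !be.loadAvailable) return false;
                // In this sketch an empty tag set means "no tag restriction",
                // which is the shape of the call when userIdentity is null.
                return tags.isEmpty() || !Collections.disjoint(tags, be.tags);
            }
        }

        public static void main(String[] args) {
            BeSelectionPolicy policy = new BeSelectionPolicy()
                    .needQueryAvailable().needLoadAvailable().addTags(Set.of("group_a"));
            System.out.println(policy.isMatch(new Backend(true, true, Set.of("group_a")))); // true
            System.out.println(policy.isMatch(new Backend(true, true, Set.of("group_b")))); // false
        }
    }
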
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 9a820ba1a1..8d46d74baa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
 import org.apache.doris.analysis.Separator;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -60,6 +61,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import mockit.Verifications;
 
 import mockit.Expectations;
 import mockit.Injectable;
@@ -131,25 +138,25 @@ public class KafkaRoutineLoadJobTest {
         // 2 partitions, 1 be
         RoutineLoadJob routineLoadJob =
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName1, 1L,
-                        1L, "127.0.0.1:9020", "topic1");
+                        1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList1);
         Assert.assertEquals(2, routineLoadJob.calculateCurrentConcurrentTaskNum());
 
         // 3 partitions, 4 be
         routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
-                1L, "127.0.0.1:9020", "topic1");
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList2);
         Assert.assertEquals(3, routineLoadJob.calculateCurrentConcurrentTaskNum());
 
         // 4 partitions, 4 be
         routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
-                1L, "127.0.0.1:9020", "topic1");
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList3);
         Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum());
 
         // 7 partitions, 4 be
         routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
-                1L, "127.0.0.1:9020", "topic1");
+                1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList4);
         Assert.assertEquals(6, routineLoadJob.calculateCurrentConcurrentTaskNum());
     }
@@ -164,7 +171,7 @@ public class KafkaRoutineLoadJobTest {
 
         RoutineLoadJob routineLoadJob =
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
-                        1L, "127.0.0.1:9020", "topic1");
+                        1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
 
         new Expectations(catalog) {
             {
@@ -209,7 +216,7 @@ public class KafkaRoutineLoadJobTest {
 
         RoutineLoadJob routineLoadJob =
                 new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
-                        1L, "127.0.0.1:9020", "topic1");
+                        1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
         long maxBatchIntervalS = 10;
         Deencapsulation.setField(routineLoadJob, "maxBatchIntervalS", maxBatchIntervalS);
         new Expectations() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 8846515334..e791d11749 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.load.routineload;
 
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
@@ -41,6 +42,12 @@ import org.junit.Test;
 
 import java.util.List;
 import java.util.Map;
+import java_cup.runtime.Symbol;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
 
 import java_cup.runtime.Symbol;
 import mockit.Expectations;
@@ -318,7 +325,7 @@ public class RoutineLoadJobTest {
     @Test
     public void testGetShowCreateInfo() throws UserException {
         KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(111L, "test_load", "test", 1,
-                11, "localhost:9092", "test_topic");
+                11, "localhost:9092", "test_topic", UserIdentity.ADMIN);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
         Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
         Deencapsulation.setField(routineLoadJob, "maxBatchRows", 10);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index f9d538fb1b..8cc5b80efb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.ResumeRoutineLoadStmt;
 import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.AnalysisException;
@@ -43,6 +44,7 @@ import org.apache.doris.persist.EditLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TResourceInfo;
 
@@ -58,6 +60,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
 
 import mockit.Expectations;
 import mockit.Injectable;
@@ -100,7 +107,7 @@ public class RoutineLoadManagerTest {
         createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
-                serverAddress, topicName);
+                serverAddress, topicName, UserIdentity.ADMIN);
 
         new MockUp<KafkaRoutineLoadJob>() {
             @Mock
@@ -199,7 +206,7 @@ public class RoutineLoadManagerTest {
         String topicName = "topic1";
         String serverAddress = "http://127.0.0.1:8080";
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
-                serverAddress, topicName);
+                serverAddress, topicName, UserIdentity.ADMIN);
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
 
@@ -207,7 +214,7 @@ public class RoutineLoadManagerTest {
         Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
         List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
         KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
-                1L, 1L, serverAddress, topicName);
+                1L, 1L, serverAddress, topicName, UserIdentity.ADMIN);
         routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
         nameToRoutineLoadJob.put(jobName, routineLoadJobList);
         dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
@@ -229,7 +236,7 @@ public class RoutineLoadManagerTest {
         String topicName = "topic1";
         String serverAddress = "http://127.0.0.1:8080";
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
-                serverAddress, topicName);
+                serverAddress, topicName, UserIdentity.ADMIN);
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
 
@@ -245,7 +252,7 @@ public class RoutineLoadManagerTest {
         Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
         List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
         KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
-                1L, 1L, serverAddress, topicName);
+                1L, 1L, serverAddress, topicName, UserIdentity.ADMIN);
         Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED);
         routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
         nameToRoutineLoadJob.put(jobName, routineLoadJobList);
@@ -739,23 +746,26 @@ public class RoutineLoadManagerTest {
 
     @Test
     public void testCheckBeToTask(@Mocked Catalog catalog,
-                                  @Mocked SystemInfoService systemInfoService) throws LoadException {
+                                  @Mocked SystemInfoService systemInfoService) throws LoadException, DdlException {
         List<Long> beIdsInCluster = Lists.newArrayList();
         beIdsInCluster.add(1L);
         Map<Long, Integer> beIdToMaxConcurrentTasks = Maps.newHashMap();
         beIdToMaxConcurrentTasks.put(1L, 10);
         new Expectations() {
             {
-                systemInfoService.getClusterBackendIds("default", true);
+                systemInfoService.selectBackendIdsByPolicy((BeSelectionPolicy) any, anyInt);
                 minTimes = 0;
                 result = beIdsInCluster;
             }
         };
 
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "testjob", SystemInfoService.DEFAULT_CLUSTER,
+                10000, 10001, "192.168.1.1:9090", "testtopic", UserIdentity.ADMIN);
+        routineLoadManager.addRoutineLoadJob(job, "testdb");
         Config.max_routine_load_task_num_per_be = 10;
         Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks);
-        Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, "default"));
+        Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, 1L, "default"));
     }
 
     @Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
index f072804295..16c21fb611 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.routineload;
 
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
@@ -76,7 +77,7 @@ public class RoutineLoadSchedulerTest {
         Deencapsulation.setField(catalog, "routineLoadTaskScheduler", routineLoadTaskScheduler);
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", clusterName, 1L, 1L,
-                "xxx", "test");
+                "xxx", "test", UserIdentity.ADMIN);
         Deencapsulation.setField(kafkaRoutineLoadJob,"state", RoutineLoadJob.JobState.NEED_SCHEDULE);
         List<RoutineLoadJob> routineLoadJobList = new ArrayList<>();
         routineLoadJobList.add(kafkaRoutineLoadJob);
@@ -139,7 +140,7 @@ public class RoutineLoadSchedulerTest {
         };
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L,
-                "10.74.167.16:8092", "test");
+                "10.74.167.16:8092", "test", UserIdentity.ADMIN);
         RoutineLoadManager routineLoadManager = new RoutineLoadManager();
         routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db");
 
@@ -170,7 +171,7 @@ public class RoutineLoadSchedulerTest {
         executorService.submit(routineLoadTaskScheduler);
 
         KafkaRoutineLoadJob kafkaRoutineLoadJob1 = new KafkaRoutineLoadJob(1L, "test_custom_partition",
-                "default_cluster", 1L, 1L, "xxx", "test_1");
+                "default_cluster", 1L, 1L, "xxx", "test_1", UserIdentity.ADMIN);
         List<Integer> customKafkaPartitions = new ArrayList<>();
         customKafkaPartitions.add(2);
         Deencapsulation.setField(kafkaRoutineLoadJob1, "customKafkaPartitions", customKafkaPartitions);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index abdb01f940..defc2fe1c3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.FakeCatalog;
@@ -66,6 +67,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import mockit.Injectable;
+import mockit.Mocked;
+
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -319,7 +323,7 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo3);
 
         KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port",
-                "topic");
+                "topic", UserIdentity.ADMIN);
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);
@@ -388,7 +392,9 @@ public class GlobalTransactionMgrTest {
         transTablets.add(tabletCommitInfo2);
         transTablets.add(tabletCommitInfo3);
 
-        KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic");
+        KafkaRoutineLoadJob routineLoadJob =
+                new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic",
+                        UserIdentity.ADMIN);
         List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
         Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
         partitionIdToOffset.put(1, 0L);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 03/08: [fix](function) If function adds type inference (#9728)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 37410280c8be2dc50ec155d3270456f48de2acc9
Author: Stalary <st...@163.com>
AuthorDate: Thu May 26 22:43:18 2022 +0800

    [fix](function) If function adds type inference (#9728)
---
 .../src/main/java/org/apache/doris/analysis/FunctionCallExpr.java | 8 +++++++-
 regression-test/data/correctness/test_select_constant.out         | 3 +++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index bc30b9d5f9..fc733bd636 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -796,7 +796,13 @@ public class FunctionCallExpr extends Expr {
                 }
                 childTypes[i] = children.get(i).type;
             }
-
+            fn = getBuiltinFunction(analyzer, fnName.getFunction(), childTypes,
+                    Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
+        } else if (fnName.getFunction().equalsIgnoreCase("if")) {
+            Type[] childTypes = collectChildReturnTypes();
+            Type assignmentCompatibleType = ScalarType.getAssignmentCompatibleType(childTypes[1], childTypes[2], true);
+            childTypes[1] = assignmentCompatibleType;
+            childTypes[2] = assignmentCompatibleType;
             fn = getBuiltinFunction(analyzer, fnName.getFunction(), childTypes,
                     Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
         } else {
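
A toy illustration of what the added branch does for if(cond, a, b): the two value branches are first widened to a common assignment-compatible type, and only then is the builtin signature resolved, so mixed-type branches (the new regression case below returns a DATETIME value) pick one consistent return type. The tiny type lattice here is invented and far simpler than ScalarType.getAssignmentCompatibleType:

    import java.util.Arrays;

    public class IfTypeInferenceSketch {

        enum Type { BOOLEAN, INT, BIGINT, DOUBLE, DATETIME, VARCHAR, INVALID }

        // Extremely simplified promotion rules, only for this sketch.
        static Type assignmentCompatible(Type a, Type b) {
            if (a == b) return a;
            if (isNumeric(a) && isNumeric(b)) {
                return a.ordinal() > b.ordinal() ? a : b; // INT < BIGINT < DOUBLE
            }
            if (a == Type.VARCHAR || b == Type.VARCHAR) return Type.VARCHAR;
            return Type.INVALID;
        }

        static boolean isNumeric(Type t) {
            return t == Type.INT || t == Type.BIGINT || t == Type.DOUBLE;
        }

        // Mimics the patched branch: childTypes[1] and childTypes[2] are replaced
        // by their common type before the builtin lookup.
        static Type[] inferIfSignature(Type cond, Type thenType, Type elseType) {
            Type common = assignmentCompatible(thenType, elseType);
            return new Type[] { cond, common, common };
        }

        public static void main(String[] args) {
            Type[] sig = inferIfSignature(Type.BOOLEAN, Type.INT, Type.DOUBLE);
            System.out.println(Arrays.toString(sig)); // [BOOLEAN, DOUBLE, DOUBLE]
        }
    }
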
diff --git a/regression-test/data/correctness/test_select_constant.out b/regression-test/data/correctness/test_select_constant.out
index d3ed51bcc7..c814de350a 100644
--- a/regression-test/data/correctness/test_select_constant.out
+++ b/regression-test/data/correctness/test_select_constant.out
@@ -2,3 +2,6 @@
 -- !select1 --
 100	test	2021-01-02
 
+-- !select --
+2010-01-02T04:03:06
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org